Adding soft delete to a Phoenix Commanded (CQRS) API


Part two in my series on Elixir’s Commanded library. Part one can be found here.

Context

In the previous post, I converted a vanilla Phoenix API to CQRS with Commanded.

This application writes to the database using the commanded_ecto_projections hex package, which subscribes to events and projects their state into tables managed by the Ecto database library.

Since the core data model of this application is an append-only (immutable) log, the events can be replayed and the read model can be dramatically changed using existing data.

Goal

Implement a soft delete in the API, allowing items to be restored after deletion.

Follow along with what I learned while iterating on a project named todo_backend_commanded. Its git history shows the steps involved in implementing soft delete and restore functionality.

Updating the aggregate

(Link to relevant commit)

The first step is to add a deleted_at field to the Todo aggregate.

 defmodule TodoBackend.Todos.Aggregates.Todo do
   defstruct [
     :uuid,
     :title,
     :completed,
-    :order
+    :order,
+    deleted_at: nil
   ]

   ...
-  def apply(%Todo{uuid: uuid}, %TodoDeleted{uuid: uuid}) do
-    nil
-  end
+  def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid}) do
+    %Todo{todo | deleted_at: DateTime.utc_now()}
+  end

An aggregate is a struct representing the state of an object in an application. In Commanded, an aggregate is hydrated in-memory by applying the events in its event stream. The aggregate struct is used to determine the side effects of a command, if the command is to be accepted at all.

This diff causes the aggregate’s deleted_at field to be set to the current datetime whenever the event log is replayed to hydrate the aggregate. We’ll come back to this, since it would be preferable to capture the time the todo item was actually deleted.

Aside: aggregates sound slow

If an aggregate has to be hydrated from its events every time it is used, how is this not a terrible performance burden?

In practice, an individual aggregate shouldn’t have many events. How many interactions could one todo item possibly have? If it turns out certain items have large event logs, there are a couple of levers that can be pulled in Commanded.

Lifespans

(Link to relevant commit)

So, I told a half-truth about aggregates. They are not hydrated in-memory for every command or event. In reality, aggregates are implemented as GenServer processes, each caching its state and managed under the Commanded application’s supervision tree (ultimately by a DynamicSupervisor called Commanded.Aggregates.Supervisor, to be specific).

Since aggregates are cached as long-running processes, an aggregate’s events only have to be read once, when its process starts.

The timeout of an aggregate process can be configured via an implementation of the AggregateLifespan behaviour. The default lifespan, DefaultLifespan, sets an infinite timeout unless an exception is raised.

Depending on the number of aggregate instances in a live system, this could introduce another problem—these processes could stay alive for the life of the server. That sounds like an OOM waiting to happen.

To address this, I implemented a lifespan for the Todo aggregate that keeps aggregates around for a minute—unless they receive the TodoDeleted event. Being able to pattern match and implement this within the context of an application means the timeouts can reflect the specific details of your domain.
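The linked commit has the real implementation; it looks roughly like the sketch below, which assumes the behaviour lives on the aggregate module itself (consistent with the lifespan: Todo option passed to the router later in this post).

defmodule TodoBackend.Todos.Aggregates.Todo do
  @behaviour Commanded.Aggregates.AggregateLifespan

  ...

  # Deleted todos are unlikely to be touched again, so stop immediately
  def after_event(%TodoDeleted{}), do: :stop
  # Otherwise keep the process (and its cached state) around for a minute
  def after_event(_event), do: :timer.minutes(1)

  def after_command(_command), do: :timer.minutes(1)
  def after_error(_error), do: :stop
end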

Snapshots

For aggregates with many events, Commanded has aggregate state snapshots. These can be configured by adding a snapshotting section to the Commanded application in config/config.exs.

There are two options per aggregate: snapshot_every and snapshot_version. snapshot_every causes a snapshot to be created after the given number of events; this value acts as an upper bound on the number of events that must be evaluated when hydrating an aggregate’s state. snapshot_version is a non-negative integer that should be incremented every time the aggregate is changed. It is used to invalidate old snapshots that may be missing new fields or were created with old apply functions.
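As a sketch, assuming the OTP application is named :todo_backend (the Commanded application module, TodoBackend.App, matches the rest of this post), the configuration would look something like:

config :todo_backend, TodoBackend.App,
  snapshotting: %{
    TodoBackend.Todos.Aggregates.Todo => [
      # Take a snapshot after every 10 events for this aggregate
      snapshot_every: 10,
      # Increment whenever the aggregate struct or apply functions change
      snapshot_version: 1
    ]
  }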

Snapshots can be valuable in applications with long event logs, but didn’t seem to be worthwhile in this todo API. If we were using snapshots, the addition of the deleted_at field would have required an increment of snapshot_version.

To learn more about aggregate performance, check out this blog post where an organization dramatically improved the performance and footprint of their Commanded application through lifespans and snapshots.

Effective dating the delete event

(Link to relevant commit)

As I noted when adding the deleted_at field to the aggregate, it’s not appropriate to use the datetime when the event is applied. This would cause the deleted_at value to be different every time the service is deployed or the aggregate GenServer is restarted. It would be much better to capture the datetime when the event is created, so it can be persisted in the event log.

First, I added a datetime field to the TodoDeleted event struct:

 defmodule TodoBackend.Todos.Events.TodoDeleted do
   @derive Jason.Encoder
   defstruct [
-    :uuid
+    :uuid,
+    :datetime
   ]
end

Next, I populated this value in the execute function of the todo aggregate, when the DeleteTodo command is received. Since execute is called when a command is dispatched, this will be set to the datetime when the delete endpoint was called.

I also updated the apply function to use the datetime from the event, instead of the current datetime. This value will come from the event log.

 defmodule TodoBackend.Todos.Aggregates.Todo do
   ...
   def execute(%Todo{uuid: uuid}, %DeleteTodo{uuid: uuid}) do
-    %TodoDeleted{uuid: uuid}
+    %TodoDeleted{uuid: uuid, datetime: DateTime.utc_now()}
   end

   ...
-  def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid}) do
-    %Todo{todo | deleted_at: DateTime.utc_now()}
+  def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid, datetime: effective_datetime}) do
+    %Todo{todo | deleted_at: effective_datetime}
   end

Read your writes

The log stores events in JSON format. Since JSON has no native datetime type, the datetime is serialized as a string, and the Commanded.Serialization.JsonDecoder protocol must be implemented on the event struct to parse it back into a DateTime when events are read.

defmodule TodoBackend.Todos.Events.TodoDeleted do
  ...

  defimpl Commanded.Serialization.JsonDecoder, for: TodoDeleted do
    @doc """
    Parse the datetime included in the aggregate state
    """
    def decode(%TodoDeleted{} = state) do
      %TodoDeleted{datetime: datetime} = state

      {:ok, dt, _} = DateTime.from_iso8601(datetime)

      %TodoDeleted{state | datetime: dt}
    end
  end
end

What about existing events?

This is fine, but what about all of those events that are already in the log without datetimes?

Since the log is append-only and immutable, the way to extend an event definition is to add a fallback value for the new field.

In this case, I modified the JSON decoder function to substitute the current datetime when the stored value is nil. Since the value was never written to old events, the actual deletion time is lost. The best we can do is provide some value and ensure we write a datetime on all future events.

 defmodule TodoBackend.Todos.Events.TodoDeleted do
   ...

   defimpl Commanded.Serialization.JsonDecoder, for: TodoDeleted do
     @doc """
     Parse the datetime included in the aggregate state
     """
     def decode(%TodoDeleted{} = state) do
       %TodoDeleted{datetime: datetime} = state
-
-      {:ok, dt, _} = DateTime.from_iso8601(datetime)
-
-      %TodoDeleted{state | datetime: dt}
+
+      if datetime == nil do
+        %TodoDeleted{state | datetime: DateTime.utc_now()}
+      else
+        {:ok, dt, _} = DateTime.from_iso8601(datetime)
+
+        %TodoDeleted{state | datetime: dt}
+      end
+    end
   end
 end

Updating the read model

(Link to relevant commit)

In a production environment it would be best to create a new database table and projector that include the deleted_at field, deploy the application to populate the new table from the event log, update callers to use the new table, and then delete the old table. This is safe to do because all writes happen in the event layer; the read model is only ever written by projectors.
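Sketching that approach (the V2 names below are hypothetical, not code from the repo): the new projector is a copy of the existing one with a new name and a new target schema. Commanded tracks each projection’s position in the event log by name, so a never-before-seen name causes the entire history to be replayed into the new table.

defmodule TodoBackend.Todos.Projectors.TodoV2 do
  use Commanded.Projections.Ecto,
    application: TodoBackend.App,
    repo: TodoBackend.Repo,
    # A projection name Commanded has never seen before, so this projector
    # starts from the first event in the log
    name: "Todos.Projectors.TodoV2"

  # Same project/3 definitions as the existing projector, but writing to a
  # new todos_v2 schema that includes the deleted_at column
  ...
end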

In some cases, it could also be reasonable to update the projector such that the data can be migrated in-place. This seemed like more effort than it was worth for a blog post, so I opted for the dangerous option: blow away the table and rebuild it with an updated projector.

Preparing the database

First, I generated a migration to add the deleted_at column to the todos table:

mix ecto.gen.migration add_deleted_at_to_todo

In the generated migration file, I added the column and an index:

defmodule TodoBackend.Repo.Migrations.AddDeletedAtToTodo do
  use Ecto.Migration

  def change do
    alter table(:todos) do
      add :deleted_at, :naive_datetime_usec
    end

    create index(:todos, [:deleted_at])
  end
end

Finally, I added the field to the projection:

 defmodule TodoBackend.Todos.Projections.Todo do
   ...
   schema "todos" do
     field :completed, :boolean, default: false
     field :title, :string
     field :order, :integer, default: 0
+    field :deleted_at, :naive_datetime_usec, default: nil

     timestamps()
   end
 end

After running mix ecto.migrate, the database side is ready to go.

Updating the projector

The projector update is very simple: instead of deleting the row from the table, just update the deleted_at column.

 defmodule TodoBackend.Todos.Projectors.Todo do
   ...
-  project(%TodoDeleted{uuid: uuid}, _, fn multi ->
-    Ecto.Multi.delete(multi, :todo, fn _ -> %Todo{uuid: uuid} end)
+  project(%TodoDeleted{uuid: uuid, datetime: effective_datetime}, _, fn multi ->
+    case Repo.get(Todo, uuid) do
+      nil ->
+        multi
+
+      todo ->
+        Ecto.Multi.update(
+          multi,
+          :todo,
+          Todo.delete_changeset(todo, %{deleted_at: effective_datetime})
+        )
+    end
+  end)
   ...
 end

I added a delete_changeset to the Todo projection to represent this specific desired change; it ensures we only update the deleted_at column. The changeset is referenced in the new TodoDeleted projection handler above.

defmodule TodoBackend.Todos.Projections.Todo do
  ...

  def delete_changeset(todo, attrs \\ %{}) do
    todo
    |> cast(attrs, [:deleted_at])
  end
end

Forcefully re-populating the database

As I mentioned before, it’s a terrible idea to truncate a table in production. Here’s how I did it locally to explore Commanded. It’s possible this could also be done by updating the name value provided to the use Commanded.Projections.Ecto call in TodoBackend.Todos.Projectors.Todo and restarting the server.

If there is no existing data in the log, this step can be skipped.

In an iex shell (iex -S mix phx.server):

# Delete the projected values
TodoBackend.Repo.delete_all(TodoBackend.Todos.Projections.Todo)

# Remove the version tracking entry for the ecto projection
TodoBackend.Repo.delete(%TodoBackend.Todos.Projectors.Todo.ProjectionVersion{projection_name: "Todos.Projectors.Todo"})

alias Commanded.Event.Handler
alias Commanded.Registration

# Trigger a reset of the projector
registry_name = Handler.name(TodoBackend.App, "Todos.Projectors.Todo")
projector = Registration.whereis_name(TodoBackend.App, registry_name)
send(projector, :reset)

Hiding deleted items

Now that deleted items are present in the todos table, it’s important to update the Todos context module to filter out deleted items. This can be done in the context without needing any updates to the controller, since all access is encapsulated. As I noted in the previous post, this is a really valuable property of the DDD-inspired approach of modern Phoenix applications.

 defmodule TodoBackend.Todos do
   ...

   def list_todos do
-    Repo.all(Todo)
+    from(t in Todo,
+      where: is_nil(t.deleted_at)
+    )
+    |> Repo.all()
   end

   ...
-  def get_todo!(uuid), do: Repo.get_by!(Todo, uuid: uuid)
+  def get_todo!(uuid) do
+    from(t in Todo,
+      where: is_nil(t.deleted_at)
+    )
+    |> Repo.get_by!(uuid: uuid)
+  end
 end

Where are we, now?

Soft deletes have been implemented. The read model has been reconstructed from existing data, including previously-deleted items.

In CQRS applications, data is never really deleted; at most, it is removed from the read model.

Not legal advice on handling GDPR: some applications may choose to encrypt log entries with a per-entity encryption key, a technique often called crypto-shredding, in order to satisfy permanent data deletion requests. To fully delete data, the encryption key could be discarded. This would cause the events to be unreadable if they were ever replayed. Some special handling would be necessary to prevent the application from crashing on GDPR-deleted entities. This is very clearly out of scope for this blog post, but a very interesting concept.
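Out of scope or not, here is a minimal sketch of the idea, with every name hypothetical: encrypt event payloads with a per-entity key held outside the event store, and treat a discarded key as permanent erasure.

defmodule TodoBackend.CryptoShredding do
  # AES-256-GCM with a random IV per payload; one key per entity, stored
  # outside the event log
  def encrypt(plaintext, key) do
    iv = :crypto.strong_rand_bytes(12)
    {ciphertext, tag} = :crypto.crypto_one_time_aead(:aes_256_gcm, key, iv, plaintext, "", true)
    iv <> tag <> ciphertext
  end

  # Once the entity's key has been discarded, its events can never be read
  def decrypt(_payload, nil), do: {:error, :key_discarded}

  def decrypt(<<iv::binary-size(12), tag::binary-size(16), ciphertext::binary>>, key) do
    case :crypto.crypto_one_time_aead(:aes_256_gcm, key, iv, ciphertext, "", tag, false) do
      :error -> {:error, :invalid_key}
      plaintext -> {:ok, plaintext}
    end
  end
end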

Michiel Rook wrote two posts on this topic, which can be found here and here.

New feature: restore deleted items

Now that soft deletes are implemented, it’s time to write the new feature: un-delete!

Implementation

(Link to relevant commit)

Let’s quickly walk through the code changes required to implement the restoration of deleted todo items:

Create a RestoreTodo command

defmodule TodoBackend.Todos.Commands.RestoreTodo do
  defstruct [
    :uuid
  ]
end

Create a TodoRestored event

defmodule TodoBackend.Todos.Events.TodoRestored do
  @derive Jason.Encoder
  defstruct [
    :uuid
  ]
end

Update the aggregate

 defmodule TodoBackend.Todos.Aggregates.Todo do
   ...
+  def execute(%Todo{uuid: uuid}, %RestoreTodo{uuid: uuid}) do
+    %TodoRestored{uuid: uuid}
+  end

   ...
+  def apply(%Todo{uuid: uuid} = todo, %TodoRestored{uuid: uuid}) do
+    %Todo{todo | deleted_at: nil}
+  end

   ...
end

Route the command to the aggregate

 defmodule TodoBackend.Router do
   ...
-  dispatch([CreateTodo, DeleteTodo, UpdateTodo],
+  dispatch([CreateTodo, DeleteTodo, UpdateTodo, RestoreTodo],
    to: Todo,
    identity: :uuid,
    lifespan: Todo
  )
end

Update the projector

 defmodule TodoBackend.Todos.Projectors.Todo do
   ...
+  project(%TodoRestored{uuid: uuid}, _, fn multi ->
+    case Repo.get(Todo, uuid) do
+      nil ->
+        multi
+
+      todo ->
+        Ecto.Multi.update(multi, :todo, Todo.delete_changeset(todo, %{deleted_at: nil}))
+    end
+  end)
 end

At this point, we have implemented everything required for restoring deleted todo items, except for actually dispatching the RestoreTodo command.

Adding the API method

(Link to relevant commit)

To dispatch the RestoreTodo command, I added PUT /api/todos/:id/restore to the API.

Add a function to the context module

defmodule TodoBackend.Todos do
  ...

  def restore_todo(id) do
    command = %RestoreTodo{uuid: id}

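    # consistency: :strong makes the dispatch wait until strongly-consistent
    # event handlers (the projector, assuming it opts in) have processed the
    # event, so the get_todo! below can read its own write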
    with :ok <- App.dispatch(command, consistency: :strong) do
      {:ok, get_todo!(id)}
    else
      reply -> reply
    end
  end
end

Add an action to the controller

defmodule TodoBackendWeb.TodoController do
  ...

  def restore(conn, %{"id" => id}) do
    with {:ok, %Todo{} = todo} <- Todos.restore_todo(id) do
      render(conn, "show.json", todo: todo)
    end
  end
end

Register the route

 defmodule TodoBackendWeb.Router do
   ...

   scope "/api", TodoBackendWeb do
     pipe_through :api

     resources "/todos", TodoController
     delete "/todos", TodoController, :delete_all
+    put "/todos/:id/restore", TodoController, :restore
   end
 end

Validation

For the first time in this system, let’s look at how we can use aggregate state and the execute command handler to prevent deleting already-deleted items (and restoring non-deleted items).

Command validation is a core property of CQRS designs. Once events are emitted, they are never changed. The only way to enforce validations is to accept or reject commands.

The first validation I’ll add is that items which are currently deleted cannot be deleted again:

 defmodule TodoBackend.Todos.Aggregates.Todo do
   ...
-  def execute(%Todo{uuid: uuid}, %DeleteTodo{uuid: uuid}) do
+  def execute(%Todo{uuid: uuid, deleted_at: nil}, %DeleteTodo{uuid: uuid}) do
     %TodoDeleted{uuid: uuid, datetime: DateTime.utc_now()}
   end
 
   ...
+  def execute(%Todo{}, %DeleteTodo{}) do
+    {:error, "Can not delete todo that is already deleted"}
+  end
 end

This pattern-matched clause will only emit a TodoDeleted event when the existing aggregate state contains a deleted_at value of nil. In all other cases, the call falls through to the clause that returns an {:error, message} tuple.

In a production system, structured errors should be returned in order to provide API clients with useful responses. Currently, this implementation just causes a 500.
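One way to do better is Phoenix’s action_fallback; here is a hedged sketch (the fallback module is my invention, not code from the repo) that translates the error tuple into a structured 422 response:

defmodule TodoBackendWeb.FallbackController do
  use TodoBackendWeb, :controller

  # Invoked when a controller action returns a value its `with` didn't match,
  # such as the {:error, message} tuples surfaced from the aggregate
  def call(conn, {:error, reason}) when is_binary(reason) do
    conn
    |> put_status(:unprocessable_entity)
    |> json(%{error: reason})
  end
end

Declaring action_fallback TodoBackendWeb.FallbackController in TodoController would route those unmatched returns here instead of crashing the request.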

The other validation will prevent restoring non-deleted items. It reads nearly identically to the first one, with one twist: the error clause must come first, because the success clause’s pattern would also match a non-deleted todo and Elixir tries function clauses in source order.

 defmodule TodoBackend.Todos.Aggregates.Todo do
   ...
+  def execute(%Todo{deleted_at: nil}, %RestoreTodo{}) do
+    {:error, "Can only restore deleted todos"}
+  end
+
  def execute(%Todo{uuid: uuid}, %RestoreTodo{uuid: uuid}) do
    %TodoRestored{uuid: uuid}
  end
end

Summary

In this post, I shared how the CQRS implementation of a todo list API made it simple to add soft-delete and restore functionality.

I added validations and showed how to up-convert existing events to ensure they are compatible with the evolution of a Commanded application.

In most designs, this would probably not be possible unless a change-tracking extension had been in use with an ORM. Even with change tracking enabled through extensions like PaperTrail or django-simple-history, it can be tricky to restore deleted entities. Object tracking would need to have been enabled before it was needed, to ensure the data is still around to be restored.

When an immutable, append-only log is the source of truth for an application, it can be updated to meet requirements that were not known at the outset of the project.