Part two in my series on Elixir’s Commanded library. Part one can be found here.
Context
In the previous post, I converted a vanilla Phoenix API to CQRS with Commanded.
This application writes to the database using the commanded_ecto_projections hex package, which subscribes to events and projects their state into tables managed by the Ecto database library.
Since the core data model of this application is an append-only (immutable) log, the events can be replayed and the read model can be dramatically changed using existing data.
Goal
Implement a soft delete in the API, allowing items to be restored after deletion.
Follow along with what I learned while iterating on a project named todo_backend_commanded. Its git history shows the steps involved in implementing soft delete and restore functionality.
Updating the aggregate
The first step is to add a deleted_at field to the Todo aggregate.
defmodule TodoBackend.Todos.Aggregates.Todo do
  defstruct [
    :uuid,
    :title,
    :completed,
-   :order
+   :order,
+   deleted_at: nil
  ]

  ...

- def apply(%Todo{uuid: uuid}, %TodoDeleted{uuid: uuid}) do
-   nil
- end
+ def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid}) do
+   %Todo{todo | deleted_at: DateTime.utc_now()}
+ end
An aggregate is a struct representing the state of an object in an application. In Commanded, aggregates are hydrated in-memory from dispatched events. The aggregate struct is used to determine the side-effects of a command, if the command is to be accepted at all.
This diff causes the aggregate’s deleted_at field to be set to the current datetime when the event log is hydrated. We’ll come back to this, since it would be preferable to capture the time the todo item was actually deleted.
Aside: aggregates sound slow
If an aggregate has to be hydrated from its events every time it is used, how is this not a terrible performance burden?
In practice, an individual aggregate shouldn’t have many events. How many interactions could one todo item possibly have? If it turns out certain items have large event logs, there are a couple of levers that can be pulled in Commanded.
Lifespans
So, I lied (well, told a half-truth) about aggregates. They are not hydrated in-memory for every command or event. In reality, aggregates are implemented as GenServer processes, each caching its state and managed under the Commanded application’s supervision tree (ultimately by a DynamicSupervisor called Commanded.Aggregates.Supervisor, to be specific).
Since aggregates are cached as long-running processes, we only have to fetch each event once.
The timeout of an aggregate process can be configured via an implementation of the AggregateLifespan behaviour. The default lifespan, DefaultLifespan, sets an infinite timeout unless an exception is raised.
Depending on the number of aggregate instances in a live system, this could introduce another problem—these processes could stay alive for the life of the server. That sounds like an OOM waiting to happen.
To address this, I implemented a lifespan for the Todo aggregate that keeps aggregates around for a minute, unless they receive the TodoDeleted event. Being able to pattern match and implement this within the context of an application means the timeouts can reflect the specific details of your domain.
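For reference, a minimal sketch of such a lifespan follows. The exact clauses are my approximation rather than the project’s code verbatim, but the after_event/1, after_command/1, and after_error/1 callbacks are the real AggregateLifespan behaviour:

defmodule TodoBackend.Todos.Aggregates.Todo do
  @behaviour Commanded.Aggregates.AggregateLifespan

  alias TodoBackend.Todos.Events.TodoDeleted

  # Stop the aggregate process as soon as the todo is deleted
  def after_event(%TodoDeleted{}), do: :stop

  # Otherwise, keep the process alive for a minute of inactivity
  def after_event(_event), do: :timer.minutes(1)
  def after_command(_command), do: :timer.minutes(1)
  def after_error(_error), do: :stop
end

Because the router passes lifespan: Todo when dispatching (shown later in this post), the aggregate module itself can host these callbacks.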
Snapshots
For aggregates with many events, Commanded has aggregate state snapshots. These can be configured by adding a snapshotting section to the Commanded application in config/config.exs.
There are two options per aggregate: snapshot_every and snapshot_version. snapshot_every causes a snapshot to be created after a given number of events; this value acts as an upper bound on the number of events that must be evaluated when hydrating an aggregate’s state. snapshot_version is a non-negative integer that should be incremented every time the aggregate struct changes. It is used to invalidate old snapshots that may be missing new fields or were created with old apply functions.
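As a sketch, the configuration might look like this; the OTP application name :todo_backend and the snapshot_every threshold are my assumptions, not values from the project:

# config/config.exs
config :todo_backend, TodoBackend.App,
  snapshotting: %{
    TodoBackend.Todos.Aggregates.Todo => [
      # Snapshot after every 10 events (assumed threshold)
      snapshot_every: 10,
      # Increment whenever the aggregate struct changes
      snapshot_version: 1
    ]
  }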
Snapshots can be valuable in applications with long event logs, but didn’t seem worthwhile for this todo API. If we were using snapshots, the addition of the deleted_at field would have required incrementing snapshot_version.
To learn more about aggregate performance, check out this blog post where an organization dramatically improved the performance and footprint of their Commanded application through lifespans and snapshots.
Effective dating the delete event
As I noted when adding the deleted_at field to the aggregate, it’s not appropriate to use the datetime when the event is applied. That would cause the deleted_at value to be different every time the service is deployed or the aggregate GenServer is restarted. It would be much better to capture the datetime when the event is created, so it can be persisted in the event log.
First, I added a datetime field to the TodoDeleted event struct:
defmodule TodoBackend.Todos.Events.TodoDeleted do
  @derive Jason.Encoder

  defstruct [
-   :uuid
+   :uuid,
+   :datetime
  ]
end
Next, I populated this value in the execute function of the todo aggregate, when the DeleteTodo command is received. Since execute is called when a command is dispatched, this will be set to the datetime when the delete endpoint is called.
I also updated the apply function to use the datetime from the event instead of the current datetime. This value will come from the event log.
defmodule TodoBackend.Todos.Aggregates.Todo do
  ...

  def execute(%Todo{uuid: uuid}, %DeleteTodo{uuid: uuid}) do
-   %TodoDeleted{uuid: uuid}
+   %TodoDeleted{uuid: uuid, datetime: DateTime.utc_now()}
  end

  ...

- def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid}) do
-   %Todo{todo | deleted_at: DateTime.utc_now()}
+ def apply(%Todo{uuid: uuid} = todo, %TodoDeleted{uuid: uuid, datetime: effective_datetime}) do
+   %Todo{todo | deleted_at: effective_datetime}
  end
Read your writes
The log stores events in JSON format. Since JSON does not have a native datetime type, the Commanded.Serialization.JsonDecoder protocol must be implemented on the event struct.
defmodule TodoBackend.Todos.Events.TodoDeleted do
  ...

  defimpl Commanded.Serialization.JsonDecoder, for: TodoDeleted do
    @doc """
    Parse the datetime included in the event
    """
    def decode(%TodoDeleted{} = state) do
      %TodoDeleted{datetime: datetime} = state

      {:ok, dt, _} = DateTime.from_iso8601(datetime)

      %TodoDeleted{state | datetime: dt}
    end
  end
end
What about existing events?
This is fine, but what about all of the events already in the log without datetimes?
Since the log is append-only and immutable, the way to extend an event definition is to provide a fallback value for the new field.
In this case, I modified the JSON decoder to substitute the current datetime for a nil value. Since the value was never written to old events, the actual deletion time is lost. The best we can do is provide some value and ensure we write a datetime on all future events.
defmodule TodoBackend.Todos.Events.TodoDeleted do
  ...

  defimpl Commanded.Serialization.JsonDecoder, for: TodoDeleted do
    @doc """
    Parse the datetime included in the event
    """
    def decode(%TodoDeleted{} = state) do
      %TodoDeleted{datetime: datetime} = state

-     {:ok, dt, _} = DateTime.from_iso8601(datetime)
-
-     %TodoDeleted{state | datetime: dt}
+     if datetime == nil do
+       %TodoDeleted{state | datetime: DateTime.utc_now()}
+     else
+       {:ok, dt, _} = DateTime.from_iso8601(datetime)
+
+       %TodoDeleted{state | datetime: dt}
+     end
    end
  end
end
Updating the read model
In a production environment it would be best to create a new database table and projector containing a deleted_at field, deploy the application to build the new table, update callers to use it, and then delete the old table. This is safe to do because all writes happen in the event layer; the read model is read-only.
In some cases, it could also be reasonable to update the projector such that the data can be migrated in place. This seemed like more effort than it was worth for a blog post, so I opted for the dangerous option: blow away the table and rebuild it with an updated projector.
Preparing the database
First, I generated a migration to add the deleted_at column to the todos table:
mix ecto.gen.migration add_deleted_at_to_todo
In the generated migration file, I added the column and an index:
defmodule TodoBackend.Repo.Migrations.AddDeletedAtToTodo do
  use Ecto.Migration

  def change do
    alter table(:todos) do
      add :deleted_at, :naive_datetime_usec
    end

    create index(:todos, [:deleted_at])
  end
end
Finally, I added the field to the projection:
defmodule TodoBackend.Todos.Projections.Todo do
  ...

  schema "todos" do
    field :completed, :boolean, default: false
    field :title, :string
    field :order, :integer, default: 0
+   field :deleted_at, :naive_datetime_usec, default: nil

    timestamps()
  end
end
After running mix ecto.migrate, the database side is ready to go.
Updating the projector
The projector update is very simple: instead of deleting the row from the table, just update the deleted_at column.
defmodule TodoBackend.Todos.Projectors.Todo do
  ...

- project(%TodoDeleted{uuid: uuid}, _, fn multi ->
-   Ecto.Multi.delete(multi, :todo, fn _ -> %Todo{uuid: uuid} end)
+ project(%TodoDeleted{uuid: uuid, datetime: effective_datetime}, _, fn multi ->
+   case Repo.get(Todo, uuid) do
+     nil ->
+       multi
+
+     todo ->
+       Ecto.Multi.update(
+         multi,
+         :todo,
+         Todo.delete_changeset(todo, %{deleted_at: effective_datetime})
+       )
+   end
+ end)

  ...
end
I added a delete_changeset function to the Todo projection to represent this specific desired change; it ensures we only update the deleted_at column. It is referenced in the new TodoDeleted projector handler above.
defmodule TodoBackend.Todos.Projections.Todo do
  ...

  def delete_changeset(todo, attrs \\ %{}) do
    todo
    |> cast(attrs, [:deleted_at])
  end
end
Forcefully re-populating the database
As I mentioned before, it’s a terrible idea to truncate a database in production. Here’s how I did it locally to explore Commanded. It’s possible this could also be done by updating the name value provided to the use Commanded.Projections.Ecto call in TodoBackend.Todos.Projectors.Todo and restarting the server.
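For reference, the use call in the projector looks roughly like this (the options here are based on the commanded_ecto_projections documentation, not copied from the project); because subscriptions are tracked by name, changing name creates a fresh subscription that replays the event log from the beginning:

defmodule TodoBackend.Todos.Projectors.Todo do
  use Commanded.Projections.Ecto,
    application: TodoBackend.App,
    repo: TodoBackend.Repo,
    # Changing this name starts a brand-new subscription, rebuilding
    # the projection from the start of the event log
    name: "Todos.Projectors.Todo"
end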
If there is no existing data in the log, this step can be skipped.
In an iex shell (iex -S mix phx.server):
# Delete the projected values
TodoBackend.Repo.delete_all(TodoBackend.Todos.Projections.Todo)
# Remove the version tracking entry for the ecto projection
TodoBackend.Repo.delete(%TodoBackend.Todos.Projectors.Todo.ProjectionVersion{projection_name: "Todos.Projectors.Todo"})
alias Commanded.Event.Handler
alias Commanded.Registration
# Trigger a reset of the projector
registry_name = Handler.name(TodoBackend.App, "Todos.Projectors.Todo")
projector = Registration.whereis_name(TodoBackend.App, registry_name)
send(projector, :reset)
Hiding deleted items
Now that deleted items are present in the todos table, it’s important to update the Todos context module to filter them out. This can be done in the context without needing any updates to the controller, since all access is encapsulated. As I noted in the previous post, this is a really valuable property of the DDD-inspired approach of modern Phoenix applications.
defmodule TodoBackend.Todos do
  ...

  def list_todos do
-   Repo.all(Todo)
+   from(t in Todo,
+     where: is_nil(t.deleted_at)
+   )
+   |> Repo.all()
  end

  ...

- def get_todo!(uuid), do: Repo.get_by!(Todo, uuid: uuid)
+ def get_todo!(uuid) do
+   from(t in Todo,
+     where: is_nil(t.deleted_at)
+   )
+   |> Repo.get_by!(uuid: uuid)
+ end
end
Where are we, now?
Soft deletes have been implemented. The read model has been reconstructed from existing data, including previously-deleted items.
In CQRS applications, data is never really deleted; at most, it gets removed from the read model.
Not legal advice on handling GDPR: some applications may choose to encrypt log entries with a per-entity encryption key in order to satisfy permanent data deletion requests. To fully delete data, the encryption key could be discarded, rendering the events unreadable if they were ever replayed. Some special handling would be necessary to prevent the application from crashing on GDPR-deleted entities. This is very clearly out of scope for this blog post, but it is a very interesting concept.
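To make the idea concrete, here is a minimal sketch of that crypto-shredding approach. The module is hypothetical and elides key storage, rotation, and integration with the event serializer:

defmodule CryptoShredding do
  @aad "todo_events"

  # Encrypt an event payload with a per-entity key (AES-256-GCM)
  def encrypt(plaintext, key) when byte_size(key) == 32 do
    iv = :crypto.strong_rand_bytes(12)

    {ciphertext, tag} =
      :crypto.crypto_one_time_aead(:aes_256_gcm, key, iv, plaintext, @aad, true)

    {iv, ciphertext, tag}
  end

  # Once the key is discarded, the payload is unrecoverable by design
  def decrypt(_encrypted, nil), do: {:error, :key_discarded}

  def decrypt({iv, ciphertext, tag}, key) do
    case :crypto.crypto_one_time_aead(:aes_256_gcm, key, iv, ciphertext, @aad, tag, false) do
      :error -> {:error, :decryption_failed}
      plaintext -> {:ok, plaintext}
    end
  end
end

The event-decoding path would then have to treat {:error, :key_discarded} as a tombstone instead of crashing, which is exactly the special handling mentioned above.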
Michiel Rook wrote two posts on this topic, which can be found here and here.
New feature: restore deleted items
Now that soft deletes are implemented, it’s time to write the new feature: un-delete!
Implementation
Let’s quickly walk through the code changes required to implement the restoration of deleted todo items:
Create a RestoreTodo command
defmodule TodoBackend.Todos.Commands.RestoreTodo do
  defstruct [
    :uuid
  ]
end
Create a TodoRestored event
defmodule TodoBackend.Todos.Events.TodoRestored do
  @derive Jason.Encoder

  defstruct [
    :uuid
  ]
end
Update the aggregate
defmodule TodoBackend.Todos.Aggregates.Todo do
  ...

+ def execute(%Todo{uuid: uuid}, %RestoreTodo{uuid: uuid}) do
+   %TodoRestored{uuid: uuid}
+ end

  ...

+ def apply(%Todo{uuid: uuid} = todo, %TodoRestored{uuid: uuid}) do
+   %Todo{todo | deleted_at: nil}
+ end

  ...
end
Route the command to the aggregate
defmodule TodoBackend.Router do
  ...

- dispatch([CreateTodo, DeleteTodo, UpdateTodo],
+ dispatch([CreateTodo, DeleteTodo, UpdateTodo, RestoreTodo],
    to: Todo,
    identity: :uuid,
    lifespan: Todo
  )
end
Update the projector
defmodule TodoBackend.Todos.Projectors.Todo do
  ...

+ project(%TodoRestored{uuid: uuid}, _, fn multi ->
+   case Repo.get(Todo, uuid) do
+     nil ->
+       multi
+
+     todo ->
+       Ecto.Multi.update(multi, :todo, Todo.delete_changeset(todo, %{deleted_at: nil}))
+   end
+ end)
end
At this point, we have implemented everything required for restoring deleted todo items, except for actually dispatching the RestoreTodo command.
Adding the API endpoint
To dispatch the RestoreTodo command, I added PUT /api/todos/:id/restore to the API.
Add a function to the context module
defmodule TodoBackend.Todos do
  ...

  def restore_todo(id) do
    command = %RestoreTodo{uuid: id}

    with :ok <- App.dispatch(command, consistency: :strong) do
      {:ok, get_todo!(id)}
    else
      reply -> reply
    end
  end
end
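Note the consistency: :strong option: dispatch blocks until strongly-consistent event handlers have processed the resulting event (assuming the projector opts into strong consistency), which is what makes it safe to immediately read the todo back with get_todo!/1.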
Add an action to the controller
defmodule TodoBackendWeb.TodoController do
  ...

  def restore(conn, %{"id" => id}) do
    with {:ok, %Todo{} = todo} <- Todos.restore_todo(id) do
      render(conn, "show.json", todo: todo)
    end
  end
end
Register the route
defmodule TodoBackendWeb.Router do
  ...

  scope "/api", TodoBackendWeb do
    pipe_through :api

    resources "/todos", TodoController
    delete "/todos", TodoController, :delete_all
+   put "/todos/:id/restore", TodoController, :restore
  end
end
Validation
For the first time in this system, let’s look at how we can use aggregate state and the execute command handler to prevent deleting already-deleted items (and restoring non-deleted items).
Command validation is a core property of CQRS designs. Once events are emitted, they are never changed. The only way to enforce validations is to accept or reject commands.
The first validation I’ll add ensures that items which are currently deleted cannot be deleted again:
defmodule TodoBackend.Todos.Aggregates.Todo do
  ...

- def execute(%Todo{uuid: uuid}, %DeleteTodo{uuid: uuid}) do
+ def execute(%Todo{uuid: uuid, deleted_at: nil}, %DeleteTodo{uuid: uuid}) do
    %TodoDeleted{uuid: uuid, datetime: DateTime.utc_now()}
  end

  ...

+ def execute(%Todo{}, %DeleteTodo{}) do
+   {:error, "Can not delete todo that is already deleted"}
+ end
end
This pattern-matched clause will only emit a TodoDeleted event when the existing aggregate state contains a deleted_at value of nil. In all other cases, dispatch falls through to the clause that returns an {:error, message} tuple.
In a production system, structured errors should be returned in order to provide API clients with useful responses. Currently, this implementation just causes a 500.
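One possible way to do that (a sketch, not the project’s actual code) is a Phoenix fallback controller that translates the aggregate’s error tuples into structured responses; the 422 status and error shape here are my choices:

defmodule TodoBackendWeb.FallbackController do
  use TodoBackendWeb, :controller

  # Match the {:error, message} tuples returned by the aggregate's
  # execute clauses and render them as structured JSON
  def call(conn, {:error, message}) when is_binary(message) do
    conn
    |> put_status(:unprocessable_entity)
    |> json(%{error: message})
  end
end

Wiring it up is then a matter of adding action_fallback TodoBackendWeb.FallbackController to the controller.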
The other validation prevents restoring non-deleted items. It reads nearly identically to the first:
defmodule TodoBackend.Todos.Aggregates.Todo do
  ...

+ def execute(%Todo{deleted_at: nil}, %RestoreTodo{}) do
+   {:error, "Can only restore deleted todos"}
+ end
+
  def execute(%Todo{uuid: uuid}, %RestoreTodo{uuid: uuid}) do
    %TodoRestored{uuid: uuid}
  end
end
Summary
In this post, I shared how the CQRS implementation of a todo list API made it simple to add soft-delete and restore functionality.
I added validations and showed how to up-convert existing events to ensure they are compatible with the evolution of a Commanded application.
In most designs, this would probably not be possible unless a table-tracking extension were already in use in the ORM. Even with change tracking enabled through extensions like paper trail or Django simple history, it can be tricky to restore deleted entities, and tracking would need to have been enabled before it was needed to ensure the data is still around to be restored.
When an immutable, append-only log is the source of truth for an application, it can be updated to meet requirements that were not known at the outset of the project.