Using CQRS in a simple Phoenix API with Commanded

Despite being a fan of event sourcing and seeing the clear benefits of the approach, I had never built anything with it from scratch. This weekend, I finally decided to break the study cycle and do something practical.

Follow along with what I learned while implementing a project named todo_backend_commanded. Its git history reflects the process of migrating from a vanilla Phoenix API to an event-sourced solution.


I have been curious about the concepts of event sourcing and CQRS for a while—obsessively reading books like Practical Microservices (Garofolo) and Architecture Patterns with Python (Percival, Gregory), along with documentation for libraries like Sequent (Ruby) and Commanded (Elixir).

Whenever the topic comes up in conversation—something that happens more often than one might expect—I share a link to Kickstarter’s post on how they used event sourcing to launch and suggest it wouldn’t be too hard to follow their example and give it a try.

What did I do?

As noted in my ASP.NET Core post from 2017 (wow, I should write more often), there is a project called Todo-Backend that provides tests gradually guiding its users toward a full implementation of a backend API for a todo list app.

It’s a very familiar API that can be implemented in a few minutes with the Phoenix Framework.

In fact, the first commit of this post’s repository does just that. Most of it is the result of just two commands:

mix phx.new todo_backend_commanded --app todo_backend --no-assets --no-html --no-gettext --no-dashboard --no-live --no-mailer

mix phx.gen.json Todos Todo todos title:string completed:boolean

This is a testament to the value and productivity of Phoenix, but the resulting code is just basic CRUD. The views are tied 1:1 with their database-backed Ecto schemas. One thing to note is that Phoenix generates DDD-style contexts. This is unlike Rails, which would produce a typical ActiveRecord sprawl: bloated models accessed directly and lazily queried across the entire application.

Phoenix produces a single interface for each bounded context, making it a great framework for experimenting with a swap of the persistence layer.


The Commanded hex package is a fabulous CQRS library used by some real companies in production, but it doesn’t have a great on-ramp.

There’s an example application called Conduit, which is the source code for an eBook’s project, but its guidance is not up to date and the book itself starts with account / user management (a pretty advanced domain). The other resource outside of the package documentation is a 20-minute conference talk from 2018.


To get started with Commanded, I installed the hex package and created two modules within the TodoBackend application, App and EventStore:

defmodule TodoBackend.App do
  use Commanded.Application, otp_app: :todo_backend
end

defmodule TodoBackend.EventStore do
  use EventStore, otp_app: :todo_backend
end
Both of these modules rely on macros exposed by Commanded and EventStore. The Commanded documentation uses the name Application, but this didn’t make much sense to me, since TodoBackend already had an Application module for the supervision tree. Initially, I even thought that I was supposed to add the Commanded.Application macro to the supervisor. Once that didn’t work, I renamed it to App and decided that would be the module to house my dispatching and routing.

Since I already had a Postgres database running, I decided to use EventStore rather than installing and babysitting EventStoreDB. To create the database and initialize its tables, I ran mix event_store.create followed by mix event_store.init.
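Before those mix tasks can run, Commanded and EventStore need to be pointed at each other in config/config.exs. This is a sketch of the wiring I'd expect based on the Commanded and EventStore documentation—the database credentials are placeholders:

```elixir
# config/config.exs

# Tell the Commanded application which event store adapter to use
config :todo_backend, TodoBackend.App,
  event_store: [
    adapter: Commanded.EventStore.Adapters.EventStore,
    event_store: TodoBackend.EventStore
  ]

# Let the mix event_store.* tasks find the store module
config :todo_backend, event_stores: [TodoBackend.EventStore]

# Database connection for the event store itself (placeholder credentials)
config :todo_backend, TodoBackend.EventStore,
  serializer: Commanded.Serialization.JsonSerializer,
  username: "postgres",
  password: "postgres",
  database: "todo_backend_eventstore_dev",
  hostname: "localhost"
```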

First command: create todo

(Link to relevant commit)

CQRS libraries such as Commanded have a concept of aggregates, which are the core objects of a domain. With the simple data model of this API, I created a single aggregate to represent a todo. Its initial definition looks like this:

defmodule TodoBackend.Todos.Aggregates.Todo do
  defstruct [
    :uuid,
    :title,
    :completed,
    :order
  ]
end
An aggregate starts out as a plain struct. Once we have an aggregate, we can layer on commands and events.

A command represents a caller’s intent to have the system respond to some proposed action. It can be accepted or rejected. When accepted, a command produces one or more events. These represent the fact that something did happen. They tend to be named in the past tense.

The first action I created in this system is CreateTodo, which is defined as a struct with the same fields as the Todo aggregate. I also created a TodoCreated event struct—also containing the same fields.
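A sketch of what those two structs might look like, assuming they mirror the aggregate's four fields:

```elixir
defmodule TodoBackend.Todos.Commands.CreateTodo do
  # The caller's intent: "please create this todo"
  defstruct [:uuid, :title, :completed, :order]
end

defmodule TodoBackend.Todos.Events.TodoCreated do
  # The recorded fact: "this todo was created"
  @derive Jason.Encoder
  defstruct [:uuid, :title, :completed, :order]
end
```

Events are serialized into the log, which is why the event derives Jason.Encoder while the command does not.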

To decide how to process a command, the execute/2 function is called on the aggregate module, where the first argument is the current aggregate state (if any exists) and the second argument is the command. If the return value is not an {:error, reason} tuple, it is interpreted as one or more events to be appended to the log.

In action, the Todo aggregate implementation includes the following two functions:

def execute(%Todo{uuid: nil}, %CreateTodo{} = create) do
  %TodoCreated{
    uuid: create.uuid,
    title: create.title,
    completed: create.completed,
    order: create.order
  }
end

def apply(%Todo{} = todo, %TodoCreated{} = created) do
  %Todo{
    todo
    | uuid: created.uuid,
      title: created.title,
      completed: created.completed,
      order: created.order
  }
end

At this point, Commanded seems to be a complicated system where the same struct has to be defined many times. The initial boilerplate is quite verbose, but the benefits of centering the application around an append-only log will come soon.

Once a command exists, a router is needed to determine which aggregate a command belongs to. This basic router gets the CreateTodo command wired to the Todo aggregate, using the uuid property of the command to determine which Todo instance is being referred to:

defmodule TodoBackend.Router do
  use Commanded.Commands.Router

  alias TodoBackend.Todos.Aggregates.Todo
  alias TodoBackend.Todos.Commands.CreateTodo

  dispatch([CreateTodo], to: Todo, identity: :uuid)
end

To test out the first aggregate and command, the following can be executed in an iex session (start one with iex -S mix):

alias TodoBackend.App
alias TodoBackend.Todos.Aggregates.Todo
alias TodoBackend.Todos.Commands.CreateTodo

# Generate an ID for the todo item
uuid = Ecto.UUID.generate()

# Create a command instance
command = %CreateTodo{uuid: uuid, title: "Hello, world!", completed: false, order: 66}

# Run the command, which is dispatched to the aggregate via the router
:ok = App.dispatch(command)

# Query for the aggregate state
App.aggregate_state(Todo, uuid)

# Result
%TodoBackend.Todos.Aggregates.Todo{
  uuid: "51004ff5-5a73-4681-87bb-1b1ffbf03fe0",
  title: "Hello, world!",
  completed: false,
  order: 66
}

Second command: delete todo

(Link to relevant commit)

Since event-sourced applications rely on an append-only log, events can never be deleted directly. This becomes a problem when the requirement to delete todo items comes around.

Luckily, there is a solution to this problem: a TodoDeleted event. This can act as a tombstone record describing that the todo stopped existing at a certain point.

With the aggregate boilerplate out of the way, this is quite easy to implement.

  1. Create the command:

defmodule TodoBackend.Todos.Commands.DeleteTodo do
  defstruct [:uuid]
end

  2. Define the event:

defmodule TodoBackend.Todos.Events.TodoDeleted do
  @derive Jason.Encoder
  defstruct [:uuid]
end

  3. Add the execute and apply functions to the aggregate. In this case, we turn the aggregate state into nil when a TodoDeleted event is applied:

def execute(%Todo{uuid: uuid}, %DeleteTodo{uuid: uuid}) do
  %TodoDeleted{uuid: uuid}
end

def apply(%Todo{uuid: uuid}, %TodoDeleted{uuid: uuid}) do
  nil
end

  4. Update the router:

dispatch([DeleteTodo], to: Todo, identity: :uuid)

Now, the DeleteTodo command can be handled by the App!

Many events per command: update todo

(Link to relevant commit)

Up to this point, there has been a one-to-one correlation between commands and events. No decisions have been made in execute/2 functions.

One important thing about events is that they can (and should) represent something meaningful happening in the domain of the application. For example, it would be tempting to create a TodoUpdated event containing title, completed, and order values.

Imagine this todo list app becomes the product of a company, and that company has a team that wants to collect metrics on how often items are completed. An analytics pipeline might need to consume all of the TodoUpdated events to determine if any of them changed the completed value. This would require knowledge of the state prior to the event. “Updated” lacks domain context and doesn’t have as much utility as it could have.

Instead, we should choose to break down the update into many possible events:

  • Mark an item as completed
  • Update the title of an item
  • Mark an item as un-completed
  • Update the order of an item

Each of these events represents something meaningful happening, in the language of the domain.

To implement this—while keeping the API semantics as an “update”—I created an UpdateTodo command that produces many events:

def execute(%Todo{} = todo, %UpdateTodo{} = update) do
  completion_event =
    if todo.completed != update.completed and not is_nil(update.completed) do
      if update.completed do
        %TodoCompleted{uuid: todo.uuid}
      else
        %TodoUncompleted{uuid: todo.uuid}
      end
    end

  title_event =
    if todo.title != update.title and not is_nil(update.title),
      do: %TodoTitleUpdated{uuid: todo.uuid, title: update.title}

  order_event =
    if todo.order != update.order and not is_nil(update.order),
      do: %TodoOrderUpdated{uuid: todo.uuid, order: update.order}

  [completion_event, title_event, order_event] |> Enum.filter(&Function.identity/1)
end

The execute/2 function is invoked once per command, and commands for a given aggregate instance are processed one at a time. There are no data races, so we can put our business logic in this function and decide how to translate the command into events. If the todo’s completed value changes, we emit a meaningful event. If more than one field changes, we emit more than one event.

Quick Note

Filtering on the Function.identity/1 function is a neat little trick to remove falsy (nil and false) entries from an enumerable.
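For example:

```elixir
# Enum.filter/2 keeps elements for which the function returns a truthy value;
# Function.identity/1 simply returns its argument, so nil and false fall out.
[1, nil, 2, false, 3] |> Enum.filter(&Function.identity/1)
# → [1, 2, 3]
```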

And we’re back

To update the state of the aggregate, I implemented simple apply/2 methods:

def apply(%Todo{} = todo, %TodoCompleted{}) do
  %Todo{todo | completed: true}
end

def apply(%Todo{} = todo, %TodoUncompleted{}) do
  %Todo{todo | completed: false}
end

def apply(%Todo{} = todo, %TodoTitleUpdated{title: title}) do
  %Todo{todo | title: title}
end

def apply(%Todo{} = todo, %TodoOrderUpdated{order: order}) do
  %Todo{todo | order: order}
end

Now, we have the ability to articulate things that happened in the language of the application’s domain!

Projecting state into DB

Related commits:

So far, three commands can produce six events. There is an aggregate that tracks the events, and its state can be fetched by calling TodoBackend.App.aggregate_state/2 directly. This is great as a domain exploration exercise, but it isn’t queryable.

Enter: the read model—a representation of state, produced as a function of the event log. Unlike the original model, the read model is only used for read operations. Aside from this one restriction, we can use all of the query functionality Ecto offers—selecting, aggregating, and even joining.
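For instance, once the read model exists, an ordinary Ecto query can answer questions the aggregate alone couldn't. A sketch, assuming the phx.gen.json schema module TodoBackend.Todos.Todo is backed by the projected table:

```elixir
import Ecto.Query

# All completed todos, ordered for display — a plain query against the
# projected table, with no knowledge of the event log.
from(t in TodoBackend.Todos.Todo,
  where: t.completed == true,
  order_by: t.order
)
|> TodoBackend.Repo.all()
```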

The process of creating a read model from an event log is called “projecting.” To simplify the creation and maintenance of projections, I opted to use the commanded_ecto_projections hex package. This library uses a little bit of metaprogramming magic to add the project macro to a module, causing it to add operations to an Ecto Multi—which eventually is executed against the application’s database.

One housekeeping item that I had to take care of is replacing the numeric primary key of the table with a uuid, since we won’t be relying on the sequential generated identifiers within our read model. This migration is awful as-written since it changes the schema by dropping all of the data in the existing table, but it got the job done for this learning exercise.

The projector for the Todo model looks like this (with some parts omitted for brevity):

defmodule TodoBackend.Todos.Projectors.Todo do
  use Commanded.Projections.Ecto,
    # Register a name for the handler's subscription in the event store
    name: "Todos.Projectors.Todo",
    application: TodoBackend.App,
    repo: TodoBackend.Repo,
    # Ensure the database operation completes before the command is considered completed
    consistency: :strong

  project(%TodoCreated{} = created, _, fn multi ->
    Ecto.Multi.insert(multi, :todo, %Todo{
      uuid: created.uuid,
      title: created.title,
      completed: created.completed,
      order: created.order
    })
  end)

  project(%TodoDeleted{uuid: uuid}, _, fn multi ->
    Ecto.Multi.delete(multi, :todo, fn _ -> %Todo{uuid: uuid} end)
  end)

  project(%TodoCompleted{uuid: uuid}, _, fn multi ->
    case Repo.get(Todo, uuid) do
      nil -> multi
      todo -> Ecto.Multi.update(multi, :todo, Todo.update_changeset(todo, %{completed: true}))
    end
  end)

  # …more project calls below
end

Once the projector is registered with the application’s supervision tree, the database should start populating with todo items from the log events.
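Registering the projector is a matter of adding it to the children list in lib/todo_backend/application.ex. A sketch, assuming the default phx.new layout:

```elixir
def start(_type, _args) do
  children = [
    TodoBackend.Repo,
    # The Commanded application must start before any projectors that use it
    TodoBackend.App,
    TodoBackend.Todos.Projectors.Todo,
    TodoBackendWeb.Endpoint
  ]

  opts = [strategy: :one_for_one, name: TodoBackend.Supervisor]
  Supervisor.start_link(children, opts)
end
```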

This is one of the most important properties of an event-sourced system: a read model can easily be reconstructed from the event log. Rather than performing complicated migrations and backfills, a team can produce a new database table and replay events to produce the necessary representation.

Wire into the API

(Link to relevant commit)

Finally, it’s time to connect the commands and read model up to the API!

As noted earlier, Phoenix produces a module for each context of an application. This context serves as an API for any application code—API, background job, or script—to interact with the domain without needing to know the persistence implementation details.

To bring it all together, the commit for this section shows all of the changes that were required to migrate the context over to the CQRS solution.

I’ll only show one function here, but they’re all present in the code.


Before:

def create_todo(attrs \\ %{}) do
  %Todo{}
  |> Todo.changeset(attrs)
  |> Repo.insert()
end


After:

def create_todo(attrs \\ %{}) do
  uuid = Ecto.UUID.generate()

  command =
    %CreateTodo{
      title: attrs["title"],
      completed: attrs["completed"],
      order: attrs["order"]
    }
    |> CreateTodo.assign_uuid(uuid)

  with :ok <- App.dispatch(command, consistency: :strong) do
    {:ok, get_todo!(uuid)}
  else
    reply -> reply
  end
end
One key aspect of this invocation is the consistency: :strong, which ensures that all handlers and projectors with strong consistency enabled have committed before returning. This means the read model is ready to query after dispatch has completed.

By changing the context implementation, the controller didn’t have to be updated (aside from the omission of a deleted todo in the return value of delete_todo, which I thought was a pretty bad idea to begin with) in order to start using a CQRS solution.

Every change is tracked through a log of meaningful domain events. The read model can be migrated and updated as our requirements grow. New read models can be created from the same events, in case we start to have different views or different consumers. Arbitrary handlers can be written to extract the changes into some other system. Data is never truly deleted, and can be resurrected for new use cases in the future (possibly the topic of a sequel to this post).

Where to go from here

With this experience kicking the tires of Commanded, I’m inspired to try building something real and useful with CQRS.

As for this project, I have a few ideas for improvements:

  • Soft delete read model: Create a projection that populates a deleted_at column, to show how events can be re-used to enable new use cases like restoring deleted items
  • Validation: Determine how to apply validation to the commands, enforcing schemas and mandatory fields
  • Error handling: Test out the rejection of commands, returning the error messages to the API’s caller
  • A meaningful domain: Take on a real problem, not just “another todo app”
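The soft-delete idea could reuse the existing TodoDeleted event with a project call in place of the hard delete—a sketch, assuming a deleted_at column has been added to the read model:

```elixir
project(%TodoDeleted{uuid: uuid}, _, fn multi ->
  case Repo.get(Todo, uuid) do
    nil -> multi
    todo ->
      # Stamp the row instead of removing it, so "deleted" items can be restored
      Ecto.Multi.update(multi, :todo, Todo.update_changeset(todo, %{deleted_at: DateTime.utc_now()}))
  end
end)
```

Because the events are replayed into whichever projections exist, this change requires no backfill: drop the old table, add the column, and let the projector rebuild it.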