Dogfooding Process Manager

Process managers (sometimes called Sagas) help us model long-running processes that happen in our domains. Think of such a process as a series of domain events. When enough of them have taken place (and they are the particular ones we’re interested in), we execute a command. The catch is that the events we’re waiting for might take a long time to arrive, and in the meantime our process manager has to keep track of what has already been processed. And that’s where it gets interesting.

The Domain

Consider the following example taken from the catering domain. You’re an operations manager. Your task is to suggest a menu your customer would like to order, and at the same time you have to confirm that the caterer can deliver this particular menu (under the given catering conditions). In short, you wait for CustomerConfirmedMenu and CatererConfirmedMenu. Only after both have happened can you proceed. You’ll likely offer several menus to the customer, and each of them will need a confirmation from the corresponding caterer.

If there’s a match of CustomerConfirmedMenu and CatererConfirmedMenu for the same order_id, you cheer and trigger the ConfirmOrder command to push things forward. By the way, you may never hear from the caterer, or they may decline, so the process may never complete 😉
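Before bringing any library into the picture, the matching rule itself can be sketched in plain Ruby. This is only an illustration: the Struct-based events, the InMemoryCateringMatch class and the lambda used as a command bus are assumptions made for this sketch, and the state lives in memory, so it would not survive a restart.

```ruby
# Illustrative stand-ins for the domain events and the command
# (plain Structs, not RailsEventStore event classes).
CustomerConfirmedMenu = Struct.new(:order_id)
CatererConfirmedMenu  = Struct.new(:order_id)
ConfirmOrder          = Struct.new(:order_id)

# Hypothetical in-memory process manager: remembers which confirmations
# arrived per order and issues ConfirmOrder once both are present.
class InMemoryCateringMatch
  def initialize(command_bus)
    @command_bus = command_bus
    @seen = Hash.new { |hash, order_id| hash[order_id] = [] }
  end

  def call(event)
    seen = @seen[event.order_id]
    seen << event.class unless seen.include?(event.class)
    return unless seen.include?(CustomerConfirmedMenu) && seen.include?(CatererConfirmedMenu)

    @command_bus.call(ConfirmOrder.new(event.order_id))
  end
end

commands = []
manager  = InMemoryCateringMatch.new(->(command) { commands << command })

manager.call(CustomerConfirmedMenu.new("42"))
manager.call(CatererConfirmedMenu.new("7"))  # different order, no match yet
manager.call(CatererConfirmedMenu.new("42")) # completes order 42
```

After these three calls, commands holds a single ConfirmOrder for order "42". Order "7" is still waiting for its customer confirmation.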

Classical example

Given the tools from the RailsEventStore ecosystem that I use on a daily basis, the implementation might look more or less like this:

class CateringMatch
  class State < ActiveRecord::Base
    self.table_name = :catering_match_state
    # order_id
    # caterer_confirmed
    # customer_confirmed

    # Yields the state row for the given order inside a transaction,
    # using a locking read.
    def self.get_by_order_id(order_id)
      transaction do
        yield lock.find_or_create_by(order_id: order_id)
      end
    end

    def complete?
      caterer_confirmed? && customer_confirmed?
    end
  end
  private_constant :State

  def initialize(command_bus:)
    @command_bus = command_bus
  end

  def call(event)
    order_id = event.data[:order_id]
    State.get_by_order_id(order_id) do |state|
      case event
      when CustomerConfirmedMenu
        state.update_column(:customer_confirmed, true)
      when CatererConfirmedMenu
        state.update_column(:caterer_confirmed, true)
      end

      # Trigger the command once both confirmations are recorded.
      @command_bus.(ConfirmOrder.new(data: {
        order_id: order_id
      })) if state.complete?
    end
  end
end

This process manager is then enabled by the following RailsEventStore instance configuration:

RailsEventStore::Client.new.tap do |client|
  client.subscribe(CateringMatch.new(command_bus: command_bus),
    [CustomerConfirmedMenu, CatererConfirmedMenu])
end

Whenever one of the aforementioned domain events is published by the event store, our process manager will be called with that event as an argument.
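The dispatch described above can be pictured with a toy publish/subscribe bus. This is a rough sketch of the idea, not how RailsEventStore implements subscriptions: ToyEventBus and the OrderPlaced event are hypothetical names introduced only for this example, and the events are plain Structs.

```ruby
# Hypothetical stand-in for the subscription mechanism; not RailsEventStore code.
class ToyEventBus
  def initialize
    @subscribers = {}
  end

  # Registers a callable for each of the given event classes.
  def subscribe(subscriber, event_classes)
    event_classes.each do |event_class|
      (@subscribers[event_class] ||= []) << subscriber
    end
  end

  # Calls every subscriber registered for the event's class.
  def publish(event)
    @subscribers.fetch(event.class, []).each { |subscriber| subscriber.call(event) }
  end
end

CustomerConfirmedMenu = Struct.new(:order_id)
CatererConfirmedMenu  = Struct.new(:order_id)
OrderPlaced           = Struct.new(:order_id) # hypothetical event nobody subscribed to

received = []
bus = ToyEventBus.new
bus.subscribe(->(event) { received << event.class },
  [CustomerConfirmedMenu, CatererConfirmedMenu])

bus.publish(OrderPlaced.new("42"))           # ignored, no subscriber
bus.publish(CustomerConfirmedMenu.new("42")) # delivered
bus.publish(CatererConfirmedMenu.new("42"))  # delivered
```

Only the two event types listed in subscribe reach the handler. Anything else published on the bus passes by unnoticed.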

The implementation above uses ActiveRecord (with a dedicated table) to persist the internal process state between those executions. In addition, you have to write and run a database migration to create this table. I was just about to code it, but then suddenly one of those aha moments came.

We already know how to persist events: that’s what we use RailsEventStore for. We also know how to recreate state from events with event sourcing. Last but not least, the input for a process manager is events. Wouldn’t it be simpler for process managers to eat their own dog food?
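Recreating state from events boils down to a fold over the event history. Here is a minimal sketch of that idea in plain Ruby, with no library involved. The Event struct, its kind field and rebuild_state are names assumed for this illustration only.

```ruby
# Illustrative event type; kind and order_id are assumed names.
Event = Struct.new(:kind, :order_id)

INITIAL_STATE = { customer_confirmed: false, caterer_confirmed: false }.freeze

# Rebuilds the state by replaying every recorded event, oldest first.
def rebuild_state(events)
  events.reduce(INITIAL_STATE) do |state, event|
    case event.kind
    when :customer_confirmed_menu then state.merge(customer_confirmed: true)
    when :caterer_confirmed_menu  then state.merge(caterer_confirmed: true)
    else state
    end
  end
end

history = [
  Event.new(:customer_confirmed_menu, "42"),
  Event.new(:caterer_confirmed_menu, "42"),
]

partial  = rebuild_state(history.first(1)) # only the customer has confirmed
complete = rebuild_state(history)          # both confirmations replayed
```

No state is stored anywhere. It is derived on demand from the events, which is exactly what makes a dedicated state table unnecessary.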

Let’s do this!

My first take on an event-sourced process manager looked something like this:

require 'aggregate_root'

module EventSourcing
  # Applies the event to the state and remembers it as not yet stored.
  def apply(event)
    apply_strategy.(self, event)
    unpublished_events << event
  end

  # Replays the events already stored in the stream; none of them is new.
  def load(stream_name, event_store:)
    events = event_store.read_stream_events_forward(stream_name)
    events.each do |event|
      apply(event)
    end
    @unpublished_events = nil
  end

  # Appends the events applied since the last load to the stream.
  def store(stream_name, event_store:)
    unpublished_events.each do |event|
      event_store.append_to_stream(event, stream_name: stream_name)
    end
    @unpublished_events = nil
  end

  private

  def unpublished_events
    @unpublished_events ||= []
  end

  def apply_strategy
    ::AggregateRoot::DefaultApplyStrategy.new
  end
end

class CateringMatch
  class State
    include EventSourcing

    def initialize
      @caterer_confirmed  = false
      @customer_confirmed = false
    end

    def apply_caterer_confirmed_menu(_)
      @caterer_confirmed = true
    end

    def apply_customer_confirmed_menu(_)
      @customer_confirmed = true
    end

    def complete?
      @caterer_confirmed && @customer_confirmed
    end
  end
  private_constant :State

  def initialize(command_bus:, event_store:)
    @command_bus = command_bus
    @event_store = event_store
  end

  def call(event)
    order_id = event.data[:order_id]
    stream_name = "CateringMatch$#{order_id}"

    state = State.new
    state.load(stream_name, event_store: @event_store)
    state.apply(event)
    state.store(stream_name, event_store: @event_store)

    @command_bus.(ConfirmOrder.new(data: {
      order_id: order_id
    })) if state.complete?
  end
end

When the process manager is executed, we load the already processed events from its stream (partitioned by order_id). Next we apply the event that just came in and finally append it to the stream to persist it. The trigger and its condition stay unchanged, since only the State implementation is different.

In theory that could work, and I could already feel the dopamine kick after a job well done. In practice, reality brought me this:

Failure/Error: event_store.append_to_stream(event, stream_name: stream_name)

ActiveRecord::RecordNotUnique:
  PG::UniqueViolation: ERROR:  duplicate key value violates unique constraint "index_event_store_events_on_event_id"
  DETAIL:  Key (event_id)=(bddeffe8-7188-4004-918b-2ef77d94fa65) already exists.
  : INSERT INTO "event_store_events" ("event_id", "stream", "event_type", "metadata", "data", "created_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"

Doh!

I forgot about this limitation of RailsEventStore: you can’t yet have the same event in multiple streams. By contrast, in GetEventStore streams are cheap, and that’s one of the common use cases.
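The failure is easy to reproduce with a toy model. The sketch below assumes nothing more than what the error message shows: a single table of events with a unique index on event_id, where the stream is just another column. ToyEventTable is a hypothetical class written for this illustration, not RailsEventStore code.

```ruby
require "securerandom"

# Toy model only: one shared table of rows with a unique index on event_id,
# mirroring the constraint named in the error above.
class ToyEventTable
  DuplicateEventId = Class.new(StandardError)

  def initialize
    @rows = {} # event_id => stream name
  end

  def append(event_id, stream_name)
    raise DuplicateEventId, "event_id #{event_id} already exists" if @rows.key?(event_id)

    @rows[event_id] = stream_name
  end

  # Returns the ids of the events stored in the given stream.
  def stream(stream_name)
    @rows.select { |_, stream| stream == stream_name }.keys
  end
end

table    = ToyEventTable.new
event_id = SecureRandom.uuid

table.append(event_id, "Order$42")           # published by the aggregate
begin
  table.append(event_id, "CateringMatch$42") # the process manager's own copy
  outcome = :stored
rescue ToyEventTable::DuplicateEventId
  outcome = :rejected
end
```

The second append is rejected because the event already lives in the aggregate's stream, so the process manager's stream stays empty.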

Take 2

Given the RailsEventStore limitation, I had to figure out something else. The idea was just too good to give up that soon. And that’s when the second aha moment arrived!

There’s this RailsEventStore::Projection mechanism, which lets you traverse multiple streams in search of particular events. When one is found, the given lambda is called. Sounds familiar? Let’s see it in full shape:
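To make the mechanism less magical, here is a stripped-down sketch of the idea behind such a projection. It is not the RailsEventStore implementation: ToyProjection, the Struct events and the MenuSuggested event are hypothetical, and the events come from an in-memory array instead of persisted streams.

```ruby
# Hypothetical, simplified stand-in for the idea behind a projection:
# start from an initial state and let per-event-type handlers update it.
class ToyProjection
  def initialize(events)
    @events   = events
    @init     = -> { {} }
    @handlers = {}
  end

  def init(callable)
    @init = callable
    self
  end

  def when(event_class, handler)
    @handlers[event_class] = handler
    self
  end

  # Walks the events in order; events without a handler are skipped.
  def run
    @events.each_with_object(@init.call) do |event, acc|
      handler = @handlers[event.class]
      handler.call(acc, event) if handler
    end
  end
end

MenuSuggested         = Struct.new(:order_id) # hypothetical, no handler registered
CustomerConfirmedMenu = Struct.new(:order_id)
CatererConfirmedMenu  = Struct.new(:order_id)

events = [MenuSuggested.new("42"), CustomerConfirmedMenu.new("42")]

result = ToyProjection.new(events)
  .init(-> { { customer_confirmed: false, caterer_confirmed: false } })
  .when(CustomerConfirmedMenu, ->(acc, _event) { acc[:customer_confirmed] = true })
  .when(CatererConfirmedMenu,  ->(acc, _event) { acc[:caterer_confirmed] = true })
  .run
```

With only the customer's confirmation in the list, result has customer_confirmed set and caterer_confirmed still false, so the process would not be complete yet.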

class CateringMatch
  class State
    def initialize(event_store:, stream_name:)
      @event_store = event_store
      @stream_name = stream_name
    end

    # Rebuilds the state from the stream on every call.
    def complete?
      initial =
        { caterer_confirmed: false,
          customer_confirmed: false,
        }
      state =
        RailsEventStore::Projection
          .from_stream(@stream_name)
          .init(->{ initial })
          .when(CustomerConfirmedMenu, ->(state, event) {
              state[:customer_confirmed] = true
            })
          .when(CatererConfirmedMenu, ->(state, event) {
              state[:caterer_confirmed] = true
            })
          .run(@event_store)
      state[:customer_confirmed] && state[:caterer_confirmed]
    end
  end
  private_constant :State

  def initialize(command_bus:, event_store:)
    @command_bus = command_bus
    @event_store = event_store
  end

  def call(event)
    order_id = event.data[:order_id]
    state    = State.new(event_store: @event_store, stream_name: "Order$#{order_id}")

    @command_bus.(ConfirmOrder.new(data: {
      order_id: order_id
    })) if state.complete?
  end
end

The implementation is noticeably shorter (thanks to the parts hidden inside RailsEventStore::Projection). It works not only in theory. And this is the one I chose to stick with for my process manager.

I cannot, however, say I fully like it. The smell for me is that we peek into a stream that does not belong exclusively to the process manager (it belongs to the aggregate into whose stream CustomerConfirmedMenu and CatererConfirmedMenu were published). Another drawback shows up when testing. A projection can only work with events persisted in streams, so it is not sufficient to just pass an event as input to the process manager. You have to persist it as well.

RSpec.describe CateringMatch do
  facts = [
    CustomerConfirmedMenu.new(data: { order_id: '42' }),
    CatererConfirmedMenu.new(data: { order_id: '42' })
  ]
  facts.permutation.each do |fact1, fact2|
    specify do
      command_bus = spy(:command_bus)
      event_store = RailsEventStore::Client.new

      CateringMatch.new(event_store: event_store, command_bus: command_bus).tap do |process_manager|
        event_store.append_to_stream(fact1, stream_name: "Order$#{fact1.data[:order_id]}")
        process_manager.(fact1)

        event_store.append_to_stream(fact2, stream_name: "Order$#{fact2.data[:order_id]}")
        process_manager.(fact2)
      end

      expect(command_bus).to have_received(:call)
    end
  end
end

Would you choose event-backed state for a process manager as well? Let me know in the comments!
