Process managers (sometimes called Sagas) help us model long-running processes that happen in our domains. Think of such a process as a series of domain events. When enough of them have taken place (and they are the particular ones we're interested in), we execute a command. The thing is that the events we're waiting for might take a long time to arrive, during which our process manager has to keep track of what has already been processed. And that's where it gets interesting.
The Domain
Consider the following example taken from the catering domain. You're an operations manager. Your task is to suggest a menu your customer would like to order, and at the same time you have to confirm that the caterer can deliver this particular menu (for the given catering conditions). In short, you wait for CustomerConfirmedMenu and CatererConfirmedMenu. Only after both have happened can you proceed further. You'll likely offer several menus to the customer, and each of them will need a confirmation from the corresponding caterer.
If there's a match of CustomerConfirmedMenu and CatererConfirmedMenu for the same order_id, you cheer and trigger the ConfirmOrder command to push things forward. By the way, there's a chance you may never hear from the caterer, or they may decline, so the process may never complete 😉
Classical example
Given the tools from the RailsEventStore ecosystem I use on a daily basis, the implementation might look more or less like this:
    class CateringMatch
      class State < ActiveRecord::Base
        self.table_name = :catering_match_state
        # Columns:
        #   order_id
        #   caterer_confirmed
        #   customer_confirmed

        # Yields the (locked) state row for the given order inside a transaction.
        def self.get_by_order_id(order_id)
          transaction do
            yield lock.find_or_create_by(order_id: order_id)
          end
        end

        def complete?
          caterer_confirmed? && customer_confirmed?
        end
      end
      private_constant :State

      def initialize(command_bus:)
        @command_bus = command_bus
      end

      def call(event)
        order_id = event.data[:order_id]
        State.get_by_order_id(order_id) do |state|
          case event
          when CustomerConfirmedMenu
            state.update_column(:customer_confirmed, true)
          when CatererConfirmedMenu
            state.update_column(:caterer_confirmed, true)
          end

          @command_bus.(ConfirmOrder.new(data: { order_id: order_id })) if state.complete?
        end
      end
    end
This process manager is then enabled by the following RailsEventStore instance configuration:
    RailsEventStore::Client.new.tap do |client|
      client.subscribe(
        CateringMatch.new(command_bus: command_bus),
        [CustomerConfirmedMenu, CatererConfirmedMenu]
      )
    end
Whenever one of the aforementioned domain events is published by the event store, our process manager will be called with that event as an argument.
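To make the dispatch mechanics concrete, here is a toy publish/subscribe sketch in plain Ruby. It is not how RailsEventStore is implemented; ToyEventBus, the Struct events and the MenuDeclined event are hypothetical names. The only point is that a subscriber is any object responding to call, invoked with each published event of a type it subscribed to.

```ruby
# Hypothetical stand-ins for domain events.
CustomerConfirmedMenu = Struct.new(:order_id)
CatererConfirmedMenu  = Struct.new(:order_id)
MenuDeclined          = Struct.new(:order_id) # made-up event nobody subscribes to

# Toy event bus, for illustration only.
class ToyEventBus
  def initialize
    @subscribers = Hash.new { |hash, event_class| hash[event_class] = [] }
  end

  def subscribe(subscriber, event_classes)
    event_classes.each { |event_class| @subscribers[event_class] << subscriber }
  end

  def publish(event)
    # fetch with a default does not create entries for unknown event types
    @subscribers.fetch(event.class, []).each { |subscriber| subscriber.call(event) }
  end
end

received = []
bus = ToyEventBus.new
bus.subscribe(->(event) { received << event }, [CustomerConfirmedMenu, CatererConfirmedMenu])
bus.publish(CustomerConfirmedMenu.new('42'))
bus.publish(MenuDeclined.new('42')) # ignored, no subscriber for this type
received.size # => 1
```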
The implementation above uses ActiveRecord (with a dedicated table) to persist internal process state between those executions. In addition, you'd have to run a database migration to create this table. I was just about to code it, but then suddenly one of those aha moments came.
We already know how to persist events: that's what we use RailsEventStore for. We also know how to recreate state from events with event sourcing. Last but not least, the input for a process manager is events. Wouldn't it be simpler for process managers to eat their own dog food?
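Recreating state from events boils down to a fold over the event history. A minimal sketch in plain Ruby, assuming Struct-based stand-ins for the events and the hypothetical helper names evolve and rebuild_state (none of which come from RailsEventStore):

```ruby
# Hypothetical stand-ins for the domain events.
CustomerConfirmedMenu = Struct.new(:order_id)
CatererConfirmedMenu  = Struct.new(:order_id)

INITIAL_STATE = { customer_confirmed: false, caterer_confirmed: false }.freeze

# Returns a new state hash reflecting a single event; unknown events change nothing.
def evolve(state, event)
  case event
  when CustomerConfirmedMenu then state.merge(customer_confirmed: true)
  when CatererConfirmedMenu  then state.merge(caterer_confirmed: true)
  else state
  end
end

# Replays the whole history, starting from the initial state.
def rebuild_state(events)
  events.reduce(INITIAL_STATE) { |state, event| evolve(state, event) }
end

history = [CatererConfirmedMenu.new('42'), CustomerConfirmedMenu.new('42')]
rebuild_state(history).values.all? # => true
```

Because the state is derived purely from the events, no extra table is needed as long as the events themselves are stored somewhere the process manager can read them back.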
Let’s do this!
My first take on an event-sourced process manager looked something like this:
    require 'aggregate_root'

    module EventSourcing
      # Applies the event to the state and remembers it as not yet persisted.
      def apply(event)
        apply_strategy.(self, event)
        unpublished_events << event
      end

      # Rebuilds state from events already stored in the stream.
      def load(stream_name, event_store:)
        events = event_store.read_stream_events_forward(stream_name)
        events.each do |event|
          apply(event)
        end
        @unpublished_events = nil
      end

      # Persists events applied since the last load.
      def store(stream_name, event_store:)
        unpublished_events.each do |event|
          event_store.append_to_stream(event, stream_name: stream_name)
        end
        @unpublished_events = nil
      end

      private

      def unpublished_events
        @unpublished_events ||= []
      end

      def apply_strategy
        ::AggregateRoot::DefaultApplyStrategy.new
      end
    end

    class CateringMatch
      class State
        include EventSourcing

        def initialize
          @caterer_confirmed = false
          @customer_confirmed = false
        end

        def apply_caterer_confirmed_menu(_)
          @caterer_confirmed = true
        end

        def apply_customer_confirmed_menu(_)
          @customer_confirmed = true
        end

        def complete?
          @caterer_confirmed && @customer_confirmed
        end
      end
      private_constant :State

      def initialize(command_bus:, event_store:)
        @command_bus = command_bus
        @event_store = event_store
      end

      def call(event)
        order_id = event.data[:order_id]
        stream_name = "CateringMatch$#{order_id}"

        state = State.new
        state.load(stream_name, event_store: @event_store)
        state.apply(event)
        state.store(stream_name, event_store: @event_store)

        @command_bus.(ConfirmOrder.new(data: { order_id: order_id })) if state.complete?
      end
    end
When the process manager is executed, we load the already processed events from the stream (partitioned by order_id). Next we apply the event that just came in and finally append it to the stream to persist it. The trigger with its condition stays unchanged, since it is only the State implementation that we changed.
In theory that could work; I could already feel that dopamine kick after a job well done. In practice, reality brought me this:
    Failure/Error: event_store.append_to_stream(event, stream_name: stream_name)

    ActiveRecord::RecordNotUnique:
      PG::UniqueViolation: ERROR: duplicate key value violates unique constraint "index_event_store_events_on_event_id"
      DETAIL: Key (event_id)=(bddeffe8-7188-4004-918b-2ef77d94fa65) already exists.
      : INSERT INTO "event_store_events" ("event_id", "stream", "event_type", "metadata", "data", "created_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"
Doh!
I forgot about this limitation of RailsEventStore. You can't yet have the same event in multiple streams. By contrast, in GetEventStore streams are cheap and that's one of the common use cases.
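The error message points at a unique index on event_id in a single events table, where the stream is just a column. A toy model of that constraint, written in plain Ruby purely as an illustration (the ToyEventStore class and its DuplicateEventId error are hypothetical and not RailsEventStore's actual code), shows why appending an already stored event to a second stream fails:

```ruby
require 'securerandom'

# Toy model: one list of rows shared by all streams, event_id must be unique.
class ToyEventStore
  DuplicateEventId = Class.new(StandardError)

  def initialize
    @rows = [] # each row: { event_id:, stream: }
  end

  def append_to_stream(event_id, stream_name:)
    # Mimics a unique index on event_id spanning every stream.
    if @rows.any? { |row| row[:event_id] == event_id }
      raise DuplicateEventId, "Key (event_id)=(#{event_id}) already exists."
    end

    @rows << { event_id: event_id, stream: stream_name }
    self
  end

  def stream(stream_name)
    @rows.select { |row| row[:stream] == stream_name }.map { |row| row[:event_id] }
  end
end

store = ToyEventStore.new
event_id = SecureRandom.uuid
store.append_to_stream(event_id, stream_name: 'Order$42') # published by the aggregate

begin
  # The process manager tries to copy the very same event into its own stream.
  store.append_to_stream(event_id, stream_name: 'CateringMatch$42')
rescue ToyEventStore::DuplicateEventId => error
  puts error.message
end
```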
Take 2
Given the RailsEventStore limitation, I had to figure out something else. The idea was just too good to give up that soon. And that's when the second aha moment arrived!
There's this RailsEventStore::Projection mechanism, which lets you traverse multiple streams in search of particular events. When one is found, the given lambda is called. Sound familiar? Let's see it in full shape:
    class CateringMatch
      class State
        def initialize(event_store:, stream_name:)
          @event_store = event_store
          @stream_name = stream_name
        end

        # Derives the current state by projecting the events found in the stream.
        def complete?
          initial = {
            caterer_confirmed: false,
            customer_confirmed: false,
          }
          state =
            RailsEventStore::Projection
              .from_stream(@stream_name)
              .init(->{ initial })
              .when(CustomerConfirmedMenu, ->(state, event) { state[:customer_confirmed] = true })
              .when(CatererConfirmedMenu, ->(state, event) { state[:caterer_confirmed] = true })
              .run(@event_store)
          state[:customer_confirmed] && state[:caterer_confirmed]
        end
      end
      private_constant :State

      def initialize(command_bus:, event_store:)
        @command_bus = command_bus
        @event_store = event_store
      end

      def call(event)
        order_id = event.data[:order_id]
        state = State.new(event_store: @event_store, stream_name: "Order$#{order_id}")

        @command_bus.(ConfirmOrder.new(data: { order_id: order_id })) if state.complete?
      end
    end
The implementation is noticeably shorter (thanks to the hidden parts of RailsEventStore::Projection). It works not only in theory. And this is the one I chose to stick with for my process manager.
I cannot, however, say I fully like it. The smell for me is that we peek into a stream that does not exclusively belong to the process manager (it belongs to the aggregate into whose stream CustomerConfirmedMenu and CatererConfirmedMenu were published). Another drawback shows up when testing. A projection can only work with events persisted in streams, so it is not sufficient to only pass an event as input to the process manager. You have to additionally persist it.
    RSpec.describe CateringMatch do
      facts = [
        CustomerConfirmedMenu.new(data: { order_id: '42' }),
        CatererConfirmedMenu.new(data: { order_id: '42' })
      ]

      facts.permutation.each do |fact1, fact2|
        specify do
          command_bus = spy(:command_bus)
          event_store = RailsEventStore::Client.new

          CateringMatch.new(event_store: event_store, command_bus: command_bus).tap do |process_manager|
            event_store.append_to_stream(fact1, stream_name: "Order$#{fact1.data[:order_id]}")
            process_manager.(fact1)

            event_store.append_to_stream(fact2, stream_name: "Order$#{fact2.data[:order_id]}")
            process_manager.(fact2)
          end

          expect(command_bus).to have_received(:call)
        end
      end
    end
Would you choose event-backed state for a process manager as well? Let me know in the comments!