In this post I want to talk a little about concurrency in the context of CQRS and event sourcing. Specifically, when a single aggregate is concurrently accessed by two commands (or command handlers, really).
Concurrent commands
Let’s assume there are two commands, DepositMoney and WithdrawMoney. Both commands are handled by the BankAccount aggregate.
Let’s also assume that those two commands are dispatched at exactly the same time. Without locking, the result of that operation is impossible to determine. Most likely, one of the command handlers will fail to apply an event.
Depending on the framework, language and threading model used, one of the commands will “win”: its handler applies an event with sequence number x, which is then persisted. The other command handler will also try to apply an event with the same sequence number, which will fail to persist. The result is a nasty exception.
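To make that concrete, here is a minimal sketch of how the duplicate sequence number surfaces. The EventStore interface and exception are illustrative, not from any particular framework:

```java
// Illustrative event store: the (aggregateId, sequenceNumber) pair is unique,
// so the second append with the same number is rejected by the store.
interface EventStore {
    void append(String aggregateId, long sequenceNumber, Object event)
            throws DuplicateSequenceNumberException;
}

class DuplicateSequenceNumberException extends Exception {}

// Both handlers loaded BankAccount at sequence number 1, so both try to
// append event number 2:
//   handlerA: store.append("account-1", 2, moneyDeposited);  // succeeds
//   handlerB: store.append("account-1", 2, moneyWithdrawn);  // throws
```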
Ideally, we’d like to detect such situations a little earlier in the process, or prevent them altogether.
Pessimistic locking
Pessimistic locking will prevent any concurrent access to the aggregate. An aggregate can only be accessed by one thread, call or process at a time.
This locking strategy is the safest, but also the slowest. Each thread that wants to access an aggregate has to wait for the lock to be released by the previous thread.
Be aware of timeouts and deadlocks!
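A minimal sketch of per-aggregate pessimistic locking in Java (the names are illustrative); the tryLock timeout is one way to avoid waiting forever on a lock that is never released:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// One lock per aggregate id; commands for the same aggregate are serialized.
class PessimisticLockingRepository {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    void withLock(String aggregateId, Runnable commandHandler) throws InterruptedException {
        ReentrantLock lock = locks.computeIfAbsent(aggregateId, id -> new ReentrantLock());
        // A timeout guards against the deadlocks mentioned above.
        if (!lock.tryLock(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Could not acquire lock for " + aggregateId);
        }
        try {
            commandHandler.run();
        } finally {
            lock.unlock();
        }
    }
}
```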
Optimistic locking
The optimistic locking strategy tries to detect concurrent access to an aggregate, and throws an exception when it does.
This is achieved by storing the version of the aggregate (describing the state of the aggregate when it was last loaded) with the command that’s being dispatched. In the example above, DepositMoney wins. When that command was dispatched, the aggregate was at version 1. After handling the command and applying the MoneyDeposited event, the aggregate is at version 2. Then WithdrawMoney is handled, but the version associated with the command does not match the current version of the aggregate, leading to a ConcurrencyException.
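In code, the check could look like this minimal sketch (the version bookkeeping is illustrative; frameworks usually do this in the repository or event store):

```java
// Sketch of an optimistic version check; expectedVersion travels with the command.
class VersionedAggregate {
    private long version;

    void handle(long expectedVersion, Runnable applyEvent) {
        if (expectedVersion != version) {
            // e.g. WithdrawMoney arrives with expectedVersion 1 while the
            // aggregate is already at version 2
            throw new ConcurrencyException(
                    "expected version " + expectedVersion + ", but was " + version);
        }
        applyEvent.run();
        version++; // each applied event bumps the version
    }
}

class ConcurrencyException extends RuntimeException {
    ConcurrencyException(String message) { super(message); }
}
```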
Multiple machines
Locking is slightly more difficult when dealing with clustered or replicated services. Typically, such services are behind a load balancer, and each replica handles a fair share of the requests to the service.
Locks are normally maintained per (virtual) machine or thread pool. This means that, in a clustered setup, concurrent access to aggregates is still possible.
There are a few ways to deal with that:
- A distributed lock manager, implemented using something like ZooKeeper, etcd or Redis. Potentially complicated & expensive.
- Make sure that all commands for a specific aggregate are handled within the same (virtual) machine or thread pool. This can be achieved with a consistent hashing algorithm (based on the aggregate id, for example) that routes commands to the correct replica (see the sketch below). Axon Framework has native support for a distributed command bus (using JGroups or, more recently, Spring Cloud).
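As a sketch of that second option, here is a minimal consistent-hash ring in Java (not Axon’s actual implementation): each replica is placed on the ring at several points, and a command is routed to the first replica clockwise from the hash of its aggregate id, so adding or removing a replica only moves a fraction of the aggregates:

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal consistent-hash ring: virtual nodes smooth out the distribution.
class ConsistentHashRouter {
    private final SortedMap<Integer, String> ring = new TreeMap<>();

    void addReplica(String node, int virtualNodes) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put((node + "#" + i).hashCode(), node);
        }
    }

    // All commands for the same aggregate id land on the same replica.
    String replicaFor(String aggregateId) {
        int hash = aggregateId.hashCode();
        SortedMap<Integer, String> tail = ring.tailMap(hash);
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }
}
```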
Conflict resolution
Concurrent access to or modification of aggregates doesn’t always pose a problem. Consider the aggregate Company, with the commands ChangeName and ChangeAddress and corresponding events NameChanged and AddressChanged. ChangeName and ChangeAddress deal with different (non-overlapping) parts of the aggregate state and can be safely merged if they are dispatched together. After all, the order in which NameChanged and AddressChanged are applied does not influence the final aggregate state.
However, two ChangeAddress commands that are simultaneously dispatched (because two users are submitting updates for a single company) do conflict. In this case you will want to display some sort of error message to the user that submitted the conflicting update, inviting them to try again.
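One way to implement that decision is to track which part of the aggregate state each event type touches, and only reject a command when its event overlaps with events applied since the version it was based on. A minimal sketch, using the event names from the Company example (the field mapping is illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of a conflict check: each event type declares which part of the
// aggregate state it touches; two events conflict only when those parts overlap.
class ConflictResolver {
    private static final Map<String, Set<String>> TOUCHED_FIELDS = Map.of(
            "NameChanged", Set.of("name"),
            "AddressChanged", Set.of("address"));

    // unseenEventTypes: events applied since the version the command was based on.
    // NameChanged vs. AddressChanged -> no overlap, safe to merge;
    // AddressChanged vs. AddressChanged -> overlap, reject.
    boolean conflicts(String incomingEventType, List<String> unseenEventTypes) {
        Set<String> incoming = TOUCHED_FIELDS.get(incomingEventType);
        return unseenEventTypes.stream()
                .map(TOUCHED_FIELDS::get)
                .anyMatch(seen -> seen.stream().anyMatch(incoming::contains));
    }
}
```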
In closing
I hope this post explains some of the things that you may encounter when dealing with concurrency & event sourcing. Let me know what you think!
@Michiel Rook Hi, hope you’re doing well. My question is about the version number: where and when do we set it, and how is it used to resolve version conflicts?
Thanks
That depends on the framework you use. When using Axon for example, you can include the expected version in each command (and annotate that property with @TargetAggregateVersion). That property will then be used to validate the version as it’s known in the aggregate repository.
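For illustration, a command class along those lines might look like this (exact package names differ between Axon versions):

```java
// Sketch of an Axon-style command carrying the expected aggregate version.
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.TargetAggregateVersion;

public class WithdrawMoney {

    @TargetAggregateIdentifier
    private final String accountId;

    @TargetAggregateVersion
    private final Long expectedVersion; // version of the aggregate the client last saw

    private final long amount;

    public WithdrawMoney(String accountId, Long expectedVersion, long amount) {
        this.accountId = accountId;
        this.expectedVersion = expectedVersion;
        this.amount = amount;
    }
}
```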
Another approach to solving the issue of handling concurrent commands sent to the same aggregate is to use the actor model[1]. By using a single actor per aggregate instance, account 12345678 in your example, commands are sent to the actor’s mailbox to be processed sequentially.
It is then the aggregate’s responsibility to protect its business invariants and ensure the commands are valid. For example, the withdraw money command will only be accepted if there are enough funds in the account.
Two examples of actor based event sourced systems are Akka, with its event sourced persistence[2], and my own Commanded library[3] built in Elixir and taking advantage of Erlang’s actor model.
[1] https://en.wikipedia.org/wiki/Actor_model
[2] https://doc.akka.io/docs/akka/2.5/scala/persistence.html
[3] https://github.com/commanded/commanded
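For readers not on the BEAM, here is a rough Java approximation of the same idea (illustrative only, with none of the supervision, passivation or clustering a real actor system provides): one single-threaded mailbox per aggregate id, so commands for the same account are processed strictly sequentially:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Actor-like sketch: each aggregate gets its own single-threaded "mailbox",
// so no locking or version check is needed within an aggregate.
class AggregateMailboxes {
    private final ConcurrentHashMap<String, ExecutorService> mailboxes = new ConcurrentHashMap<>();

    void send(String aggregateId, Runnable command) {
        mailboxes.computeIfAbsent(aggregateId, id -> Executors.newSingleThreadExecutor())
                 .submit(command);
    }
}
```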
Yep, the actor model works well in this case. Thanks for the comment Ben!
I was actually expecting you to conclude this example with the idea of allowing both commands to complete, either through event merging, or by discussing the way banks work. The fact that you can withdraw more money from an account than it holds helps to support this high-performance, distributed and concurrent system.
Hi Dennis,
That’s actually what I mean by conflict resolution. Not all commands conflict; their events are valid in any order and thus can be merged.
For CQRS+ES in a “competitive bidding”-like scenario, which allows the top N winners in a competition, the aggregate affinity routing solution may not be well suited, as we need to avoid sending huge numbers of requests to a single point for performance reasons. Introducing the actor model would also require a significant change in our deployment topology, which we don’t want to attempt at the moment.
Inspired by TCC (Try-Confirm/Cancel), I am thinking about introducing a business-level “lock” to resolve this: generate a “try-phase” event that is not restricted by the sequence number, but simply ordered by the timestamp at which it is written to the store. A downstream component then verifies it at the business logic level; if the verification passes, we generate a confirm event, otherwise a cancel event is generated to release the locked resource. We could also enhance this scenario by validating the try-phase command against a snapshot of the state, e.g. if there are already N winners, just reply with a failure without generating a try-phase event.
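A minimal sketch of what I have in mind (all names are hypothetical): a downstream verifier consumes try-phase events in timestamp order and emits a confirm or cancel event depending on how many winners have already been confirmed:

```java
// Hypothetical try/confirm/cancel flow for a top-N bidding scenario.
enum Phase { TRY, CONFIRM, CANCEL }

record BidEvent(String bidId, Phase phase, long timestamp) {}

class BidVerifier {
    private final int maxWinners;
    private int confirmedWinners = 0;

    BidVerifier(int maxWinners) { this.maxWinners = maxWinners; }

    // Called sequentially, once per try-phase event, in timestamp order.
    BidEvent verify(BidEvent tryEvent) {
        if (confirmedWinners < maxWinners) {
            confirmedWinners++;
            return new BidEvent(tryEvent.bidId(), Phase.CONFIRM, System.currentTimeMillis());
        }
        // No slots left: release the reservation made by the try-phase event.
        return new BidEvent(tryEvent.bidId(), Phase.CANCEL, System.currentTimeMillis());
    }
}
```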
A lot of write conflicts can be avoided by carefully designing aggregates so as to minimize the likelihood of this being an issue.
Independent of CQRS/ES, with optimistic concurrency control the simple and cheap solution is to just retry the command. So in case of the NastyException, the command gets retried at the application/command handler level; the (hopefully) new version of the aggregate gets read again, and the command might very well succeed.
This is, in lots of cases, much better performing than waiting for a lock that might not have been necessary and would have produced the same result anyway (i.e. unexpected insufficient funds for our very slow human user and her UI with outdated data).
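A minimal sketch of such a retry loop (the repository and account interfaces are illustrative; ConcurrencyException is the conflict signal from the optimistic locking sketch earlier in the post):

```java
class RetryingCommandHandler {
    private static final int MAX_ATTEMPTS = 3;

    private final AccountRepository repository;

    RetryingCommandHandler(AccountRepository repository) {
        this.repository = repository;
    }

    void withdraw(String accountId, long amount) {
        for (int attempt = 1; ; attempt++) {
            try {
                BankAccount account = repository.load(accountId); // fresh read every attempt
                account.withdraw(amount);
                repository.save(account); // throws ConcurrencyException on a version conflict
                return;
            } catch (ConcurrencyException e) {
                if (attempt == MAX_ATTEMPTS) {
                    throw e; // give up and surface the conflict to the caller
                }
                // otherwise loop: re-read the (hopefully) new version and retry
            }
        }
    }
}

// Illustrative collaborators.
interface AccountRepository {
    BankAccount load(String accountId);
    void save(BankAccount account);
}

interface BankAccount {
    void withdraw(long amount);
}
```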
Good article and nice to see you around in the Intertubes!