At-Least-Once Delivery and Dealing with Duplicate Messages
We commented earlier on the issues with the AtLeastOnceDeliveryActor implementations in Akka.Persistence. In this article we explain how to use the DeDuplicatingReceiveActor from the Akka.Persistence.Extras package to solve the problem of message duplication in Akka.NET.
Deduplication Algorithm
To correctly de-duplicate messages, the DeDuplicatingReceiveActor must be able to reliably do the following:
- Uniquely identify every message per sender;
- Keep track of which messages have already been processed and which ones have not;
- Persist the "tracking" state so the actor can pick up where it left off in the event of a crash, restart, or simply being recreated later; and
- Deal with the sticky problem of the sending actor moving to a new location in the network. In other words, the IActorRef of the actor that delivered the message to the DeDuplicatingReceiveActor isn't a reliable long-term identity for the "sender".
These are the problems solved by the DeDuplicatingReceiveActor's algorithm and its messaging protocol.
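The first two requirements can be made concrete with a minimal in-memory sketch of per-sender bookkeeping. This illustrates the idea under simplifying assumptions and is not the library's implementation. The SeenMessageTracker type and its TryRecord method are hypothetical, nothing is persisted, and memory is unbounded. The point it shows is that lookups are keyed by a stable sender ID string rather than by an IActorRef.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of per-sender de-duplication bookkeeping.
// Not the library's IReceiverState: nothing is persisted and memory is unbounded.
public sealed class SeenMessageTracker
{
    // Keyed by a stable sender ID (e.g. the sender's PersistenceId),
    // never by an IActorRef, which can change when the sender moves.
    private readonly Dictionary<string, HashSet<long>> _seenBySender =
        new Dictionary<string, HashSet<long>>();

    // Returns true if (senderId, confirmationId) is new and has now been recorded,
    // or false if it was already recorded, i.e. the message is a duplicate.
    public bool TryRecord(string senderId, long confirmationId)
    {
        if (senderId == null) throw new ArgumentNullException(nameof(senderId));

        if (!_seenBySender.TryGetValue(senderId, out var ids))
        {
            ids = new HashSet<long>();
            _seenBySender[senderId] = ids;
        }

        // HashSet<T>.Add returns false when the value is already present.
        return ids.Add(confirmationId);
    }
}
```

The same ConfirmationId from two different senders is not a duplicate. That is why reliably identifying the sender matters as much as identifying the message.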
IConfirmableMessages
The first step in the process is uniquely identifying each message and each message's sender. De-duplication occurs on a per-sender basis, so reliably and repeatedly identifying the same sender, even as it moves around the network, is key to doing this correctly.
These problems are solved by decorating messages that require reliable delivery with the IConfirmableMessage interface from Akka.Persistence.Extras:
public interface IConfirmableMessage
{
    /// <summary>
    /// The confirmation ID assigned by any type of <see cref="AtLeastOnceDeliveryActor" />.
    /// </summary>
    /// <remarks>
    /// Must be monotonic per-sender: this means that if your <see cref="AtLeastOnceDeliveryActor" />
    /// doesn't persist its delivery state, you're going to have a "bad time"
    /// when using the <see cref="DeDuplicatingReceiveActor" />.
    /// </remarks>
    long ConfirmationId { get; }

    /// <summary>
    /// The globally unique (cluster-wide) ID of the sender. Usually this is the
    /// <see cref="PersistentActor.PersistenceId" />.
    /// </summary>
    string SenderId { get; }
}
The IConfirmableMessage interface allows the DeDuplicatingReceiveActor to do the following:
- Use the PersistenceId of the sender as the sender's identity. These values are meant to be globally unique per cluster (a maximum of one instance of a persistent entity cluster-wide), so they are safe and reliable targets. IConfirmableMessage.SenderId is therefore intended to match the PersistenceId of the actor that sent the message;
- Store the correlation ID of the original message in a dedicated field, ConfirmationId, so the DeDuplicatingReceiveActor can recognize duplicate messages automatically.
Some circles consider it a bit "smelly" to decorate application-specific message classes with an interface from a third-party library. For that reason, Akka.Persistence.Extras also ships the ConfirmableMessageEnvelope, which can wrap any of your user-defined messages and carries all of the same information used by the IConfirmableMessage interface:
/// <summary>
/// Wraps a user-defined message together with the confirmation ID and sender ID
/// used by the <see cref="DeDuplicatingReceiveActor" />.
/// </summary>
public sealed class ConfirmableMessageEnvelope : IConfirmableMessage
{
    public ConfirmableMessageEnvelope(long confirmationId, string senderId, object message)
    {
        ConfirmationId = confirmationId;
        SenderId = senderId;
        Message = message;
    }

    /// <summary>
    /// The user-defined message.
    /// </summary>
    public object Message { get; }

    /// <inheritdoc />
    public long ConfirmationId { get; }

    /// <inheritdoc />
    public string SenderId { get; }
}
The DeDuplicatingReceiveActor automatically unpacks the underlying message, so your actor can handle it normally via a regular Command<TMessage>(Action<TMessage> handler) call.
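The unpacking step works roughly like the sketch below. EnvelopeSketch and DispatchSketch are hypothetical stand-ins written for this illustration. They mirror the shape of ConfirmableMessageEnvelope and of a Command<TMessage> registration, but they are not the library's types, and handler lookup here matches exact runtime types only. The sketch also records the envelope's IDs so a handler can read them, similar to the CurrentConfirmationId and CurrentSenderId properties used in the console sample in this article.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in mirroring the shape of ConfirmableMessageEnvelope.
public sealed class EnvelopeSketch
{
    public EnvelopeSketch(long confirmationId, string senderId, object message)
    {
        ConfirmationId = confirmationId;
        SenderId = senderId;
        Message = message;
    }

    public long ConfirmationId { get; }
    public string SenderId { get; }
    public object Message { get; }
}

// Hypothetical dispatcher: unwrap, remember the IDs, then call the typed handler.
public sealed class DispatchSketch
{
    private readonly Dictionary<Type, Action<object>> _handlers =
        new Dictionary<Type, Action<object>>();

    public long? CurrentConfirmationId { get; private set; }
    public string CurrentSenderId { get; private set; }

    // Registers a handler for messages whose runtime type is exactly T.
    public void Command<T>(Action<T> handler) => _handlers[typeof(T)] = m => handler((T)m);

    // Returns true if a handler was found for the (unwrapped) payload.
    public bool Dispatch(object incoming)
    {
        object payload = incoming;
        if (incoming is EnvelopeSketch envelope)
        {
            CurrentConfirmationId = envelope.ConfirmationId;
            CurrentSenderId = envelope.SenderId;
            payload = envelope.Message;
        }
        else
        {
            CurrentConfirmationId = null;
            CurrentSenderId = null;
        }

        if (payload != null && _handlers.TryGetValue(payload.GetType(), out var handler))
        {
            handler(payload);
            return true;
        }
        return false;
    }
}
```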
Thus the first step in working with the DeDuplicatingReceiveActor algorithm is to use IConfirmableMessages inside your AtLeastOnceDeliveryActors when they send their payloads. You can see an example of this in the Akka.Persistence.Extras project source code:
Command<Write>(write =>
{
    Deliver(_targetActor.Path,
        messageId => new ConfirmableMessageEnvelope(messageId, PersistenceId, write));

    // save the full state of the at least once delivery actor
    // so we don't lose any messages upon crash
    SaveSnapshot(GetDeliverySnapshot());
});
In the sample above, we use a ConfirmableMessageEnvelope to package the original message sent to the AtLeastOnceDeliveryActor together with the correlation ID it generated and the actor's PersistenceId.
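On the sending side, two properties matter: each delivery gets a monotonically increasing ID, and anything not yet confirmed gets sent again. The following is a simplified, hypothetical model of that bookkeeping. SenderSketch is not Akka.NET's AtLeastOnceDeliveryActor: it has no timers, no persistence, and no actors, and it assumes IDs start at 1.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of the sending side. Not Akka.NET's
// AtLeastOnceDeliveryActor: no timers, no persistence, no actor messaging.
public sealed class SenderSketch
{
    private long _nextId = 1; // in-memory only: lost on restart in this model
    private readonly SortedDictionary<long, object> _unconfirmed =
        new SortedDictionary<long, object>();

    public SenderSketch(string persistenceId) => PersistenceId = persistenceId;

    public string PersistenceId { get; }

    // Assigns the next monotonic ID and returns the fields an envelope would carry.
    public (long ConfirmationId, string SenderId, object Message) Deliver(object payload)
    {
        long id = _nextId++;
        _unconfirmed[id] = payload;
        return (id, PersistenceId, payload);
    }

    // Called when an ACK arrives; false for unknown or already-confirmed IDs.
    public bool ConfirmDelivery(long confirmationId) => _unconfirmed.Remove(confirmationId);

    // Everything still unconfirmed is re-sent on the next redelivery tick with the
    // same IDs; this is where duplicates on the receiving side come from.
    public List<(long ConfirmationId, string SenderId, object Message)> Redeliver()
    {
        var batch = new List<(long ConfirmationId, string SenderId, object Message)>();
        foreach (var entry in _unconfirmed)
            batch.Add((entry.Key, PersistenceId, entry.Value));
        return batch;
    }
}
```

In this model, _nextId lives only in memory. Restarting the sender would therefore start numbering at 1 again, and a receiver that had already confirmed IDs 1 and 2 would treat brand-new messages as duplicates. That is the "bad time" the ConfirmationId remarks warn about, and it is why the sample above saves a delivery snapshot.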
DeDuplicatingReceiveActor APIs
The next stage in implementing the de-duplication algorithm is to use the DeDuplicatingReceiveActor's APIs the way they're intended to be used.
The DeDuplicatingReceiveActor is a ReceivePersistentActor under the covers, so this actor needs a PersistenceId assigned to it. However, there is one major Nota Bene here: do not, under any circumstances, co-mingle the DeDuplicatingReceiveActor's persistent state with any other user-defined persistent state. If you take snapshots of user-defined objects, delete items from the journal, or delete snapshots, there is a 100% chance you will screw up this actor's ability to reliably track its receiver state across restarts. If you need to persist user-defined state, push it into another actor that works closely alongside the DeDuplicatingReceiveActor, not into the DeDuplicatingReceiveActor itself.
There are two important abstract members you will need to implement in your DeDuplicatingReceiveActors:
- PersistenceId: a globally unique entity ID for this PersistentActor;
- CreateConfirmationReplyMessage: a method that returns the confirmation message this actor sends back to the sender automatically when a duplicate message is detected.
Sending ACKs back to the Sender
The CreateConfirmationReplyMessage method is invoked whenever the DeDuplicatingReceiveActor detects a duplicate, and it is also used when ConfirmAndReply is called from within end-user code. Here's an example of it in action, from the Akka.Persistence.Extras source code:
protected override object CreateConfirmationReplyMessage(long confirmationId, string senderId,
    object originalMessage)
{
    return new ReliableDeliveryAck(confirmationId);
}
In this instance, we're just sending back the ReliableDeliveryAck message defined in the sample code, which is what the AtLeastOnceDeliveryActor that originally sent us the message expects. This is precisely what you should do as well.
Confirming Processed Messages
Whenever the DeDuplicatingReceiveActor processes an IConfirmableMessage, end-user code should always call the ConfirmDelivery method. This causes the DeDuplicatingReceiveActor to record the delivery in the Akka.Persistence journal and update its internal receiver state accordingly. However, this method does not send a reply back to the sender; it only updates the actor's records.
If you want to both confirm the message and send a reply back, use the ConfirmAndReply method built into the DeDuplicatingReceiveActor:
Command<Write>(write =>
{
    Console.WriteLine("Received message {0} [id: {1}] from {2} - accept?", write.Content,
        CurrentConfirmationId, CurrentSenderId);
    var response = Console.ReadLine()?.ToLowerInvariant();
    if (!string.IsNullOrEmpty(response) && (response.Equals("yes") || response.Equals("y")))
    {
        // confirm delivery only if the user explicitly agrees
        ConfirmAndReply(write);
        Console.WriteLine("Confirmed delivery of message ID {0}", CurrentConfirmationId);
    }
    else
    {
        // not confirmed: no reply is sent, so the sender will redeliver this message later
        Console.WriteLine("Did not confirm delivery of message ID {0}", CurrentConfirmationId);
    }
});
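The difference between ConfirmDelivery and ConfirmAndReply, and what happens when a message is never confirmed, can be modeled as follows. ConfirmationSketch is hypothetical. It keeps state in a HashSet and an in-memory outbox instead of the journal and real actor messaging. Its "ack:{id}" reply is a placeholder for whatever your CreateConfirmationReplyMessage override returns.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of ConfirmDelivery vs. ConfirmAndReply semantics.
// Uses in-memory collections instead of the journal and real actor messaging.
public sealed class ConfirmationSketch
{
    private readonly HashSet<(string SenderId, long ConfirmationId)> _confirmed =
        new HashSet<(string SenderId, long ConfirmationId)>();

    // Replies that would be sent back to each sender.
    public List<(string SenderId, object Reply)> Outbox { get; } =
        new List<(string SenderId, object Reply)>();

    // A redelivered message counts as a duplicate only if it was previously confirmed.
    public bool IsDuplicate(string senderId, long confirmationId) =>
        _confirmed.Contains((senderId, confirmationId));

    // Record only; no reply is sent (analogous to ConfirmDelivery).
    public void ConfirmDelivery(string senderId, long confirmationId) =>
        _confirmed.Add((senderId, confirmationId));

    // Record and reply (analogous to ConfirmAndReply).
    public void ConfirmAndReply(string senderId, long confirmationId)
    {
        ConfirmDelivery(senderId, confirmationId);
        Outbox.Add((senderId, CreateConfirmationReplyMessage(confirmationId)));
    }

    // Placeholder for the user-supplied reply, e.g. a ReliableDeliveryAck.
    private static object CreateConfirmationReplyMessage(long confirmationId) =>
        $"ack:{confirmationId}";
}
```

In the console sample, any answer other than "yes" or "y" leaves the message unconfirmed. When the sender redelivers it, the actor therefore treats it as a new message rather than as a duplicate.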
Optional: Custom Duplicate Message Handling Code
By default, the DeDuplicatingReceiveActor automatically sends replies created via the CreateConfirmationReplyMessage method back to the Sender when a duplicate message is detected. However, if you wish to customize the way your actor handles duplicates, you can override the HandleDuplicate method:
protected override void HandleDuplicate(long confirmationId, string senderId, object duplicateMessage)
{
    Console.WriteLine("Automatically de-duplicated message with ID {0} from {1}", confirmationId, senderId);

    // the default behavior replies with the CreateConfirmationReplyMessage result
    base.HandleDuplicate(confirmationId, senderId, duplicateMessage);
}
An important caveat, though: if your HandleDuplicate override doesn't send a confirmation back to the correct sender, you will have defeated the entire purpose of having the DeDuplicatingReceiveActor in the first place. We strongly recommend reading the DeDuplicatingReceiveActor source code before changing this behavior.
Receiver State
The most important piece of the puzzle is the DeDuplicatingReceiveActor's internal state, which it uses to remember which messages it has already seen from which senders. This state is updated through the ConfirmDelivery method.
By default, the IReceiverState implementation used by the actor does the following:
- Assumes messages sent by any given sender can and will be delivered out of order (i.e. we can't rely on the ConfirmationId increasing monotonically);
- Stores up to 1000 unique message IDs per sender;
- Compacts itself into a snapshot in the Akka.Persistence snapshot store once every 100 messages; and
- Purges data belonging to any sender that hasn't been active for longer than 30 minutes.
We enforce these constraints on the IReceiverState so that the memory footprint of any individual DeDuplicatingReceiveActor remains bounded, even during periods of heavy use. If a DeDuplicatingReceiveActor receives more than 1000 messages from a single sender, it uses a circular buffer to overwrite the oldest ConfirmationIds with newer ones.
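A bounded, order-insensitive receiver state along these lines can be sketched as follows. BoundedReceiverState is a hypothetical type, not the library's IReceiverState. It keeps a fixed-size ring buffer of confirmed IDs per sender (1000 by default, mirroring the documented default), a HashSet for fast lookups, and a last-seen timestamp so idle senders can be pruned. Callers pass the current time explicitly, which keeps the example deterministic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a bounded, order-insensitive receiver state.
// Not the library's IReceiverState implementation.
public sealed class BoundedReceiverState
{
    private sealed class SenderBuffer
    {
        public readonly long[] Ring;
        public readonly HashSet<long> Index = new HashSet<long>();
        public int Next;            // slot to write (and, once full, overwrite) next
        public int Count;           // number of occupied slots
        public DateTime LastSeenUtc;
        public SenderBuffer(int capacity) { Ring = new long[capacity]; }
    }

    private readonly Dictionary<string, SenderBuffer> _senders = new Dictionary<string, SenderBuffer>();
    private readonly int _capacityPerSender;

    public BoundedReceiverState(int capacityPerSender = 1000)
    {
        if (capacityPerSender <= 0) throw new ArgumentOutOfRangeException(nameof(capacityPerSender));
        _capacityPerSender = capacityPerSender;
    }

    public bool HasSeen(string senderId, long confirmationId) =>
        _senders.TryGetValue(senderId, out var buf) && buf.Index.Contains(confirmationId);

    // Records a confirmed ID in any order; once the ring is full, the oldest recorded ID is evicted.
    public void Confirm(string senderId, long confirmationId, DateTime nowUtc)
    {
        if (!_senders.TryGetValue(senderId, out var buf))
        {
            buf = new SenderBuffer(_capacityPerSender);
            _senders[senderId] = buf;
        }
        buf.LastSeenUtc = nowUtc;
        if (buf.Index.Contains(confirmationId)) return; // already recorded

        if (buf.Count == buf.Ring.Length)
            buf.Index.Remove(buf.Ring[buf.Next]);         // overwrite the oldest slot
        else
            buf.Count++;

        buf.Ring[buf.Next] = confirmationId;
        buf.Index.Add(confirmationId);
        buf.Next = (buf.Next + 1) % buf.Ring.Length;
    }

    // Drops every sender idle for longer than maxIdle; returns how many were removed.
    public int Prune(DateTime nowUtc, TimeSpan maxIdle)
    {
        var stale = new List<string>();
        foreach (var kv in _senders)
            if (nowUtc - kv.Value.LastSeenUtc > maxIdle) stale.Add(kv.Key);
        foreach (var id in stale) _senders.Remove(id);
        return stale.Count;
    }
}
```

In this sketch, and by the same logic with any bounded buffer, a late redelivery is no longer recognized as a duplicate once its ID has been overwritten or its sender pruned. The buffer size and idle timeout therefore need to comfortably exceed the sender's redelivery window.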
If you wish to change any of these settings, pass a DeDuplicatingReceiverSettings object into the base class constructor of the DeDuplicatingReceiveActor.
N.B. Akka.Persistence.Extras does not currently support strict ordering for IReceiverState. This will be added once the ExactlyOnceStronglyOrderedDeliveryActor inside Akka.Persistence.Extras is implemented.
Persistence
By default, the DeDuplicatingReceiveActor persists its state each time ConfirmDelivery is called, using whichever Akka.Persistence journal implementation is currently configured. The actor also periodically compacts that information into a single snapshot payload, usually once every 100 confirmed messages.
The DeDuplicatingReceiveActor automatically recovers its state on startup, so it remembers its previous receiver state. Consider it the "receiver side" analog of the AtLeastOnceDeliveryActor.
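The journal-plus-snapshot pattern described in this section can be modeled roughly as below. This is a hypothetical sketch, not Akka.Persistence code. The journal and snapshot store are plain in-memory collections, and recovery loads the latest snapshot and then replays only the journal entries written after it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of "journal every confirmation, snapshot every N".
// Not Akka.Persistence code: the journal and snapshot store are in-memory collections.
public sealed class SnapshottingStateSketch
{
    private readonly int _snapshotEvery;
    private readonly List<long> _journal = new List<long>();     // one event per confirmation
    private readonly HashSet<long> _state = new HashSet<long>(); // live in-memory receiver state
    private HashSet<long> _snapshot = new HashSet<long>();       // latest snapshot payload
    private int _snapshotCoversEvents;                           // journal entries included in _snapshot

    public SnapshottingStateSketch(int snapshotEvery = 100)
    {
        if (snapshotEvery <= 0) throw new ArgumentOutOfRangeException(nameof(snapshotEvery));
        _snapshotEvery = snapshotEvery;
    }

    public int SnapshotsTaken { get; private set; }

    public void ConfirmDelivery(long confirmationId)
    {
        _journal.Add(confirmationId);   // persist the event
        _state.Add(confirmationId);     // then apply it to in-memory state

        if (_journal.Count % _snapshotEvery == 0)
        {
            _snapshot = new HashSet<long>(_state); // compact the state so far into one payload
            _snapshotCoversEvents = _journal.Count;
            SnapshotsTaken++;
        }
    }

    // Recovery: start from the latest snapshot, then replay only the newer events.
    public (HashSet<long> State, int EventsReplayed) Recover()
    {
        var recovered = new HashSet<long>(_snapshot);
        int replayed = 0;
        for (int i = _snapshotCoversEvents; i < _journal.Count; i++)
        {
            recovered.Add(_journal[i]);
            replayed++;
        }
        return (recovered, replayed);
    }
}
```

With 250 confirmations and a snapshot every 100, recovery loads the snapshot taken at the 200th confirmation and replays only the last 50 events. This keeps the amount of replay bounded as the journal grows.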