    At-Least-Once Delivery and Dealing with Duplicate Messages

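    We commented earlier on the issues with AtLeastOnceDeliveryActor implementations from Akka.Persistence: because an AtLeastOnceDeliveryActor keeps redelivering a message until it receives a confirmation, the receiver can end up processing the same message more than once. In this article we explain how to use the DeDuplicatingReceiveActor from the Akka.Persistence.Extras package to solve this problem of message duplication in Akka.NET.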

    Deduplication Algorithm

    To correctly de-duplicate messages the DeDuplicatingReceiveActor must be able to reliably do the following:

    1. Uniquely identify every message per-sender;
    2. Track which messages have already been processed and which ones have not;
    3. Persist the "tracking" state so the actor can pick up where it left off in the event of a crash, restart, or simply being recreated later;
    4. Deal with the sticky problem of the sending actor moving to a new location in the network. In other words, the IActorRef of the actor that delivered the message to the DeDuplicatingReceiveActor isn't a reliable identity for the sender over time (a sender that is restarted or recreated elsewhere gets a new IActorRef).

    These are the problems solved by the DeDuplicatingReceiveActor's algorithm and its messaging protocol.

    IConfirmableMessages

    The first step in the process is uniquely identifying each message and each message's sender. De-duplication happens on a per-sender basis, so reliably and repeatedly identifying the same sender, even as it moves around the network, is key to doing this correctly.

    These problems are solved by decorating every message that requires reliable delivery with the IConfirmableMessage interface from Akka.Persistence.Extras:

    public interface IConfirmableMessage
    {
        /// <summary>
        ///     The confirmation ID assigned by any type of <see cref="AtLeastOnceDeliveryActor" />.
        /// </summary>
        /// <remarks>
        ///     Must be monotonic per-sender: this means that if your <see cref="AtLeastOnceDeliveryActor" />
        ///     doesn't persist its delivery state, you're going to have a "bad time"
        ///     when using the <see cref="DeDuplicatingReceiveActor" />.
        /// </remarks>
        long ConfirmationId { get; }
    
        /// <summary>
        ///     The globally unique (cluster-wide) ID of the sender. Usually this is the
        ///     <see cref="PersistentActor.PersistenceId" />
        /// </summary>
        string SenderId { get; }
    }
    

    The IConfirmableMessage interface allows the DeDuplicatingReceiveActor to do the following:

    1. Use the PersistenceId of the sender as the sender's identity. Because PersistenceId values are meant to be globally unique per cluster (at most one instance of a given persistent entity runs cluster-wide), they are safe and reliable targets. IConfirmableMessage.SenderId is therefore intended to match the PersistenceId of the actor that sent the message.
    2. Store the correlation ID of the original message in a dedicated field, ConfirmationId, so that duplicate messages can be recognized automatically by the DeDuplicatingReceiveActor.
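    To make these two identity rules concrete, here is a minimal, dependency-free sketch of per-sender de-duplication keyed on SenderId plus ConfirmationId. It redeclares the IConfirmableMessage interface locally so it compiles on its own; the Write message and the NaiveDeDuplicator class are hypothetical names used only for illustration, and unlike the real receiver state this tracker is unbounded and never persisted.

    ```csharp
    using System.Collections.Generic;

    // Local copy of the IConfirmableMessage contract, so this sketch compiles
    // without referencing Akka.Persistence.Extras.
    public interface IConfirmableMessage
    {
        long ConfirmationId { get; }
        string SenderId { get; }
    }

    // Hypothetical application message that implements the interface directly.
    public sealed class Write : IConfirmableMessage
    {
        public Write(long confirmationId, string senderId, string content)
        {
            ConfirmationId = confirmationId;
            SenderId = senderId;
            Content = content;
        }

        public long ConfirmationId { get; }
        public string SenderId { get; }
        public string Content { get; }
    }

    // Hypothetical, unbounded tracker: remembers every ConfirmationId seen per SenderId.
    public sealed class NaiveDeDuplicator
    {
        private readonly Dictionary<string, HashSet<long>> _seen =
            new Dictionary<string, HashSet<long>>();

        // Returns true the first time a (SenderId, ConfirmationId) pair is seen,
        // false for any later copy of the same pair.
        public bool TryAccept(IConfirmableMessage msg)
        {
            if (!_seen.TryGetValue(msg.SenderId, out var ids))
            {
                ids = new HashSet<long>();
                _seen[msg.SenderId] = ids;
            }
            return ids.Add(msg.ConfirmationId);
        }
    }
    ```

    The same ConfirmationId arriving from two different senders is treated as two different messages, which is why a stable SenderId matters just as much as the ID itself.
    
    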

    Because decorating application-specific message classes with an interface from a third-party library can be considered a bit "smelly" in some circles, Akka.Persistence.Extras also ships the ConfirmableMessageEnvelope, which can wrap any user-defined message and carries all of the same information as the IConfirmableMessage interface.

    /// <summary>
    ///     Wraps any user-defined message together with the <see cref="IConfirmableMessage" /> metadata.
    /// </summary>
    public sealed class ConfirmableMessageEnvelope : IConfirmableMessage
    {
        public ConfirmableMessageEnvelope(long confirmationId, string senderId, object message)
        {
            ConfirmationId = confirmationId;
            SenderId = senderId;
            Message = message;
        }
    
        /// <summary>
        ///     The user-defined message.
        /// </summary>
        public object Message { get; }
    
        /// <inheritdoc />
        public long ConfirmationId { get; }
    
        /// <inheritdoc />
        public string SenderId { get; }
    }
    

    The DeDuplicatingReceiveActor automatically unpacks the wrapped message, so you can handle it normally via Command<TMessage>(Action<TMessage> handler).
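    The unwrap-then-dispatch behavior can be modeled without Akka.NET as follows. This is a hypothetical stand-in, not the library's implementation: UnwrappingDispatcher, its Command<TMessage> method, and its CurrentConfirmationId/CurrentSenderId properties only imitate the shape of the real actor's API, and IConfirmableMessage and ConfirmableMessageEnvelope are local copies of the types shown earlier.

    ```csharp
    using System;
    using System.Collections.Generic;

    public interface IConfirmableMessage
    {
        long ConfirmationId { get; }
        string SenderId { get; }
    }

    // Local copy of the envelope type shown earlier.
    public sealed class ConfirmableMessageEnvelope : IConfirmableMessage
    {
        public ConfirmableMessageEnvelope(long confirmationId, string senderId, object message)
        {
            ConfirmationId = confirmationId;
            SenderId = senderId;
            Message = message;
        }

        public object Message { get; }
        public long ConfirmationId { get; }
        public string SenderId { get; }
    }

    // Hypothetical model of "unwrap the envelope, then run the typed handler".
    public sealed class UnwrappingDispatcher
    {
        private readonly Dictionary<Type, Action<object>> _handlers =
            new Dictionary<Type, Action<object>>();

        // Metadata of the message currently being dispatched (null if not confirmable).
        public long? CurrentConfirmationId { get; private set; }
        public string CurrentSenderId { get; private set; }

        // Registers a handler for one exact payload type, like Command<TMessage>.
        public void Command<TMessage>(Action<TMessage> handler)
        {
            _handlers[typeof(TMessage)] = msg => handler((TMessage)msg);
        }

        // Returns false when no handler matches the (unwrapped) payload type.
        public bool Dispatch(object message)
        {
            object payload = message is ConfirmableMessageEnvelope envelope ? envelope.Message : message;

            if (message is IConfirmableMessage confirmable)
            {
                CurrentConfirmationId = confirmable.ConfirmationId;
                CurrentSenderId = confirmable.SenderId;
            }
            else
            {
                CurrentConfirmationId = null;
                CurrentSenderId = null;
            }

            if (payload == null || !_handlers.TryGetValue(payload.GetType(), out var handler))
                return false;
            handler(payload);
            return true;
        }
    }
    ```

    The handler never sees the envelope, only the user-defined payload, while the confirmation metadata stays available on the dispatcher for the duration of the call.
    
    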

    Thus the first step in working with the DeDuplicatingReceiveActor algorithm is to use IConfirmableMessages inside your AtLeastOnceDeliveryActors when they send their payloads. You can see an example of this in the Akka.Persistence.Extras project source code:

    Command<Write>(write =>
    {
        Deliver(_targetActor.Path,
            messageId => new ConfirmableMessageEnvelope(messageId, PersistenceId, write));
    
        // save the full state of the at least once delivery actor
        // so we don't lose any messages upon crash
        SaveSnapshot(GetDeliverySnapshot());
    });
    

    In the above sample we're using a ConfirmableMessageEnvelope to package up the original message sent to the AtLeastOnceDeliveryActor along with the correlation id it generated and the actor's PersistenceId.
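    Why does the receiver see duplicates at all? The sketch below is a simplified, hypothetical model of the sender side (SenderModel is not an Akka.NET type). Each delivery gets the next monotonically increasing ID and stays in an unconfirmed set until an ACK arrives; anything still unconfirmed is redelivered with the same ID. If the receiver processed a message but its ACK was lost, that redelivery is a duplicate. The real AtLeastOnceDeliveryActor also persists this state and redelivers on a timer, which the sketch omits.

    ```csharp
    using System.Collections.Generic;

    // Hypothetical model of an at-least-once sender's bookkeeping.
    public sealed class SenderModel
    {
        private long _nextDeliveryId = 1; // monotonic per sender
        private readonly SortedDictionary<long, object> _unconfirmed =
            new SortedDictionary<long, object>();

        public SenderModel(string persistenceId) { PersistenceId = persistenceId; }

        public string PersistenceId { get; }
        public int UnconfirmedCount => _unconfirmed.Count;

        // Assigns the next ID and remembers the payload until it is confirmed,
        // returning what would be wrapped in a ConfirmableMessageEnvelope.
        public (long Id, string SenderId, object Payload) Deliver(object payload)
        {
            long id = _nextDeliveryId++;
            _unconfirmed[id] = payload;
            return (id, PersistenceId, payload);
        }

        // Every message that has not been acknowledged is sent again with the SAME ID.
        public List<(long Id, string SenderId, object Payload)> Redeliver()
        {
            var resend = new List<(long Id, string SenderId, object Payload)>();
            foreach (var kv in _unconfirmed) resend.Add((kv.Key, PersistenceId, kv.Value));
            return resend;
        }

        // Called when an ACK such as ReliableDeliveryAck arrives for this ID.
        public bool ConfirmDelivery(long id) => _unconfirmed.Remove(id);
    }
    ```

    Because the redelivered copy carries the same SenderId and ConfirmationId as the original, the receiver has everything it needs to recognize it.
    
    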

    DeDuplicatingReceiveActor APIs

    The next stage in implementing the de-duplication algorithm is to use the DeDuplicatingReceiveActor's APIs the way they're intended to be used.

    The DeDuplicatingReceiveActor is a ReceivePersistentActor under the covers, so it needs a PersistenceId assigned to it. However, there is one major caveat here: do not, under any circumstances, co-mingle the DeDuplicatingReceiveActor's persistent state with any other user-defined persistent state. If you take snapshots of user-defined objects, delete items from the journal, or delete snapshots, there is a 100% chance you will break this actor's ability to reliably track its receiver state across restarts.

    If you need to persist user-defined state, push it into another actor that works closely alongside the DeDuplicatingReceiveActor, not into the DeDuplicatingReceiveActor itself.

    There are two important abstract methods you will need to implement in your DeDuplicatingReceiveActors:

    1. PersistenceId - a globally unique entity id for this PersistentActor;
    2. CreateConfirmationReplyMessage - a method which returns the confirmation message this actor will send back to the sender automatically when a duplicate message is detected.
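    The division of labor between the base class and your subclass can be sketched without Akka.NET. DeDuplicatingReceiverSketch, its Handle/Receive methods, the ReliableDeliveryAck stand-in, and WriteReceiver are all hypothetical; the sketch assumes confirmed IDs live in an in-memory set and that "sending" a reply just appends it to a list. It shows the template-method shape: the base class detects duplicates and, by default, answers them with whatever CreateConfirmationReplyMessage returns.

    ```csharp
    using System.Collections.Generic;

    // Hypothetical, framework-free model of the base class; not the library's code.
    public abstract class DeDuplicatingReceiverSketch
    {
        private readonly Dictionary<string, HashSet<long>> _confirmed =
            new Dictionary<string, HashSet<long>>();

        // Replies "sent" back to senders (an IActorRef.Tell in a real actor).
        public List<object> SentReplies { get; } = new List<object>();

        // 1. Globally unique identity of this receiver.
        public abstract string PersistenceId { get; }

        // 2. Builds the ACK that is sent back when a duplicate is detected.
        protected abstract object CreateConfirmationReplyMessage(long confirmationId, string senderId, object originalMessage);

        // User logic for messages that have not been confirmed yet.
        protected abstract void Handle(long confirmationId, string senderId, object message);

        // Default duplicate handling: re-send the ACK so the sender stops redelivering.
        protected virtual void HandleDuplicate(long confirmationId, string senderId, object duplicateMessage)
        {
            SentReplies.Add(CreateConfirmationReplyMessage(confirmationId, senderId, duplicateMessage));
        }

        // Records the ID as processed (the real actor also persists this).
        protected void ConfirmDelivery(long confirmationId, string senderId)
        {
            if (!_confirmed.TryGetValue(senderId, out var ids))
                _confirmed[senderId] = ids = new HashSet<long>();
            ids.Add(confirmationId);
        }

        public void Receive(long confirmationId, string senderId, object message)
        {
            if (_confirmed.TryGetValue(senderId, out var ids) && ids.Contains(confirmationId))
                HandleDuplicate(confirmationId, senderId, message);
            else
                Handle(confirmationId, senderId, message);
        }
    }

    // Hypothetical stand-in for the sample's ACK message.
    public sealed class ReliableDeliveryAck
    {
        public ReliableDeliveryAck(long messageId) { MessageId = messageId; }
        public long MessageId { get; }
    }

    // Subclass that implements the two abstract members described above.
    public sealed class WriteReceiver : DeDuplicatingReceiverSketch
    {
        public int Processed { get; private set; }
        public override string PersistenceId => "write-receiver-1";

        protected override object CreateConfirmationReplyMessage(long confirmationId, string senderId, object originalMessage)
            => new ReliableDeliveryAck(confirmationId);

        protected override void Handle(long confirmationId, string senderId, object message)
        {
            Processed++;
            ConfirmDelivery(confirmationId, senderId); // record only, no reply
        }
    }
    ```
    
    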

    Sending ACKs back to the Sender

    The CreateConfirmationReplyMessage method will be invoked whenever the DeDuplicatingReceiveActor detects a duplicate, but it is also used when ConfirmAndReply is called from within end-user code. Here's an example of it in action, from the Akka.Persistence.Extras source code:

    protected override object CreateConfirmationReplyMessage(long confirmationId, string senderId,
        object originalMessage)
    {
        return new ReliableDeliveryAck(confirmationId);
    }
    

    In this instance, we're just sending back the ReliableDeliveryAck message defined in the sample code, which is what the AtLeastOnceDeliveryActor that originally sent us the message expects as a confirmation. This is precisely what you should do as well: reply with whatever ACK message your sender is waiting for.

    Confirming Processed Messages

    Whenever the DeDuplicatingReceiveActor processes an IConfirmableMessage, the end-user should always call the ConfirmDelivery method. This causes the DeDuplicatingReceiveActor to persist a record of the delivery to the Akka.Persistence journal and update its internal receiver state accordingly. This method does not send a reply back to the sender, however; it only updates the actor's records.

    If you want to both confirm the message and send a reply back, this can be done via the ConfirmAndReply method built into the DeDuplicatingReceiveActor:

    Command<Write>(write =>
    {
        Console.WriteLine("Received message {0} [id: {1}] from {2} - accept?", write.Content,
            CurrentConfirmationId, CurrentSenderId);
        var response = Console.ReadLine()?.ToLowerInvariant();
        if (!string.IsNullOrEmpty(response) && (response.Equals("yes") || response.Equals("y")))
        {
            // confirm delivery only if the user explicitly agrees
            ConfirmAndReply(write);
            Console.WriteLine("Confirmed delivery of message ID {0}", CurrentConfirmationId);
        }
        else
        {
            // nothing is recorded or sent back, so the sender will redeliver this message later
            Console.WriteLine("Did not confirm delivery of message ID {0}", CurrentConfirmationId);
        }
    });
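    One consequence of this sample is worth spelling out: only confirmed messages are de-duplicated. The hypothetical ApprovalReceiver below (not a library type) models the interactive example. When the user declines, nothing is recorded and no ACK is sent, so the sender's redelivery is handled as a fresh message; once a message is confirmed and acknowledged, later copies are answered with an ACK without running the user logic again. It assumes an in-memory set of (SenderId, ConfirmationId) pairs in place of the persisted receiver state.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Hypothetical model of the confirm-or-not decision in the sample above.
    public sealed class ApprovalReceiver
    {
        private readonly HashSet<(string, long)> _confirmed = new HashSet<(string, long)>();
        private readonly Func<string, bool> _approve;

        public ApprovalReceiver(Func<string, bool> approve) { _approve = approve; }

        public List<long> Acks { get; } = new List<long>();        // replies sent to the sender
        public List<string> Handled { get; } = new List<string>(); // payloads the user logic saw

        public void Receive(long confirmationId, string senderId, string content)
        {
            var key = (senderId, confirmationId);
            if (_confirmed.Contains(key))
            {
                Acks.Add(confirmationId); // duplicate: ACK again, skip user logic
                return;
            }

            Handled.Add(content);
            if (_approve(content))
            {
                // Equivalent of ConfirmAndReply: record the ID AND reply.
                _confirmed.Add(key);
                Acks.Add(confirmationId);
            }
            // Otherwise nothing is recorded or sent, so a redelivery is handled again.
        }
    }
    ```
    
    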
    

    Optional: Custom Duplicate Message Handling Code

    By default the DeDuplicatingReceiveActor will automatically send replies created via the CreateConfirmationReplyMessage method back to the Sender when a duplicate message is detected. However, if you wish to customize the way your actor handles duplicates you can do so by overriding the HandleDuplicate method:

    protected override void HandleDuplicate(long confirmationId, string senderId, object duplicateMessage)
    {
        Console.WriteLine("Automatically de-duplicated message with ID {0} from {1}", confirmationId, senderId);
        base.HandleDuplicate(confirmationId, senderId, duplicateMessage);
    }
    

    An important caveat, though: if you don't use the HandleDuplicate method to send a confirmation back to the correct sender, you will have defeated the entire purpose of having the DeDuplicatingReceiveActor in the first place. We strongly recommend that you read the DeDuplicatingReceiveActor source code before messing with this.

    Receiver State

    The most important piece of the puzzle is the DeDuplicatingReceiveActor's internal state that it uses to remember which messages it has seen from which senders in the past. This state is updated through the use of the ConfirmDelivery method.

    By default, the IReceiverState implementation used by the actor does the following:

    1. Assumes messages sent by any given sender can and will be delivered out of order (i.e. we can't rely on the ConfirmationId increasing monotonically);
    2. Stores up to 1000 unique message ids per-sender;
    3. Will be automatically compacted into a snapshot in the Akka.Persistence.SnapshotStore once every 100 messages; and
    4. Will automatically purge data belonging to any sender that hasn't been active for longer than 30 minutes.

    We enforce these constraints on the IReceiverState in order to ensure that the memory footprint of any individual DeDuplicatingReceiveActor remains bounded, even during periods of heavy use. If a DeDuplicatingReceiveActor receives more than 1000 messages from a single sender, it uses a circular buffer to overwrite the older ConfirmationIds with newer ones.

    If you wish to change any of these settings, this can be accomplished by passing a DeDuplicatingReceiverSettings object into the base class constructor of the DeDuplicatingReceiveActor.
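    The constraints listed above can be approximated with a ring buffer per sender. The sketch below is hypothetical: ReceiverStateSettings and BoundedReceiverState are illustrative names, not the DeDuplicatingReceiverSettings API, though their defaults follow the numbers above (1000 IDs per sender, a snapshot signal every 100 confirmations, 30 minutes of idle time). Because IDs may arrive out of order, it keeps a set of recent IDs rather than only the highest ID seen.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Hypothetical settings mirroring the documented defaults.
    public sealed class ReceiverStateSettings
    {
        public int BufferSizePerSender { get; set; } = 1000;
        public int SnapshotEveryNMessages { get; set; } = 100;
        public TimeSpan MaxSenderIdleTime { get; set; } = TimeSpan.FromMinutes(30);
    }

    // Hypothetical any-order receiver state: a fixed-size ring of IDs per sender.
    public sealed class BoundedReceiverState
    {
        private sealed class SenderBuffer
        {
            public readonly long[] Ring;
            public readonly HashSet<long> Index = new HashSet<long>();
            public int Next; // slot that will be written next
            public DateTime LastSeen;
            public SenderBuffer(int size) { Ring = new long[size]; }
        }

        private readonly Dictionary<string, SenderBuffer> _senders = new Dictionary<string, SenderBuffer>();
        private readonly ReceiverStateSettings _settings;
        private int _totalConfirmations;

        public BoundedReceiverState(ReceiverStateSettings settings) { _settings = settings; }

        public int TrackedSenders => _senders.Count;

        public bool IsDuplicate(string senderId, long confirmationId) =>
            _senders.TryGetValue(senderId, out var buf) && buf.Index.Contains(confirmationId);

        // Records a confirmation; returns true when a snapshot is due.
        public bool Confirm(string senderId, long confirmationId, DateTime now)
        {
            if (!_senders.TryGetValue(senderId, out var buf))
                _senders[senderId] = buf = new SenderBuffer(_settings.BufferSizePerSender);
            buf.LastSeen = now;
            if (buf.Index.Add(confirmationId))
            {
                // Ring full: evict the oldest ID, which sits in the slot about to be overwritten.
                if (buf.Index.Count > buf.Ring.Length) buf.Index.Remove(buf.Ring[buf.Next]);
                buf.Ring[buf.Next] = confirmationId;
                buf.Next = (buf.Next + 1) % buf.Ring.Length;
            }
            return ++_totalConfirmations % _settings.SnapshotEveryNMessages == 0;
        }

        // Drops every sender that has been idle longer than MaxSenderIdleTime.
        public void Prune(DateTime now)
        {
            var stale = new List<string>();
            foreach (var kv in _senders)
                if (now - kv.Value.LastSeen > _settings.MaxSenderIdleTime) stale.Add(kv.Key);
            foreach (var id in stale) _senders.Remove(id);
        }
    }
    ```

    The trade-off of a bounded buffer is that a redelivery of an ID that has already been evicted (or whose sender was pruned) is no longer recognized as a duplicate, so the buffer should comfortably exceed the number of messages a sender can have in flight.
    
    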

    N.B. Akka.Persistence.Extras does not currently support strict ordering for IReceiverState. This will be added once the ExactlyOnceStronglyOrderedDeliveryActor inside Akka.Persistence.Extras is implemented.

    Persistence

    By default the DeDuplicatingReceiveActor will persist its state each time ConfirmDelivery is called, using whatever the currently configured Akka.Persistence journal implementation is. The actor will also periodically compress that information into a single snapshot payload - usually once every 100 messages confirmed.

    The DeDuplicatingReceiveActor will automatically recover its state upon startup so it can remember its previous receiver state. Consider it the "receiver side" analog of the AtLeastOnceDeliveryActor.
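    The persist, snapshot, and recover cycle can be illustrated with an in-memory model. InMemoryStore and RecoveringReceiver are hypothetical stand-ins for the Akka.Persistence journal, snapshot store, and actor; the sketch assumes each confirmation appends one journal event and that recovery loads the latest snapshot and then replays only the events written after it.

    ```csharp
    using System.Collections.Generic;

    // Hypothetical stand-in for a journal plus a snapshot store.
    public sealed class InMemoryStore
    {
        public List<(long SeqNr, string SenderId, long ConfirmationId)> Journal { get; } =
            new List<(long SeqNr, string SenderId, long ConfirmationId)>();
        public long SnapshotSeqNr { get; set; }
        public HashSet<(string, long)> SnapshotState { get; set; } = new HashSet<(string, long)>();
    }

    // Hypothetical receiver that rebuilds its confirmed-ID set on construction.
    public sealed class RecoveringReceiver
    {
        private readonly InMemoryStore _store;
        private readonly int _snapshotEvery;
        private readonly HashSet<(string, long)> _confirmed;
        private long _seqNr;

        public int ReplayedEvents { get; private set; }

        public RecoveringReceiver(InMemoryStore store, int snapshotEvery = 100)
        {
            _store = store;
            _snapshotEvery = snapshotEvery;
            // Recovery: start from the latest snapshot, then replay newer journal events.
            _confirmed = new HashSet<(string, long)>(store.SnapshotState);
            _seqNr = store.SnapshotSeqNr;
            foreach (var e in store.Journal)
            {
                if (e.SeqNr <= store.SnapshotSeqNr) continue;
                _confirmed.Add((e.SenderId, e.ConfirmationId));
                _seqNr = e.SeqNr;
                ReplayedEvents++;
            }
        }

        public bool IsDuplicate(string senderId, long confirmationId) =>
            _confirmed.Contains((senderId, confirmationId));

        public void ConfirmDelivery(string senderId, long confirmationId)
        {
            _seqNr++;
            _store.Journal.Add((_seqNr, senderId, confirmationId)); // persist the event first
            _confirmed.Add((senderId, confirmationId));              // then update in-memory state
            if (_seqNr % _snapshotEvery == 0)
            {
                _store.SnapshotState = new HashSet<(string, long)>(_confirmed);
                _store.SnapshotSeqNr = _seqNr;
            }
        }
    }
    ```

    This also shows why deleting journal entries or snapshots out from under the actor is dangerous: recovery would rebuild an incomplete set of confirmed IDs, and already-processed messages would be handled again.
    
    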

    Copyright © 2015-2018 Petabridge®