This is an example implementation of store and forward messaging built on top of Coherence.
The Messaging Pattern advocates that:
Payload, typically represented as a Message object, may be sent to a Destination from a Sender (also commonly known as a Publisher).
It is the responsibility of the infrastructure managing the Destination to ensure that Messages (arriving at the said Destination) are then stored (in some manner) and consequently forwarded (in the order in which they arrived at the said Destination) to one or more Receivers (also commonly known as Subscribers).
The Subscribers appropriately consume (receive and acknowledge receipt of) the said Messages from the Destination in the order in which they were forwarded to the said Subscribers.
The infrastructure managing the Messages appropriately cleans up (removes and garbage collects) the said Messages that have been consumed by Subscribers.
The type of the Destination determines the method of delivery to the Subscribers on that Destination.
A Topic Destination (or Topic) will store and forward Messages to all of the Subscribers of the said Topic Destination. This form of Message delivery is often called "publish-and-subscribe messaging", "one-to-many messaging" or "the observer pattern".
A Queue Destination (or Queue) will store and forward Messages to at most one of the Subscribers of the said Queue Destination. For each Message a different Subscriber may be used, but this is implementation and runtime dependent. This form of Message delivery is often called "point-to-point messaging" or "one-to-one messaging".
A Message may be Persistent or Non-Persistent. In the case of Persistent Messages, the infrastructure managing the Destination must safely store the said Messages to a persistent (and recoverable) storage device so that in the case of infrastructure failure, Messages may be recovered (not lost).
A Subscriber to a Topic is either Durable or Non-Durable. Durable Subscriptions allow the system implementing the Subscriber to terminate and return without losing Messages that may have been delivered during the outage (or disconnection).
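The difference between Topic and Queue delivery described above can be sketched with plain in-memory lists. This is only an illustration: the class and method names (DeliverySketch, deliverToTopic, deliverToQueue) are hypothetical, and the round-robin choice for Queues is an assumption, since the selection of a Subscriber is implementation and runtime dependent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in; not the Coherence-based implementation.
class DeliverySketch {
    // Topic semantics: every subscriber receives every message, in arrival order.
    static List<List<String>> deliverToTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = newInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    // Queue semantics: each message goes to exactly one subscriber.
    // Round-robin is an assumed policy; requires at least one subscriber.
    static List<List<String>> deliverToQueue(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = newInboxes(subscriberCount);
        int next = 0;
        for (String message : messages) {
            inboxes.get(next).add(message);
            next = (next + 1) % subscriberCount;
        }
        return inboxes;
    }

    private static List<List<String>> newInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }
}
```

With three messages and two subscribers, the Topic variant gives both subscribers all three messages, while the Queue variant splits them so that no message is seen twice.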
Rationale
While it's rare that an architecture making extensive use of Coherence will require Store and Forward Messaging (due to the ability to use MapListeners, Continuous Queries and events for notifications), there are arguably some circumstances (when clients may be disconnected) where the pattern is particularly useful.
Although providing an implementation of store and forward Messaging on-top-of-Coherence has always been possible (as demonstrated by this implementation), most application-level requirements for messaging are often satisfied using existing, off-the-shelf and standardized corporate messaging infrastructure, like JMS. In the cases where such infrastructure is not available or not appropriate, this implementation may provide some benefits.
While it is not the intention nor is it the purpose of this implementation to replace existing messaging infrastructure, this implementation is specifically designed to provide a flexible framework for application-specific, high-performance messaging on a Data Grid.
More specifically, this implementation has been designed as a minimal framework to support multi-directional and multi-point (no single point of failure) push replication between multiple Coherence clusters that are deployed and interconnected by high-latency, high-bandwidth but unreliable wide-area networks (WANs), also known as the Push Replication Pattern.
This project (like other Coherence Incubator projects) uses Apache Ivy for dependency specification and management. While a standard ivy.xml definition file ships with the source and documentation distribution, the following diagram visually indicates the current dependencies.
Benefits of Datagrid-based Hub-less Messaging
The implementation of the Store and Forward Messaging Pattern on Oracle Coherence provides many unique advantages over traditional hub-and-spoke messaging solutions.
The messaging infrastructure becomes part of and shares the same Coherence Datagrid infrastructure (by default) as your application. Consequently, there is no need for separately installed and maintained "messaging servers".
Being "part" of your application, there is no reason to wrap application objects as Messages (unlike JMS). You can simply publish and consume your application and domain objects.
As all stateful objects in the pattern are managed using standard distributed caches (including Topics, Queues, Subscriptions, and Messages themselves), you may use standard Oracle Coherence features to monitor, manage, query and observe the state of the messaging infrastructure.
Given that the source code is publicly available, you are free to extend and enhance the messaging solution to suit your needs.
Being based on Oracle Coherence, you can rest assured that the solution will provide highly available, scalable (scale-out) and resilient messaging infrastructure.
What's New?
The following changes have been made since the Messaging Pattern 2.7.0 release.
The Store and Forward Messaging pattern no longer uses the Command Pattern, but rather makes extensive use of entry processors and asynchronous event processing scheduled by backing map listeners. As a result, message publishing to a given topic or queue is no longer funneled through a single command context running on a given node. Rather, publishing is now distributed across the cluster and will scale as the cluster scales.
Internally, the Messaging Pattern uses three Coherence distributed caches: the Destination cache (for Topic and Queue objects), the Subscription cache and the Message cache. To use Messaging, an application creates a destination, subscribes to that destination, then publishes messages to the destination.
When an application publishes a message, it is put in the Message cache. A Message cache backing map listener receives an insert event and schedules a thread to process the newly inserted message. If the destination is a topic, the background thread invokes each subscription to that topic, notifying it about the new message, so all topic subscriptions receive the message. For a queue destination, the background thread notifies the queue, which then hands the message off to a single subscription: the next one waiting for a message.
Meanwhile, the subscriber, which runs in the client JVM, has a client listener registered on the Subscription cache. When a message arrives at the subscription, the subscriber listener is called and puts the subscription object on an internal LinkedBlockingQueue. When the client thread calls the Subscriber.getMessage method, the subscriber waits until the LinkedBlockingQueue has an entry. Once the subscriber takes an entry off the queue, it gets the message from the Message cache and returns the payload to the application.
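The client-side hand-off described above (listener callback, internal LinkedBlockingQueue, blocking getMessage) can be sketched as follows. This is a simplified stand-in, not the project's code: SubscriberSketch, store and onMessageArrived are hypothetical names, a ConcurrentHashMap plays the role of the Message cache, and the queue holds message ids rather than subscription objects.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the client-side subscriber; no Coherence involved.
class SubscriberSketch {
    // Stand-in for the Message cache: message id -> payload.
    private final Map<Long, Object> messageCache = new ConcurrentHashMap<>();
    // Internal hand-off queue filled by the listener callback.
    private final LinkedBlockingQueue<Long> pending = new LinkedBlockingQueue<>();

    // Stand-in for publishing: store the payload in the "Message cache".
    void store(long messageId, Object payload) {
        messageCache.put(messageId, payload);
    }

    // Stand-in for the subscription listener firing in the client JVM.
    void onMessageArrived(long messageId) {
        pending.add(messageId);
    }

    // Blocks until a notification is queued, then looks up the payload.
    Object getMessage() {
        try {
            long messageId = pending.take();
            return messageCache.get(messageId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for a message", e);
        }
    }
}
```

The blocking take() call is what lets the application thread wait without polling: it returns only once the listener thread has queued a notification.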
Some of the features of Messaging
Is naturally geared for accepting as many messages as physically possible (until the network capacity is exceeded or you run out of storage).
Is completely JMX enabled and monitorable. Simply enable Coherence JMX monitoring to see the Messaging tree, which shows all topics, queues and subscriptions.
Message delivery is guaranteed to occur in-order on a per client session basis (publisher).
Server loss (i.e., cluster member loss) does not affect the availability of the messaging infrastructure (as it's built using Coherence distributed caching).
The most significant interface (from an application-level perspective) is the MessagingSession. Implementations of this interface, namely the DefaultMessagingSession, are how you control the messaging infrastructure, including:
Creating Topic and Queue Destinations
Subscribing and Unsubscribing to Destinations (creating Subscribers)
Publishing Payload to Destinations, and
Subscribing and reading (i.e., consuming) Payload from Subscriptions
Using Topics and Queues
The following sections outline the use of Topics and Queues in an application.
The following examples outline the use of Topics (one to many messaging), where each published message is delivered to all subscribers.
Step 1: Creating a MessagingSession
Once you've installed and appropriately configured the implementation (see below), the first thing you need to do is create a MessagingSession by calling the DefaultMessagingSession.getInstance() method.
MessagingSession messagingSession = DefaultMessagingSession.getInstance();
Message ordering is guaranteed on a per-session basis. All messages published using a given session will arrive at the subscriber in the same order; however, there is no defined ordering for messages between sessions. Consider this example: a client thread is publishing messages A and B. At the same time, another client thread is publishing C and D to the same topic using a different session. A subscriber might receive the messages in any of the following orders: A,B,C,D or A,C,B,D or C,A,B,D, etc.
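The ordering guarantee can be stated as a simple check: an observed delivery order is acceptable if each session's messages appear in their publishing order, regardless of how the sessions interleave. The helper below is a hypothetical illustration (OrderingSketch and preservesSessionOrder are not part of the project) and assumes message values are distinct.

```java
import java.util.List;

// Hypothetical helper; illustrates the per-session ordering rule only.
class OrderingSketch {
    // Returns true if 'observed' contains every message of 'sessionMessages'
    // in the same relative order in which the session published them.
    static boolean preservesSessionOrder(List<String> observed, List<String> sessionMessages) {
        int lastIndex = -1;
        for (String message : sessionMessages) {
            int index = observed.indexOf(message);
            if (index <= lastIndex) {
                return false; // missing (-1) or delivered out of order
            }
            lastIndex = index;
        }
        return true;
    }
}
```

For the example above, the order A,C,B,D passes the check for both sessions (A,B and C,D), whereas B,A,C,D would fail for the first session.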
Step 2: Creating a Topic
Once you have a MessagingSession you can create a Topic.
A Topic is uniquely identified within the cluster by its name. If the Topic with the specified name (or Identifier) already exists, the Identifier of the existing Topic is returned.
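The "create or return existing" behaviour can be sketched with a concurrent map keyed by name. This is an assumption-laden illustration, not the project's code: DestinationRegistrySketch and its generated identifiers are hypothetical, and the real implementation keeps Destinations in a Coherence distributed cache.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the Destination cache.
class DestinationRegistrySketch {
    private final Map<String, String> identifiersByName = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Registers the topic on first use; later calls with the same name
    // return the identifier that was created the first time.
    String createTopic(String name) {
        return identifiersByName.computeIfAbsent(
                name, n -> "topic-" + nextId.incrementAndGet());
    }

    int size() {
        return identifiersByName.size();
    }
}
```

Because computeIfAbsent is atomic on a ConcurrentHashMap, two callers racing to create the same name still end up with a single identifier.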
Step 3a: Subscribing to a Topic (non-durable subscribers)
Once you have created a Topic you can subscribe to it by creating a Subscriber. By default, Subscribers to Topics (and Queues) are non-durable. That is, when a Subscriber becomes disconnected from the messaging infrastructure, all of the visible messages for the said Subscriber are appropriately rolled back and the subscription for the Subscriber is removed.
Step 3b: Subscribing to a Topic (durable subscribers)
To create a durable subscription to a Topic, you need to provide an appropriate TopicSubscriptionConfiguration when subscribing.
To resubscribe to a previously created durable subscription, simply call the subscribe method again with the same durable configuration (including the same subscription name).
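The practical difference between durable and non-durable subscriptions can be sketched as below. Everything here is a hypothetical stand-in (DurabilitySketch, subscribe, disconnect, backlog); it only models the rule that a non-durable subscription is removed on disconnect while a named durable one is kept and continues to accumulate messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-topic model; not the project's subscription classes.
class DurabilitySketch {
    private final Map<String, List<String>> backlogs = new HashMap<>();
    private final Map<String, Boolean> durableFlags = new HashMap<>();

    // Subscribing again with the same name reuses an existing subscription.
    void subscribe(String name, boolean durable) {
        backlogs.putIfAbsent(name, new ArrayList<>());
        durableFlags.put(name, durable);
    }

    // Non-durable subscriptions are removed; durable ones are retained.
    void disconnect(String name) {
        if (!durableFlags.getOrDefault(name, false)) {
            backlogs.remove(name);
            durableFlags.remove(name);
        }
    }

    // Every existing subscription receives the message, connected or not.
    void publish(String message) {
        for (List<String> backlog : backlogs.values()) {
            backlog.add(message);
        }
    }

    List<String> backlog(String name) {
        return backlogs.getOrDefault(name, List.of());
    }
}
```

A message published while both subscribers are disconnected is still waiting for the durable subscriber when it resubscribes under the same name, but not for the non-durable one.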
Step 4: Publishing payload to a Destination
To publish message payload to a Destination, either a Topic or a Queue, you again use the MessagingSession. It's simple: call publishMessage(...) with the Identifier (or name) of the Destination and the corresponding payload.
The only thing you need to ensure is that the payload being published is in some way serializable, either by implementing the standard Java java.io.Serializable interface or Coherence's ExternalizableLite or PortableObject.
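As a minimal sketch of the first option, the payload class below implements java.io.Serializable and is round-tripped through standard Java serialization. The OrderPlaced class and the PayloadSketch helpers are hypothetical examples; how the messaging infrastructure actually serializes payloads depends on your Coherence configuration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class PayloadSketch {
    // Hypothetical domain object used directly as message payload.
    static class OrderPlaced implements Serializable {
        private static final long serialVersionUID = 1L;
        final String orderId;
        final int quantity;

        OrderPlaced(String orderId, int quantity) {
            this.orderId = orderId;
            this.quantity = quantity;
        }
    }

    // Serializes a payload with standard Java serialization.
    static byte[] serialize(Serializable payload) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
            out.flush(); // push buffered data into the byte array stream
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Restores a payload from its serialized form.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A round trip like this is a quick way to confirm that a domain object can be used as payload before publishing it.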
Step 5: Consuming a message with a Subscriber
In order to consume a message using a Subscriber you need to use the Subscriber getMessage() method. This method will request, block and wait for a message to be delivered to the Subscriber and then returned to your application.
String message = (String)subscriber.getMessage();
By default, newly created Subscribers are in "auto-commit" mode. This means that any message received from a Subscriber will be automatically acknowledged and removed from the underlying messaging infrastructure. If, however, you'd like the opportunity to roll back received messages, simply set the Subscriber auto-commit mode to false. For example:
subscriber.setAutoCommit(false);
Once you have done this you may make use of the rollback or commit methods on the Subscriber interface to manually control message acknowledgement in the underlying messaging infrastructure.
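The interaction between auto-commit, commit and rollback can be sketched with a small in-memory model. The class below is hypothetical (CommitSketch and deliver are illustrative names), its getMessage is non-blocking for brevity, and the assumption that rolled-back messages are redelivered in their original order is ours rather than a documented guarantee.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of manual acknowledgement; not the project's Subscriber.
class CommitSketch {
    private final Deque<String> visible = new ArrayDeque<>();   // delivered, not yet received
    private final List<String> uncommitted = new ArrayList<>(); // received, not yet acknowledged
    private boolean autoCommit = true;

    void deliver(String message) {
        visible.addLast(message);
    }

    void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    // Returns the next message, or null if none is available.
    String getMessage() {
        String message = visible.pollFirst();
        if (message != null && !autoCommit) {
            uncommitted.add(message); // remember it until commit or rollback
        }
        return message;
    }

    // Acknowledges everything received since the last commit or rollback.
    void commit() {
        uncommitted.clear();
    }

    // Makes unacknowledged messages visible again, in their original order.
    void rollback() {
        for (int i = uncommitted.size() - 1; i >= 0; i--) {
            visible.addFirst(uncommitted.get(i));
        }
        uncommitted.clear();
    }
}
```

In this model, receiving A and B and then rolling back makes A the next message again, while a commit makes the receipt final so that a later rollback has nothing to undo.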
Like most messaging systems, including JMS, subscribers should only be used by a single thread. If you require concurrent subscriptions to a Destination, simply create more Subscribers.
The following examples outline the use of Queues (point-to-point messaging), where each message published is delivered to one and only one subscriber.
Step 1: Creating a MessagingSession
Once you've installed and appropriately configured the implementation (see below), the first thing you need to do is create a MessagingSession. The simplest way to achieve this is to use the statically defined DefaultMessagingSession.getInstance() method.
Step 2: Creating a Queue
If the Queue with the specified name (or Identifier) already exists, the Identifier of the existing Queue is returned.
Step 3: Subscribing to a Queue
Once you have created a Queue you can subscribe to it by creating a Subscriber. Subscribers to Queues are always non-durable. That is, when a Subscriber becomes disconnected from the messaging infrastructure, all of the visible messages for the said Subscriber are appropriately rolled back and the subscription for the Subscriber is removed.
Why don't you support the Java Message Service specification (i.e., JMS) or feature X of JMS?
While it is theoretically possible for this implementation to be a service provider implementation for the Java Message Service (JMS) specification, it has been explicitly designed to support the development of the Push Replication Pattern and other WAN-based architectures, which do not necessarily require JMS.
How can I monitor the infrastructure?
There are two ways. Firstly, by enabling JMX, you'll find that all Destinations (e.g., Topics) are automatically registered into the clustered JMX tree (under Messaging). Secondly, as all of the infrastructure state is represented using Coherence distributed caches, you may examine, listen to and mutate the appropriately named caches: "coherence.messagingpattern.destinations", "coherence.messagingpattern.messages" and "coherence.messagingpattern.subscriptions".