This is an example implementation of the Push Replication Pattern.
The Push Replication Pattern advocates that:
Operations (such as insert, update and delete) occurring on Data in one Site should be pushed using one or more Publishers to an associated Device.
A Publisher is responsible for optimistically replicating Operations (in the order in which the said Operations originally occurred) on or with the associated Device.
If a Device is unavailable for some reason, the Operations to be replicated using the associated Publisher will be queued and executed (in the original order) at a later point in time.
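The queueing behaviour described above can be illustrated with a minimal, self-contained sketch. It does not use Coherence or the pattern's own classes: QueueingPublisherSketch and its nested Device are hypothetical stand-ins, and Operations are represented as plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative stand-in only; not part of the Push Replication API. */
final class QueueingPublisherSketch {
    /** Hypothetical Device that can be switched between available and unavailable. */
    static final class Device {
        boolean available = true;
        final List<String> applied = new ArrayList<>();

        /** Applies one operation, or returns false while the device is unavailable. */
        boolean apply(String operation) {
            if (!available) {
                return false;
            }
            applied.add(operation);
            return true;
        }
    }

    private final Deque<String> pending = new ArrayDeque<>();
    private final Device device;

    QueueingPublisherSketch(Device device) {
        this.device = device;
    }

    /** Queues the operation, then tries to replay everything queued so far. */
    void publish(String operation) {
        pending.addLast(operation);
        drain();
    }

    /** Replays queued operations oldest first and stops at the first failure. */
    void drain() {
        while (!pending.isEmpty() && device.apply(pending.peekFirst())) {
            pending.removeFirst();
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

An operation is removed from the queue only after the Device has accepted it, so operations that arrive while the Device is unavailable are replayed later in their original order.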
This implementation of the Push Replication Pattern additionally advocates that:
The Data on which Operations occur are standard Coherence Cache Entries.
Operations replicated include inserts, updates, and deletes of Cache Entries. This includes NamedCache put and remove, as well as updates that are artifacts of invoking an AbstractProcessor.
The Operations that are enqueued for replication are called EntryOperations. They contain key and value pairs of updated cache entries in Binary form.
A Site is a Coherence Cluster.
A Site may act as both a sender of Operations and receiver of Operations. That is, multi-way multi-site push replication is permitted.
A Device may be any one of the following: a local cluster, a remote cluster, a file system, a database, an I/O stream, a logging system, etc.
Rationale
The purpose of this pattern is to provide extensible, flexible, high-performance, highly-available and scalable infrastructure to support the replication of EntryOperations occurring in one Coherence Cluster to one or more possibly globally distributed other Coherence Clusters.
While this naturally forms a "hub-and-spoke" architecture of federated Coherence Clusters, by configuring each Coherence Cluster as a "hub" and a "spoke", multi-way push replication may be achieved.
This project (like other Coherence Incubator projects) uses Apache Ivy for dependency specification and management. While a standard ivy.xml definition file ships with the source and documentation distribution, the following diagram visually indicates the current dependencies.
What's New?
The following changes have been made since the Push Replication Pattern 2.6.0 release.
Now dependent on Coherence 3.5.3+ for the Coherence 3.5 Editions.
Performance optimizations have been added to reduce serialization and deserialization overhead during the process of enqueuing, sending, and processing entry operations. Entries are now retrieved from a cache store in binary form and are sent to a replication target site without deserialization. Deserialization, if it needs to happen, is performed by conflict resolver classes on the target site.
Conflict Resolvers are abstracted to return a ConflictResolution object that dictates what is to be done with an entry: use source entry (as is), use target entry (as is), use a merged entry (defined by the conflict resolver class), or remove an entry. Only when merging does an entry have to be deserialized.
PushReplication transparently decorates entries with site information to be more intelligent about managing entries. With this release users of Push Replication should assign unique names to Coherence clusters participating in push replication using the tangosol.coherence.site setting. (e.g. -Dtangosol.coherence.site=Site1)
Decorated entries prevent "ping-pong" (i.e. replicated updates going back to the source) and have allowed Push Replication to eliminate the SafePublishing layer (which was used to prevent "ping-pong" by using a set-aside list).
Introduces the ReplicationRole, which controls how the RemoteInvocationService publisher handles updates that are artifacts of replication from other clusters. Currently there are two roles: LEAF and HUB. LEAF is the default behavior and tells the publisher to filter out updates it sees that were caused by replication. Simply stated, if an update is replicated to a LEAF, the update will be processed by the leaf but not replicated elsewhere. The HUB role declares that updates it sees caused by replication will be forwarded to other clusters that are targets of replication, with one exception: if the target is the same cluster from which the update originated, the update will not be replicated to that cluster (i.e. this prevents ping-pong).
The Message Pattern layer has been revamped to provide better scalability for Push Replication. Serialization of updates is enforced on a per entry basis which relaxes the need to serialize all updates to the message queues used to service push replication.
New JMX PublishingService attributes keep track of batches published (i.e. publishCount), total entry operations processed, and total entry operations published. (Processed entries mean that they are candidates for replication, but may be rejected because they represent updates from a replicating site.)
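The LEAF and HUB rules described in the list above can be condensed into a single decision. The following is a minimal sketch under stated assumptions: the class ReplicationRoleSketch, the method shouldForward and its parameters are hypothetical names chosen for illustration, and site names are plain strings standing in for the site decoration carried by each entry.

```java
/** Illustrative stand-in only; not the pattern's actual ReplicationRole type. */
final class ReplicationRoleSketch {
    /** Mirrors the two roles named in the text. */
    enum Role { LEAF, HUB }

    /**
     * Decides whether a publisher running in localSite should send an update to targetSite.
     * originSite is the site in which the update was first made.
     */
    static boolean shouldForward(Role role, String localSite, String originSite, String targetSite) {
        if (originSite.equals(targetSite)) {
            return false; // never send an update back to its origin (prevents ping-pong)
        }
        if (originSite.equals(localSite)) {
            return true; // updates made locally are always candidates for replication
        }
        // The update arrived through replication: only a HUB passes it on.
        return role == Role.HUB;
    }
}
```

For example, a HUB that receives an update originating in Site1 would forward it to Site2 but not back to Site1, whereas a LEAF would forward it nowhere.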
API Changes
The runtime invocation for all cluster members requires use of -Dtangosol.coherence.site. The site name assigned must be unique for each cluster involved in push replication. The assigned site names are used to know where entry updates originated.
The ConflictResolver interface has changed. The method now returns a ConflictResolution object which abstracts how a conflict is to be resolved.
The ConflictResolver accepts two inputs that are in Binary form: the EntryOperation (which encapsulates an Entry being replicated from another cluster), and a BinaryEntry (which represents the target entry in Binary form). The ConflictResolver returns a ConflictResolution object which denotes how the system should handle the conflict: preserve the target, preserve the source, remove the target, or update the target with a merged entry.
TargetClusterSiteName and ReplicationMode (LEAF or HUB) may optionally be declared in RemoteInvocationServicePublisher. These settings only need to be set when a cluster is a replication hub. The TargetClusterSiteName must agree with the site name expressed in the target cluster using the -Dtangosol.coherence.site setting.
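To make the four possible outcomes of conflict resolution concrete, here is a minimal sketch. It is not the pattern's ConflictResolver interface: the names ConflictResolutionSketch, Action, Resolution and resolve are hypothetical, and for simplicity the values are already-deserialized sets of strings rather than Binary entries. The merge policy (set union) is likewise only an assumption chosen for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

/** Illustrative stand-in only; the real resolver works on Binary entries. */
final class ConflictResolutionSketch {
    /** The four outcomes described in the text. */
    enum Action { USE_SOURCE, USE_TARGET, USE_MERGED, REMOVE }

    static final class Resolution {
        final Action action;
        final Set<String> merged; // non-null only for USE_MERGED

        Resolution(Action action, Set<String> merged) {
            this.action = action;
            this.merged = merged;
        }
    }

    /**
     * source is the incoming value (null means the entry was removed at the source);
     * target is the value currently held at the destination (null means absent).
     */
    static Resolution resolve(Set<String> source, Set<String> target) {
        if (source == null) {
            return new Resolution(Action.REMOVE, null);
        }
        if (target == null || source.containsAll(target)) {
            return new Resolution(Action.USE_SOURCE, null); // source already covers target
        }
        if (target.containsAll(source)) {
            return new Resolution(Action.USE_TARGET, null); // nothing new in source
        }
        Set<String> union = new TreeSet<>(target);
        union.addAll(source);
        return new Resolution(Action.USE_MERGED, union);
    }
}
```

Only the last branch needs to build a new value, which corresponds to the point made above that deserialization is required only when merging.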
In the "Active Passive" deployment model, updates to data made in the active grid are sent to the passive grid asynchronously and ordered per NamedCache.
"Hub and Spoke" aka "Master and Slaves"
In the "Hub and Spoke" deployment model, updates to data made in the active "hub" grid are sent to any number of passive "spoke" grids asynchronously and ordered per NamedCache.
"Active Active" aka "Hot Hot" aka "Federated"
In the "Active Active" deployment model, updates to data made in either of the active grids are sent to the other active grid asynchronously and ordered per NamedCache.
In the "Multi-Master" deployment model, updates to data made in any active grid are sent to all other active grids asynchronously and ordered per NamedCache.
"Centralized Replication"
In the "Centralized Replication" model, a cluster serves as a hub to capture and route replicated entries from a set of "leaf" clusters. This works best when a cluster "owns" a set of entries in a cache (i.e. is exclusively responsible for updating a set of entries). Leaf clusters only have to publish to a "hub" cluster, which is responsible for processing all updates from all leaves and propagating each update to all other leaves.
All clusters need to have unique site names that are declared using Coherence system property tangosol.coherence.site.
One creates a hub publisher by creating and registering a BatchPublisherAdaptor in the hub cluster using two additional arguments: the site name of the leaf the hub will be publishing to (this must be identical to the site name mentioned above), and an enum which designates ReplicationRole.HUB.
Without these arguments, updates in a cluster that are artifacts of replication are processed locally in that cluster but not replicated elsewhere; the cluster behaves as a leaf and a terminus for replication activity. With the aforementioned hub settings, the publisher acts as a hub and propagates all updates (local updates and replicated updates from leaf clusters) to all clusters, the exception being that a hub cluster will not replicate an update to the site in which that update originated.
Each model additionally supports Conflict Resolution at the destination site through the specification of ConflictResolvers. The default conflict resolver (called a BruteForceConflictResolver) will simply overwrite the existing value - that is, last write wins.
Implementation Discussion
Before you start
The Push Replication Pattern makes extensive use of the Messaging Pattern as infrastructure to capture EntryOperations and deliver them in order of occurrence to BatchPublishers. Consequently, it may be beneficial to become familiar with the Messaging Pattern (and its implementation) in order to understand the implementation of this pattern.
After such operations are captured (represented as EntryOperations) they are sent to the messaging layer (implemented using the Messaging Pattern) for distribution to specialized TopicSubscriptions called PublishingSubscriptions.
When PublishingSubscriptions receive the EntryOperations from the messaging layer, an internal event (captured by a backing-map-listener executing in the same JVM in which the PublishingSubscription is being managed by Coherence) signals to a PublishingService to commence publishing the currently queued EntryOperations with an associated BatchPublisher implementation. How the ultimate publishing occurs is entirely dependent on the implementation of the BatchPublisher.
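The flow just described (capture, delivery to each subscription, then publishing through a BatchPublisher) can be sketched without Coherence as follows. All names here (PublishingFlowSketch, capture, publishQueued) are hypothetical; the nested BatchPublisher interface is a simplified stand-in for the role of the same name in the text, and operations are plain strings. In the real implementation publishing is triggered by an internal event, whereas this sketch calls publishQueued explicitly.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative stand-in only; not the Messaging Pattern implementation. */
final class PublishingFlowSketch {
    /** Simplified stand-in for the BatchPublisher role. */
    interface BatchPublisher {
        void publishBatch(List<String> operations);
    }

    /** Stand-in for a PublishingSubscription: a private queue plus its publisher. */
    private static final class Subscription {
        final Deque<String> queued = new ArrayDeque<>();
        final BatchPublisher publisher;

        Subscription(BatchPublisher publisher) {
            this.publisher = publisher;
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    void subscribe(String name, BatchPublisher publisher) {
        subscriptions.put(name, new Subscription(publisher));
    }

    /** Topic delivery: every subscription receives its own copy, in arrival order. */
    void capture(String operation) {
        for (Subscription subscription : subscriptions.values()) {
            subscription.queued.addLast(operation);
        }
    }

    /** Stand-in for the PublishingService: hands everything queued to the publisher. */
    void publishQueued(String name) {
        Subscription subscription = subscriptions.get(name);
        if (subscription == null || subscription.queued.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(subscription.queued);
        subscription.queued.clear();
        subscription.publisher.publishBatch(batch);
    }
}
```

Because each subscription keeps its own queue, a slow or unavailable destination does not hold back publishing to the others.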
The following diagram provides an overview of the push replication process through the various layers of the system.
Example
The following section provides an example use of the push replication pattern, including establishing publishers.
Step 1: Configuring a Cache (with a PublishingCacheStore)
As push replication occurs on a cache-by-cache basis, the first step in making use of this pattern is configuring your application cache(s) to use an appropriate PublishingCacheStore implementation. For the "hub-and-spoke" model, where one cluster is a "master" and the other clusters are "spokes", you should use a PublishingCacheStore.
By default, the Push Replication cache configuration (coherence-pushreplicationpattern-pof-cache-config.xml) defines a "publishing-*" mapping to a distributed-scheme that is configured to use a PublishingCacheStore. Consequently any cache name prefixed with "publishing-" will automatically support Push Replication.
When your application first accesses a "publishing-*" based scheme (which in turn creates the PublishingCacheStore), the underlying messaging infrastructure will be created to support Push Replication. This includes establishing a Topic that is named exactly the same as your cache. In this case, a Topic called "publishing-cache" will be created in the messaging layer.
From here on any operation that mutates the "publishing-cache" will have the said operation captured and sent (as an EntryOperation) to the "publishing-cache" Topic, after which the operation will be forwarded in-order to the Subscriptions on the said Topic.
Step 2: Registering a BatchPublisher to perform Push Replication
Until a BatchPublisher is registered for a cache configured with a PublishingCacheStore, no publishing will occur. To register a BatchPublisher to publish EntryOperations, you need to use a PushReplicationManager.
PushReplicationManager pushReplicationManager = DefaultPushReplicationManager.getInstance();
pushReplicationManager.registerBatchPublisher("publishing-cache", "stderr-publisher", new BatchPublisherAdapter(new StdErrPublisher()));
There are essentially two forms of publishers: those that support "batching" and those that don't. By default, the Push Replication pattern only works with BatchPublishers. Consequently, to use a non-batching publisher (i.e. an implementation of the Publisher interface), an adapter has been provided. The package com.oracle.coherence.pushreplication.publishers provides numerous implementations of Publishers and BatchPublishers together with adapters to convert between them (as demonstrated in the example above).
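The idea behind such an adapter can be shown in a few lines. This is a minimal sketch, not the shipped BatchPublisherAdapter: the Publisher and BatchPublisher interfaces below are simplified, hypothetical versions that take strings instead of EntryOperations, and adapt is an illustrative name.

```java
import java.util.List;

/** Illustrative stand-in only; the real interfaces have different signatures. */
final class PublisherAdapterSketch {
    /** Simplified non-batching publisher: one operation at a time. */
    interface Publisher {
        void publish(String operation);
    }

    /** Simplified batching publisher: a whole batch at a time. */
    interface BatchPublisher {
        void publishBatch(List<String> operations);
    }

    /** Wraps a Publisher so that it can be used wherever a BatchPublisher is expected. */
    static BatchPublisher adapt(Publisher publisher) {
        return operations -> {
            // Preserve the batch order by publishing one operation after another.
            for (String operation : operations) {
                publisher.publish(operation);
            }
        };
    }
}
```

A publisher that writes to standard error could then be adapted with PublisherAdapterSketch.adapt(System.err::println).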
Step 3: Using a publishing-* cache
Once you've registered one or more BatchPublishers for each cache from which you would like to publish EntryOperations, you may simply use the said cache(s) as regular caches (as they are!).
Hence the following code, registered with the above "stderr-publisher", would output the "put" operations to stderr.
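A stand-alone sketch of that behaviour is given here. It deliberately does not use Coherence: PublishingCacheSketch is a hypothetical stand-in for a "publishing-*" cache with a stderr publisher registered, and the output format is an assumption. With Coherence on the class path, the application side would presumably consist of obtaining the cache with CacheFactory.getCache("publishing-cache") and calling put on the returned NamedCache.

```java
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-in only; a real application would use a Coherence NamedCache. */
final class PublishingCacheSketch {
    private final Map<String, String> entries = new HashMap<>();
    private final PrintStream sink;

    /** sink plays the role of the registered publisher, for example System.err. */
    PublishingCacheSketch(PrintStream sink) {
        this.sink = sink;
    }

    /** Stores the value and reports the captured operation to the sink. */
    String put(String key, String value) {
        String previous = entries.put(key, value);
        sink.println((previous == null ? "insert " : "update ") + key + "=" + value);
        return previous;
    }

    String get(String key) {
        return entries.get(key);
    }

    public static void main(String[] args) {
        PublishingCacheSketch cache = new PublishingCacheSketch(System.err);
        cache.put("message", "hello");       // reported as an insert
        cache.put("message", "hello world"); // reported as an update
    }
}
```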
What are the ordering characteristics of push replication?
Push Replication currently supports replication at the Data level and supports serialization of operations against a particular cache entry. This means that for a given entry in a cache, any updates made to that entry will be propagated to a target Site and applied in the same order. Serialization across different entries in a cache, or between updates to different caches is not enforced.
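The guarantee can be stated as a small check: two streams of operations are equivalent for this purpose if, for every key, the values appear in the same order, regardless of how operations on different keys interleave. The sketch below is illustrative only; PerEntryOrderingSketch and its methods are hypothetical names, and each operation is modelled as a two-element array of key and value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative stand-in only; not part of the Push Replication API. */
final class PerEntryOrderingSketch {
    /** Groups the values of the given {key, value} operations by key, keeping their order. */
    static Map<String, List<String>> byKey(List<String[]> operations) {
        Map<String, List<String>> grouped = new HashMap<>();
        for (String[] operation : operations) {
            grouped.computeIfAbsent(operation[0], key -> new ArrayList<>()).add(operation[1]);
        }
        return grouped;
    }

    /** True when every key sees the same sequence of values at the source and at the target. */
    static boolean preservesPerEntryOrder(List<String[]> atSource, List<String[]> atTarget) {
        return byKey(atSource).equals(byKey(atTarget));
    }
}
```

Under this check, a target that applies an update to key b before two updates to key a is still consistent with a source that interleaved them differently, as long as the two updates to a keep their relative order.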
How can the Push Replication Publishers be monitored?
Like the Command Pattern this pattern is JMX ready. By simply enabling JMX on the Coherence Cluster, each of the PublishingServices will be presented in the JMX tree, detailing current replication state and statistics.
How do I enable Push Replication to one or more (remote) Coherence Clusters?
In order to publish to a remote cluster, you must:
Configure and enable one or more proxies on the remote cluster(s).
Ensure that the members of the remote cluster(s) have the Push Replication Pattern (and dependencies) in the class path.
Configure and enable one or more Remote Invocation Services in the "hub" with the addresses of the remote cluster proxy members (or, if you're using Coherence 3.4+, use an AddressProvider). An example scheme is defined in the coherence-pushreplicationpattern-cache-config.xml file.
Register appropriate RemoteInvocationPublishers for each cache (as follows).
String remoteCacheName = cacheName; // we'll publish to the same cache name in the remote site
pushReplicationManager.registerBatchPublisher(cacheName,
    "remote-site",
    new RemoteInvocationPublisher("RemoteSiteInvocationService", // the underlying service to use
        new BatchPublisherAdapter(new LocalCachePublisher(remoteCacheName)),
        false,  // don't automatically start publishing
        10000,  // delay between batches being published
        100,    // maximum batch size
        10000,  // restart delay if failure occurs
        5));    // maximum consecutive failures before suspension of publishing
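The roles of the "maximum batch size" and "maximum consecutive failures" arguments can be illustrated with a small stand-alone sketch. It is not the RemoteInvocationPublisher implementation: BatchingWithSuspensionSketch and its methods are hypothetical, the delays are left out, and the exact point at which the real publisher suspends itself is an assumption (here, once the failure count reaches the configured maximum).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Illustrative stand-in only; timing (batch delay, restart delay) is omitted. */
final class BatchingWithSuspensionSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int maxBatchSize;
    private final int maxConsecutiveFailures;
    private final Predicate<List<String>> sender; // returns true when the batch was delivered
    private int consecutiveFailures;
    private boolean suspended;

    BatchingWithSuspensionSketch(int maxBatchSize, int maxConsecutiveFailures,
                                 Predicate<List<String>> sender) {
        this.maxBatchSize = maxBatchSize;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
        this.sender = sender;
    }

    void enqueue(String operation) {
        queue.addLast(operation);
    }

    /** Makes one publishing attempt; returns true if a batch was delivered. */
    boolean publishNextBatch() {
        if (suspended || queue.isEmpty()) {
            return false;
        }
        List<String> batch = new ArrayList<>();
        for (String operation : queue) { // oldest first
            if (batch.size() == maxBatchSize) {
                break;
            }
            batch.add(operation);
        }
        if (sender.test(batch)) {
            for (int i = 0; i < batch.size(); i++) {
                queue.removeFirst(); // drop only what was actually delivered
            }
            consecutiveFailures = 0;
            return true;
        }
        consecutiveFailures++;
        if (consecutiveFailures >= maxConsecutiveFailures) {
            suspended = true; // stop retrying until resumed
        }
        return false;
    }

    void resume() {
        suspended = false;
        consecutiveFailures = 0;
    }

    boolean isSuspended() {
        return suspended;
    }

    int pending() {
        return queue.size();
    }
}
```

A failed batch stays at the head of the queue, so the same operations are retried first after a resume and the original order is kept.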
How are conflicts resolved when replicating EntryOperations in remote clusters?
The LocalCachePublisher and RemoteCachePublisher classes support the specification of a CachePublisher.ConflictResolver implementation during construction. By implementing a CachePublisher.ConflictResolver your application may resolve any underlying Entry conflicts in an appropriate manner. If not specified, the LocalCachePublisher.BruteForceConflictResolver is used.