Added by Brian Oliver, last edited by Rob Misek on Feb 26, 2010


The Push Replication Pattern

This is an example implementation of the Push Replication Pattern.

The Push Replication Pattern advocates that:

  1. Operations (such as insert, update and delete) occurring on Data in one Site should be pushed using one or more Publishers to an associated Device.
  2. A Publisher is responsible for optimistically replicating Operations (in the order in which the said Operations originally occurred) on or with the associated Device.
  3. If a Device is unavailable for some reason, the Operations to be replicated using the associated Publisher will be queued and executed (in the original order) at a later point in time.
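
The queue-and-replay behaviour described in the three points above can be sketched in a few lines of plain Java. This is only an illustrative model, not the pattern's implementation: the class names `SketchDevice` and `SketchQueueingPublisher` are hypothetical, operations are represented as plain strings, and Coherence itself is not involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for a Device; not part of the Push Replication API. */
class SketchDevice {
    boolean available = true;
    final List<String> applied = new ArrayList<>();

    /** Applies an operation, or reports failure when the device is unavailable. */
    boolean apply(String operation) {
        if (!available) {
            return false;
        }
        applied.add(operation);
        return true;
    }
}

/** Hypothetical publisher that queues operations and replays them in order. */
class SketchQueueingPublisher {
    private final Queue<String> pending = new ArrayDeque<>();
    private final SketchDevice device;

    SketchQueueingPublisher(SketchDevice device) {
        this.device = device;
    }

    /** Enqueues the operation, then tries to drain the queue in original order. */
    void publish(String operation) {
        pending.add(operation);
        drain();
    }

    /** Sends queued operations head-first and stops at the first failure. */
    void drain() {
        while (!pending.isEmpty() && device.apply(pending.peek())) {
            pending.remove();
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because an operation is only removed from the head of the queue after the device accepts it, anything published while the device is unavailable stays queued and is replayed in its original order once `drain()` is called again.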

This implementation of the Push Replication Pattern additionally advocates that:

  1. The Data on which Operations occur are standard Coherence Cache Entries.
  2. Operations replicated include inserts, updates, and deletes of Cache Entries. This includes NamedCache put and remove, as well as updates that are artifacts of invoking an AbstractProcessor.
  3. The Operations that are enqueued for replication are called EntryOperations. They contain key and value pairs of updated cache entries in Binary form.
  4. A Site is a Coherence Cluster.
  5. A Site may act as both a sender of Operations and receiver of Operations. That is, multi-way multi-site push replication is permitted.
  6. A Device may be any one of the following: a local cluster, a remote cluster, a file system, a database, an I/O stream, a logging system, etc.

Rationale

The purpose of this pattern is to provide extensible, flexible, high-performance, highly-available and scalable infrastructure to support the replication of EntryOperations occurring in one Coherence Cluster to one or more possibly globally distributed other Coherence Clusters.

While this naturally forms a "hub-and-spoke" architecture of federated Coherence Clusters, by configuring each Coherence Cluster as a "hub" and a "spoke", multi-way push replication may be achieved.

Outline

Release Name:                   Version 2.6.1: March 12th, 2010
Target Platforms:               Java Standard Edition 5+
Requires Coherence Version:     3.5.3+
Dependencies:                   Coherence Common 1.6.1
                                Messaging Pattern 2.6.1
Download:                       coherence-pushreplicationpattern-2.6.1.14471.jar
                                MD5: be285567005f11dc1e4e4b3dbdffab2b
Source Code and Documentation:  coherence-pushreplicationpattern-2.6.1.14471-src.zip
                                MD5: c912ef92babfb0afe669b306ba7e8449
Previous Releases:              Push Replication Pattern 2.6.0
                                Push Replication Pattern 2.5.0
                                Push Replication Pattern 2.4.0
                                Push Replication Pattern 2.3.0
                                Push Replication Pattern 2.2.0
                                Push Replication Pattern 2.1.1

Dependencies

This project (like other Coherence Incubator projects) uses Apache Ivy for dependency specification and management. While a standard ivy.xml definition file ships with the source and documentation distribution, the following diagram visually indicates the current dependencies.

What's New?

The following changes have been made since the Push Replication Pattern 2.6.0 release.

  • Fixed casting defect caused by write-behind in BinaryReadWriteBackingMap storeAllInternal.
  • Fixed remove defect where a remove of an entry did not propagate because of decorations.
  • Fixed defect where keys containing decorations were not "cleaned" before being replicated.

The following changes have been made since the Push Replication Pattern 2.5.0 release.

  • Now dependent on Coherence 3.5.3+ for the Coherence 3.5 Editions.
  • Performance optimizations have been added to reduce serialization and deserialization overhead while enqueuing, sending, and processing entry operations. Entries are now retrieved from a cache store in binary form and are sent to a replication target site without deserialization. Deserialization, if it needs to happen, is performed by conflict resolver classes on the target site.
  • Conflict Resolvers are abstracted to return a ConflictResolution object that dictates what is to be done with an entry: use source entry (as is), use target entry (as is), use a merged entry (defined by the conflict resolver class), or remove an entry. Only when merging does an entry have to be deserialized.
  • PushReplication transparently decorates entries with site information to be more intelligent about managing entries. With this release users of Push Replication should assign unique names to Coherence clusters participating in push replication using the tangosol.coherence.site setting. (e.g. -Dtangosol.coherence.site=Site1)
  • Decorated entries prevent "ping-pong" (i.e. replicated updates going back to the source) and allow Push Replication to eliminate the SafePublishing layer (which was used to prevent "ping-pong" by using a set-aside list).
  • Introduces the ReplicationRole, which controls how the RemoteInvocationService publisher handles updates that are artifacts of replication from other clusters. Currently there are two roles: LEAF and HUB. LEAF is the default and tells the publisher to filter out updates that were caused by replication. Simply stated, if an update is replicated to a LEAF, the update will be processed by the leaf but not replicated elsewhere. The HUB role declares that updates caused by replication will be forwarded to the other clusters that are targets of replication, with one exception: if the target is the same cluster from which the update originated, the update will not be replicated to that cluster (i.e. this prevents ping-pong).
  • The Messaging Pattern layer has been revamped to provide better scalability for Push Replication. Serialization (ordering) of updates is enforced on a per-entry basis, which relaxes the need to serialize all updates to the message queues used to service push replication.
  • New JMX PublishingService attributes keep track of batches published (i.e. publishCount), total entry operations processed, and total entry operations published. (Processed entries are candidates for replication, but may be rejected because they represent updates from a replicating site.)
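
The LEAF and HUB rules described in the list above amount to a small decision function. The sketch below is one way to read those rules, written as plain Java; `SketchForwardingRule` and its parameter names are hypothetical and do not reflect the actual publisher code.

```java
/** Hypothetical model of the LEAF/HUB forwarding rule; not the library's code. */
class SketchForwardingRule {
    enum Role { LEAF, HUB }

    /**
     * Decides whether an update seen at localSite should be sent to targetSite.
     *
     * @param role       role configured for the publisher
     * @param localSite  name of the cluster making the decision
     * @param originSite site where the update was first made
     * @param targetSite site the publisher would send to
     */
    static boolean shouldForward(Role role, String localSite, String originSite, String targetSite) {
        if (localSite.equals(originSite)) {
            return true; // updates made locally are always candidates for replication
        }
        if (role == Role.LEAF) {
            return false; // a LEAF applies replicated updates but does not pass them on
        }
        return !targetSite.equals(originSite); // a HUB forwards, except back to the origin
    }
}
```

For example, with `Site2` acting as a HUB, an update that originated in `Site1` is forwarded to `Site3` but not back to `Site1`; with `Site2` acting as a LEAF it is forwarded nowhere.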

API Changes

  • The runtime invocation for all cluster members requires use of -Dtangosol.coherence.site. The site name assigned must be unique for each cluster involved in push replication. The assigned site names are used to know where entry updates originated.
  • The ConflictResolver interface has changed. The method now returns a ConflictResolution object which abstracts how a conflict is to be resolved.
    The ConflictResolver accepts two inputs that are in Binary form: the EntryOperation (which encapsulates an Entry being replicated from another cluster), and a BinaryEntry (which represents the target entry in Binary form). The ConflictResolver returns a ConflictResolution object which denotes how the system should handle the conflict: preserve the target, preserve the source, remove the target, or update the target with a merged entry.
  • Optional declaration of TargetClusterSiteName and ReplicationMode (LEAF or HUB) in RemoteInvocationServicePublisher. These settings only need to be set when a cluster is a replication hub. The TargetClusterSiteName must agree with the site name expressed in the target cluster using the -Dtangosol.coherence.site setting.
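
To make the four possible outcomes of conflict resolution concrete, here is a simplified model in plain Java. It is a sketch under clear assumptions: values are plain strings rather than Binary, and `SketchResolution`, `SketchConflictResolver` and `SketchResolvers` are hypothetical names that only mirror the shape described above, not the real ConflictResolver or ConflictResolution signatures.

```java
/** Hypothetical, simplified outcome of resolving a conflict (values are Strings, not Binary). */
class SketchResolution {
    enum Action { USE_SOURCE, USE_TARGET, USE_MERGED, REMOVE }

    final Action action;
    final String mergedValue; // only meaningful when action is USE_MERGED

    SketchResolution(Action action, String mergedValue) {
        this.action = action;
        this.mergedValue = mergedValue;
    }

    /** Returns the value the target should hold afterwards; null means "no entry". */
    String apply(String sourceValue, String targetValue) {
        switch (action) {
            case USE_SOURCE: return sourceValue;
            case USE_TARGET: return targetValue;
            case USE_MERGED: return mergedValue;
            default:         return null; // REMOVE
        }
    }
}

/** Hypothetical resolver contract: inspect both sides, return a decision. */
interface SketchConflictResolver {
    SketchResolution resolve(String sourceValue, String targetValue);
}

class SketchResolvers {
    /** Last write wins: always take the incoming (source) value as is. */
    static final SketchConflictResolver LAST_WRITE_WINS =
        (source, target) -> new SketchResolution(SketchResolution.Action.USE_SOURCE, null);

    /** Example merge: keep both values when the target already has one. */
    static final SketchConflictResolver MERGE_BOTH = (source, target) ->
        target == null
            ? new SketchResolution(SketchResolution.Action.USE_SOURCE, null)
            : new SketchResolution(SketchResolution.Action.USE_MERGED, target + "," + source);
}
```

Only the merging resolver needs to look inside the values, which matches the point made above that deserialization is required only when merging.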

Examples

Example source code and uses of this pattern can be found in the Coherence Incubator Examples project.

Supported Deployment Models

The following section outlines common deployment models supported by the Push Replication Pattern.

Each model additionally supports Conflict Resolution at the destination site through the specification of ConflictResolvers. The default conflict resolver (called a BruteForceConflictResolver) will simply overwrite the existing value - that is, last write wins.

Implementation Discussion

Before you start

The Push Replication Pattern makes extensive use of the Messaging Pattern as infrastructure to capture EntryOperations and deliver them, in order of occurrence, to BatchPublishers. Consequently, it may be beneficial to become familiar with the Messaging Pattern (and its implementation) in order to understand the implementation of this pattern.

After such operations are captured (represented as EntryOperations) they are sent to the messaging layer (implemented using the Messaging Pattern) for distribution to specialized TopicSubscriptions called PublishingSubscriptions.

When PublishingSubscriptions receive the EntryOperations from the messaging layer, an internal event (captured by a backing-map-listener executing in the same JVM in which the PublishingSubscription is being managed by Coherence) signals to a PublishingService to commence publishing the currently queued EntryOperations with an associated BatchPublisher implementation. How the ultimate publishing occurs is entirely dependent on the implementation of the BatchPublisher.

The following diagram provides an overview of the push replication process through the various layers of the system.

Example

The following section provides an example use of the push replication pattern, including establishing publishers.

Step 1: Configuring a Cache (with a PublishingCacheStore)

As push replication occurs on a cache-by-cache basis, the first step in making use of this pattern is configuring your application cache(s) to use an appropriate PublishingCacheStore implementation. For the "hub-and-spoke" model, where one cluster is the "hub" and the other clusters are "spokes", you should use a PublishingCacheStore.

By default, the Push Replication cache configuration (coherence-pushreplicationpattern-pof-cache-config.xml) defines a "publishing-*" mapping to a distributed-scheme that is configured to use a PublishingCacheStore. Consequently any cache name prefixed with "publishing-" will automatically support Push Replication.

String cacheName = "publishing-cache";

NamedCache namedCache = CacheFactory.getCache(cacheName);

When your application first accesses a "publishing-*" based scheme (which in turn creates the PublishingCacheStore), the underlying messaging infrastructure will be created to support Push Replication. This includes establishing a Topic that is named exactly the same as your cache. In this case, a Topic called "publishing-cache" will be created in the messaging layer.

From here on any operation that mutates the "publishing-cache" will have the said operation captured and sent (as an EntryOperation) to the "publishing-cache" Topic, after which the operation will be forwarded in-order to the Subscriptions on the said Topic.

Step 2: Registering a BatchPublisher to perform Push Replication

Until a BatchPublisher is registered for a cache configured with a PublishingCacheStore, no publishing will occur. To register a BatchPublisher to publish EntryOperations, you need to use a PushReplicationManager.

PushReplicationManager pushReplicationManager = DefaultPushReplicationManager.getInstance();

pushReplicationManager.registerBatchPublisher("publishing-cache", "stderr-publisher", new BatchPublisherAdapter(new StdErrPublisher()));

There are essentially two forms of publishers: those that support "batching" and those that don't. By default, the Push Replication pattern only works with BatchPublishers. Consequently, to use a non-batching publisher (i.e. an implementation of the Publisher interface), an adapter has been provided. The package com.oracle.coherence.pushreplication.publishers provides numerous implementations of Publishers and BatchPublishers, together with adapters to convert between them (as demonstrated in the example above).
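
The idea behind such an adapter can be shown with a minimal, self-contained sketch. The interfaces below are hypothetical simplifications (operations are plain strings, and the names `SketchPublisher`, `SketchBatchPublisher` and `SketchBatchAdapter` are invented for illustration); the real Publisher, BatchPublisher and BatchPublisherAdapter types have their own signatures.

```java
import java.util.List;

/** Hypothetical one-at-a-time publisher interface (simplified). */
interface SketchPublisher {
    void publish(String entryOperation);
}

/** Hypothetical batch publisher interface (simplified). */
interface SketchBatchPublisher {
    void publishBatch(List<String> entryOperations);
}

/** Adapts a non-batching publisher by publishing each operation in batch order. */
class SketchBatchAdapter implements SketchBatchPublisher {
    private final SketchPublisher delegate;

    SketchBatchAdapter(SketchPublisher delegate) {
        this.delegate = delegate;
    }

    @Override
    public void publishBatch(List<String> entryOperations) {
        for (String operation : entryOperations) {
            delegate.publish(operation); // order within the batch is preserved
        }
    }
}
```

With this shape, something like `new SketchBatchAdapter(System.err::println)` would play the role that the adapted StdErrPublisher plays in the registration example.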

Step 3: Using a publishing-* cache

Once you've registered one or more BatchPublishers for each cache from which you would like to publish EntryOperations, you may simply use the said cache(s) as regular caches (as they are!).

Hence, with the "stderr-publisher" registered as above, the following code would output the "put" operations to stderr.

namedCache.put("australian-welcome", "Gudday");
namedCache.put("regular-welcome", "Hello");
namedCache.put("french-welcome", "Bonjour");

Frequently Asked Questions

What are the ordering characteristics of push replication?

Push Replication currently supports replication at the Data level and supports serialization of operations against a particular cache entry. This means that for a given entry in a cache, any updates made to that entry will be propagated to a target Site and applied in the same order. Serialization across different entries in a cache, or between updates to different caches is not enforced.
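
The difference between per-entry ordering and global ordering can be illustrated with a small model. In this sketch (plain Java, hypothetical class `SketchPerEntryOrdering`, operations represented as `{key, value}` string pairs) operations are split into FIFO lanes chosen by key and the lanes are then applied in an arbitrary order. It is only an analogy for the guarantee described above, not a description of how the pattern is implemented internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SketchPerEntryOrdering {
    /** Splits operations into FIFO lanes chosen by key, so one key always maps to one lane. */
    static List<List<String[]>> partition(List<String[]> operations, int laneCount) {
        List<List<String[]>> lanes = new ArrayList<>();
        for (int i = 0; i < laneCount; i++) {
            lanes.add(new ArrayList<>());
        }
        for (String[] operation : operations) {
            int lane = Math.floorMod(operation[0].hashCode(), laneCount);
            lanes.get(lane).add(operation);
        }
        return lanes;
    }

    /** Applies the lanes in reverse order and records the value history seen for each key. */
    static Map<String, List<String>> applyLanes(List<List<String[]>> lanes) {
        Map<String, List<String>> history = new HashMap<>();
        for (int i = lanes.size() - 1; i >= 0; i--) {
            for (String[] operation : lanes.get(i)) {
                history.computeIfAbsent(operation[0], key -> new ArrayList<>()).add(operation[1]);
            }
        }
        return history;
    }
}
```

However the lanes are interleaved, the history recorded for any single key matches the order in which that key was originally updated; nothing is promised about the relative order of updates to different keys.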

How can the Push Replication Publishers be monitored?

Like the Command Pattern, this pattern is JMX ready. By simply enabling JMX on the Coherence Cluster, each of the PublishingServices will be presented in the JMX tree, detailing current replication state and statistics.
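
Besides browsing the JMX tree in a console, MBeans can be listed programmatically with the standard `javax.management` API. The helper below is a generic sketch (`SketchMBeanLister` is a hypothetical name) that lists whatever MBeans match an object-name pattern on the platform MBean server of the current JVM. Whether the PublishingService MBeans appear there, and under which domain and type, depends on how JMX management is configured for the cluster; the `Coherence:*` pattern in the usage note is an assumption.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class SketchMBeanLister {
    /** Returns the canonical names of all MBeans matching the pattern, sorted. */
    static Set<String> list(String pattern) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName query;
        try {
            query = new ObjectName(pattern);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid object name pattern: " + pattern, e);
        }
        Set<String> names = new TreeSet<>();
        for (ObjectName name : server.queryNames(query, null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }
}
```

Calling `SketchMBeanLister.list("Coherence:*")` from a management-enabled cluster member would be one way to discover which MBeans are exposed before looking for the publishing statistics.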

How do I enable Push Replication to one or more (remote) Coherence Clusters?

In order to publish to a remote cluster, you must:

  1. Configure and enable one or more proxies on the remote cluster(s).
  2. Ensure that the members of the remote cluster(s) have the Push Replication Pattern (and its dependencies) in the class path.
  3. Configure and enable one or more Remote Invocation Services in the "hub" with the addresses of the remote cluster proxy members (or, if you're using Coherence 3.4+, use an AddressProvider). An example scheme is defined in the coherence-pushreplicationpattern-cache-config.xml file.
  4. Register appropriate RemoteInvocationPublishers for each cache (as follows).

String remoteCacheName = cacheName; // we'll publish to the same cache name as here in the remote site

pushReplicationManager.registerBatchPublisher(
    cacheName,
    "remote-site",
    new RemoteInvocationPublisher(
        "RemoteSiteInvocationService", // the underlying service to use
        new BatchPublisherAdapter(new LocalCachePublisher(remoteCacheName)),
        false,  // don't automatically start publishing
        10000,  // delay between batches being published
        100,    // maximum batch size
        10000,  // restart delay if failure occurs
        5));    // maximum consecutive failures before suspension of publishing
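
The batch size and failure-count parameters in the registration above suggest a publishing loop along the following lines. This is a guess at the general shape, written as plain Java for illustration only: `SketchBatchLoop` is hypothetical, the two delay parameters are deliberately left out, and the real publisher may behave differently in detail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

/** Hypothetical model of batching with suspension after repeated failures. */
class SketchBatchLoop {
    private final Queue<String> pending = new ArrayDeque<>();
    private final int maxBatchSize;
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;
    private boolean suspended = false;

    SketchBatchLoop(int maxBatchSize, int maxConsecutiveFailures) {
        this.maxBatchSize = maxBatchSize;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    void enqueue(String operation) {
        pending.add(operation);
    }

    /** Attempts one publishing cycle; returns the number of operations published. */
    int runOnce(Predicate<List<String>> sendBatch) {
        if (suspended || pending.isEmpty()) {
            return 0;
        }
        List<String> batch = new ArrayList<>();
        for (String operation : pending) { // copy up to maxBatchSize items, oldest first
            if (batch.size() == maxBatchSize) {
                break;
            }
            batch.add(operation);
        }
        if (sendBatch.test(batch)) {
            for (int i = 0; i < batch.size(); i++) {
                pending.remove(); // drop only what was successfully sent
            }
            consecutiveFailures = 0;
            return batch.size();
        }
        consecutiveFailures++;
        if (consecutiveFailures >= maxConsecutiveFailures) {
            suspended = true; // stop publishing until explicitly resumed
        }
        return 0;
    }

    boolean isSuspended() {
        return suspended;
    }

    void resume() {
        suspended = false;
        consecutiveFailures = 0;
    }
}
```

A failed batch stays at the head of the queue, so when publishing resumes the same operations are retried in their original order.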

How are conflicts resolved when replicating EntryOperations in remote clusters?

The LocalCachePublisher and RemoteCachePublisher classes support the specification of a CachePublisher.ConflictResolver implementation during construction. By implementing a CachePublisher.ConflictResolver your application may resolve any underlying Entry conflicts in an appropriate manner. If not specified, the LocalCachePublisher.BruteForceConflictResolver is used.

References and Additional Information