  Coherence 3.1 User Guide
  Coherence User Guide (Full)
Added by Rob Misek, last edited by Jason Howes on Feb 17, 2006

Overview: What Coherence can do for you...

Getting Started

Technical Overview

Operational Configuration

Cache Configuration

Troubleshooting

Overview: What Coherence can do for you...

Overview for Implementors


1. Basic Concepts

Audience

This document is targeted at software developers and architects who need an overview of how Coherence can be used. It outlines product capabilities and usage possibilities, and gives an overview of how to go about implementing particular features. It bridges the more abstract Coherence white papers and the more concrete developer documentation (the Tangosol Coherence User Guide and the Coherence JavaDoc).

Clustered Data Management

At the core of Coherence is the concept of clustered data management. This implies the following goals:

  • A fully coherent, single system image (SSI)
  • Scalability for both read and write access
  • Fast, transparent failover and failback
  • Linear scalability for storage and processing
  • No Single-Points-of-Failure (SPOFs)
  • Cluster-wide locking and transactions

Built on top of this foundation are the various services that Coherence provides, including database caching, HTTP session management, grid agent invocation and distributed queries. Before going into detail about these features, some basic aspects of Coherence should be discussed.

A single API for the logical layer, XML configuration for the physical layer

Coherence supports many topologies for clustered data management. Each of these topologies has trade-offs in terms of performance and fault-tolerance. By using a single API, the choice of topology can be deferred until deployment if desired. This allows developers to work with a consistent logical view of Coherence, while providing flexibility during tuning or as application needs change.

Clients are cluster members

A key benefit of Coherence's clustering model is that both server nodes and client nodes are part of the cluster. As a result, data can be managed very closely by the client, without polling or asynchronous events: locally managed data can be accessed directly, without the need to synchronize with other servers. It also centralizes cluster management, as both server and client membership can be managed. Cluster membership is a very inexpensive resource, so thousands of clients are readily supported. Furthermore, in a typical enterprise application, the client nodes are the application-tier machines; end users do not access Coherence directly.

Caching Strategies

Coherence provides several cache implementations:

  • Local - Local on-heap caching for non-clustered caching.
  • Replicated - Perfect for small, read-heavy caches.
  • Optimistic - A version of the Replicated cache which gives up locking (but not coherent behavior) for a boost in performance.
  • Partitioned - True linear scalability for both read and write access. Data is automatically, dynamically and transparently partitioned across nodes. The distribution algorithm minimizes network traffic and avoids service pauses by incrementally shifting data.
  • Near Cache - Provides the performance of local caching with the scalability of distributed caching. Several different near-cache strategies provide varying tradeoffs between performance and synchronization guarantees.

In-process caching provides the highest level of raw performance, since objects are managed within the local JVM. This benefit is most directly realized by the Local, Replicated, Optimistic and Near Cache implementations.

Out-of-process (client-server) caching provides the option of using dedicated cache servers. This can be helpful when you wish to partition workloads (to avoid stressing the application servers). This is accomplished by using the Partitioned cache implementation and simply disabling local storage on client nodes via a single command-line option or a one-line entry in the XML configuration.

Tiered caching (using the NearCache functionality) allows you to couple local caches on the application server with larger, partitioned caches on the cache servers, combining the raw performance of local caching with the scalability of partitioned caching. This is useful both with dedicated cache servers and with co-located caching (cache partitions stored within the application server JVMs).
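As a rough illustration of the tiered read path, the sketch below places a small, bounded local "front" map in front of a larger "back" map that stands in for the partitioned cache. It is plain Java, not the Coherence NearCache API: the class name `TwoTierCache`, the LRU eviction in the front tier and the invalidate-on-write policy are assumptions chosen for the example.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical two-tier cache: a bounded local front map over a larger back map.
class TwoTierCache<K, V> {
    private final Map<K, V> back;    // stands in for the partitioned (remote) cache
    private final Map<K, V> front;   // local, in-process copy of recently used entries
    int frontHits = 0;
    int backReads = 0;

    TwoTierCache(Map<K, V> back, final int frontCapacity) {
        this.back = back;
        // Access-ordered LinkedHashMap that evicts the least recently used entry
        this.front = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > frontCapacity;
            }
        };
    }

    V get(K key) {
        V value = front.get(key);
        if (value != null) {
            frontHits++;              // served from local memory, no network hop
            return value;
        }
        backReads++;                  // falls through to the back tier
        value = back.get(key);
        if (value != null) {
            front.put(key, value);    // keep a local copy for subsequent reads
        }
        return value;
    }

    void put(K key, V value) {
        back.put(key, value);         // the back tier remains the system of record
        front.remove(key);            // invalidate the local copy instead of updating it
    }

    public static void main(String[] args) {
        TwoTierCache<String, Integer> cache = new TwoTierCache<>(new HashMap<>(), 2);
        cache.put("a", 1);
        cache.get("a");               // miss: read from the back tier, copy into the front
        cache.get("a");               // hit: served from the front tier
        System.out.println(cache.frontHits + " front hit(s), " + cache.backReads + " back read(s)");
    }
}
```

Invalidating rather than updating the front copy on writes is just one of the possible synchronization trade-offs mentioned in the list of near-cache strategies; it keeps the sketch simple.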

Coherence supports heterogeneous configurations – for example, using inexpensive Linux blades as dedicated cache servers, and UNIX machines for the application server instances.

Data Storage Options

While most customers use on-heap storage combined with dedicated cache servers, Coherence has several options for data storage:

  • On-heap - The fastest option, though it can affect JVM garbage collection times.
  • NIO RAM - No impact on garbage collection, though it does require serialization/deserialization.
  • NIO Disk - Similar to NIO RAM, but using memory-mapped files.
  • File-based - Uses a special disk-optimized storage system to optimize speed and minimize I/O.

It should be noted that Coherence storage is transient – the disk-based storage options are for managing cached data only. If long-term persistence of data is required, Coherence provides snapshot functionality to persist an image to disk. This is especially useful when the external data sources used to build the cache are extremely expensive. By using the snapshot, the cache can be rebuilt from an image file (rather than reloading from a very slow external datasource).

Serialization Options

Because serialization is often the most expensive part of clustered data management, Coherence provides three options for serializing/deserializing data:

  • java.io.Serializable - The simplest, but slowest option.
  • java.io.Externalizable - This requires developers to implement serialization manually, but can provide significant performance benefits. Compared to java.io.Serializable, this can cut serialized data size by a factor of two or more (especially helpful with Distributed caches, as they generally cache data in serialized form). Most importantly, CPU usage is dramatically reduced.
  • com.tangosol.io.ExternalizableLite - This is very similar to java.io.Externalizable, but offers better performance and less memory usage by using a more efficient I/O stream implementation. The com.tangosol.run.xml.XmlBean class provides a default implementation of this interface (see the section on XmlBean for more details).
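To make the java.io.Externalizable option concrete, here is a small value class that writes and reads its own fields instead of relying on default serialization. The class `Trade` and its fields are made up for the example; ExternalizableLite is not shown here, since this sketch uses only JDK types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical value object that serializes itself field by field.
class Trade implements Externalizable {
    private String symbol;
    private int quantity;
    private double price;

    // Externalizable requires a public no-argument constructor
    public Trade() { }

    Trade(String symbol, int quantity, double price) {
        this.symbol = symbol;
        this.quantity = quantity;
        this.price = price;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeInt(quantity);
        out.writeDouble(price);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        // Fields must be read back in exactly the order they were written
        symbol = in.readUTF();
        quantity = in.readInt();
        price = in.readDouble();
    }

    String getSymbol() { return symbol; }
    int getQuantity() { return quantity; }
    double getPrice() { return price; }

    // Serializes and deserializes a copy, as happens when a value travels between nodes
    static Trade roundTrip(Trade trade) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(trade);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Trade) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Trade copy = roundTrip(new Trade("ORCL", 100, 12.5));
        System.out.println(copy.getSymbol() + " " + copy.getQuantity() + " @ " + copy.getPrice());
    }
}
```

The cost of this approach is visible in readExternal(): the read order must be kept in sync with writeExternal() by hand whenever a field is added or removed.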

Configurability and Extensibility

Coherence's API provides access to all Coherence functionality. The most commonly used subset of this API is exposed via simple XML options to minimize effort for typical use cases. There is no penalty for mixing direct configuration via the API with the easier XML configuration.

Coherence is designed to allow the replacement of its modules as needed. For example, the local "backing maps" (which provide the actual physical data storage on each node) can be replaced. This is rarely required, but the option exists for the situations that need it. As a general guideline, 80% of tasks are easy, and the remaining 20% (the special cases) require a little more effort but can be done without significant hardship.

Namespace Hierarchy

Coherence is organized as a set of services. At the root is the "Cluster" service. A cluster is a set of Coherence instances (one instance per JVM, with one or more JVMs on each physical machine), identified by the combination of multicast address and port. A TTL (network packet time-to-live, i.e. the number of network hops) setting can be used to restrict the cluster to a single machine, or to the machines attached to a single switch.

Under the cluster service are the various services that comprise the Coherence API. These include the various caching services (Replicated, Distributed, etc.) as well as the Invocation Service (for deploying agents to various nodes of the cluster). Each instance of a service is named, and there is typically a default service instance for each type.

The cache services contain NamedCaches (com.tangosol.net.NamedCache), which are analogous to database tables – that is, they typically contain a set of related objects.

2. Read/Write Caching

NamedCache

The following source code will return a reference to a NamedCache instance. The underlying cache service will be started if necessary.

import com.tangosol.net.*;
...
NamedCache cache = CacheFactory.getCache("MyCache");

Coherence will scan the cache configuration XML file for a name mapping for MyCache. NamedCache name mapping is similar to Servlet name mapping in a web container's web.xml file. Coherence's cache configuration file contains (in the simplest case) a set of mappings (from cache name to cache strategy) and a set of cache strategies.

By default, Coherence will use the coherence-cache-config.xml file found at the root of coherence.jar. This can be overridden on the JVM command-line with -Dtangosol.coherence.cacheconfig=file.xml. This argument can reference either a file system path, or a Java resource path.

The com.tangosol.net.NamedCache interface extends a number of other interfaces:

  • java.util.Map - basic Map methods such as get(), put(), remove().
  • com.tangosol.util.QueryMap - methods for querying the cache.
  • com.tangosol.util.ConcurrentMap - methods for concurrent access such as lock() and unlock().
  • com.tangosol.util.ObservableMap - methods for listening to cache events.
  • com.tangosol.util.InvocableMap - methods for server-side processing of cache data.

Requirements for cached objects

Cache keys and values must be serializable (e.g. java.io.Serializable). Furthermore, cache keys must provide an implementation of the hashCode() and equals() methods, and those methods must return consistent results across cluster nodes. This implies that the implementation of hashCode() and equals() must be based solely on the object's serializable state (i.e. the object's non-transient fields); most built-in Java types, such as String, Integer and Date, meet this requirement. Some cache implementations (specifically the partitioned cache) use the serialized form of the key objects for equality testing, which means that keys for which equals() returns true must serialize identically; most built-in Java types meet this requirement as well.
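A cache key that meets these requirements might look like the sketch below. `OrderKey` is a hypothetical class: equals() and hashCode() use only the non-transient fields, and the hash is built from String.hashCode() and Long.hashCode(), whose results are specified by the JDK, so every JVM computes the same value for equal keys. The serialize()/deserialize() helpers use plain Java serialization, which is an assumption; they are there only to show that two equal keys serialize to the same bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical composite cache key.
final class OrderKey implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String customerId;
    private final long orderNumber;
    // Not serialized, so it must not take part in equals() or hashCode()
    private transient String displayName;

    OrderKey(String customerId, long orderNumber) {
        this.customerId = Objects.requireNonNull(customerId);
        this.orderNumber = orderNumber;
        this.displayName = customerId + "#" + orderNumber;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof OrderKey)) return false;
        OrderKey other = (OrderKey) o;
        return orderNumber == other.orderNumber && customerId.equals(other.customerId);
    }

    @Override
    public int hashCode() {
        // Based solely on serializable state, never on identity or transient fields
        return Objects.hash(customerId, orderNumber);
    }

    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        OrderKey a = new OrderKey("C42", 7L);
        OrderKey b = new OrderKey(new String("C42"), 7L);
        System.out.println("equal: " + a.equals(b)
                + ", same hash: " + (a.hashCode() == b.hashCode())
                + ", same bytes: " + Arrays.equals(serialize(a), serialize(b)));
    }
}
```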

NamedCache Usage Patterns

There are two general approaches to using a NamedCache:

  • As a clustered implementation of java.util.Map with a number of added features (queries, concurrency), but with no persistent backing (a "side" cache).
  • As a means of decoupling access to external data sources (an "inline" cache). In this case, the application uses the NamedCache interface, and the NamedCache takes care of managing the underlying database (or other resource).

Typically, an inline cache is used to cache data from:

  • a database - The most intuitive use of a cache – simply caching database tables (in the form of Java objects).
  • a service - Mainframe, web service, service bureau – any service that represents an expensive resource to access (either due to computational cost or actual access fees).
  • calculations - Financial calculations, aggregations, data transformations. Using an inline cache makes it very easy to avoid duplicating calculations. If the calculation is already complete, the result is simply pulled from the cache. Since any serializable object can be used as a cache key, it's a simple matter to use an object containing calculation parameters as the cache key.
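The calculation case can be sketched with an ordinary concurrent map: an object holding the calculation parameters serves as the key, and the result is computed only on the first request. Here ConcurrentMap.computeIfAbsent() stands in for the inline cache's load-on-miss behavior; the `RateQuery` parameters and the future-value formula are invented for the example.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical parameter object: equal parameters must map to the same cached result.
final class RateQuery {
    final double principal;
    final double annualRate;
    final int years;

    RateQuery(double principal, double annualRate, int years) {
        this.principal = principal;
        this.annualRate = annualRate;
        this.years = years;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof RateQuery)) return false;
        RateQuery q = (RateQuery) o;
        return Double.compare(principal, q.principal) == 0
                && Double.compare(annualRate, q.annualRate) == 0
                && years == q.years;
    }

    @Override
    public int hashCode() {
        return Objects.hash(principal, annualRate, years);
    }
}

// Results are computed on the first request and served from the map afterwards.
class CalculationCache {
    private final ConcurrentMap<RateQuery, Double> results = new ConcurrentHashMap<>();
    final AtomicInteger computations = new AtomicInteger();

    double futureValue(RateQuery q) {
        return results.computeIfAbsent(q, key -> {
            computations.incrementAndGet();   // counts the actual (expensive) calculations
            return key.principal * Math.pow(1 + key.annualRate, key.years);
        });
    }

    public static void main(String[] args) {
        CalculationCache cache = new CalculationCache();
        cache.futureValue(new RateQuery(1000.0, 0.05, 2));
        cache.futureValue(new RateQuery(1000.0, 0.05, 2));   // same parameters: cache hit
        System.out.println("calculations performed: " + cache.computations.get());
    }
}
```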

Write-back options:

  • write-through - Ensures that the external data source always contains up-to-date information. Used when data must be persisted immediately, or when sharing a data source with other applications.
  • write-behind - Provides better performance by caching writes to the external data source. Not only can writes be buffered to even out the load on the data source, but multiple writes can be combined, further reducing I/O. The trade-off is that data is not immediately persisted to disk; however, it is immediately distributed across the cluster, so the data will survive the loss of a server. Furthermore, if the entire data set is cached, this option means that the application can temporarily survive a complete failure of the data source, as neither cache reads nor cache writes require synchronous access to the data source.
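The write-behind trade-off can be illustrated with a queue of pending writes that is flushed in batches, where a later write to the same key replaces the earlier one before it ever reaches the data source. This single-threaded sketch is not Coherence's write-behind implementation: the `WriteBehindMap` name, the explicit flush() call (in place of a timed write delay) and the in-memory map used as the data source are all assumptions.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical write-behind wrapper: the cache is updated immediately,
// the data source later and in batches.
class WriteBehindMap<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    // Pending writes in arrival order; re-writing a key coalesces into one entry
    private final Map<K, V> pending = new LinkedHashMap<>();
    private final Map<K, V> dataSource;
    int sourceWrites = 0;

    WriteBehindMap(Map<K, V> dataSource) {
        this.dataSource = dataSource;
    }

    void put(K key, V value) {
        cache.put(key, value);
        pending.put(key, value);
    }

    V get(K key) {
        return cache.get(key);   // never waits for the data source
    }

    int pendingCount() {
        return pending.size();
    }

    // Pushes all queued changes to the data source in one batch
    void flush() {
        for (Map.Entry<K, V> e : pending.entrySet()) {
            dataSource.put(e.getKey(), e.getValue());
            sourceWrites++;
        }
        pending.clear();
    }

    public static void main(String[] args) {
        Map<String, Integer> database = new HashMap<>();
        WriteBehindMap<String, Integer> cache = new WriteBehindMap<>(database);
        cache.put("stock", 10);
        cache.put("stock", 9);    // coalesced with the previous write
        cache.put("orders", 1);
        System.out.println("pending before flush: " + cache.pendingCount());
        cache.flush();
        System.out.println("writes sent to the data source: " + cache.sourceWrites);
    }
}
```

Until flush() runs, the data source is behind the cache; in the clustered setting described above, it is the backup copies held by other nodes that protect these pending writes, which this local sketch does not model.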

To implement a read-only inline cache, you implement two methods of the com.tangosol.net.cache.CacheLoader interface: one for single-key reads and one for bulk reads. Coherence provides an abstract class, com.tangosol.net.cache.AbstractCacheLoader, that supplies a default implementation of the bulk method, so you need to implement only a single method:

public Object load(Object oKey);

This method accepts an arbitrary cache key and returns the appropriate value object.
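The read-through pattern can be sketched without Coherence on the classpath. The `KeyLoader` interface below is a hypothetical stand-in with the same shape as the description above (a single-key method plus a bulk method whose default implementation calls it, much as AbstractCacheLoader is described as doing); it is not the actual com.tangosol.net.cache.CacheLoader type, and `ReadThroughCache` is likewise illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical loader: only load() must be implemented.
interface KeyLoader<K, V> {
    V load(K key);

    // Default bulk read: loads each key individually
    default Map<K, V> loadAll(Collection<K> keys) {
        Map<K, V> result = new HashMap<>();
        for (K key : keys) {
            V value = load(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }
}

// Cache that consults the loader only on a miss.
class ReadThroughCache<K, V> {
    private final Map<K, V> entries = new HashMap<>();
    private final KeyLoader<K, V> loader;

    ReadThroughCache(KeyLoader<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            value = loader.load(key);   // e.g. a database lookup by primary key
            if (value != null) {
                entries.put(key, value);
            }
        }
        return value;
    }

    Map<K, V> getAll(Collection<K> keys) {
        Map<K, V> result = new HashMap<>();
        List<K> missing = new ArrayList<>();
        for (K key : keys) {
            V value = entries.get(key);
            if (value != null) {
                result.put(key, value);
            } else {
                missing.add(key);
            }
        }
        if (!missing.isEmpty()) {
            Map<K, V> loaded = loader.loadAll(missing);   // one bulk call for all misses
            entries.putAll(loaded);
            result.putAll(loaded);
        }
        return result;
    }

    public static void main(String[] args) {
        ReadThroughCache<Integer, String> cache =
                new ReadThroughCache<>(id -> "row " + id);   // stands in for a database query
        System.out.println(cache.get(7));
        System.out.println(cache.getAll(Arrays.asList(7, 8, 9)).size());
    }
}
```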

If you want to implement read-write caching, you need to extend com.tangosol.net.cache.AbstractCacheStore (or implement the interface com.tangosol.net.cache.CacheStore), which adds the following methods:

public void erase(Object oKey);
public void store(Object oKey, Object oValue);

The method erase() should remove the specified key from the external data source. The method store() should update the specified item in the data source if it already exists, or insert it if it does not presently exist.
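In other words, store() behaves as an upsert and erase() as a delete. The sketch below makes that explicit against a hypothetical in-memory table with strict INSERT/UPDATE semantics, and wires it into a write-through cache that calls the store synchronously on every change. `InMemoryTable`, `TableStore` and `WriteThroughCache` are illustrative names, not Coherence classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical table with strict INSERT/UPDATE semantics, like a SQL table.
class InMemoryTable {
    final Map<Object, Object> rows = new HashMap<>();
    int inserts = 0;
    int updates = 0;

    void insert(Object key, Object value) {
        if (rows.containsKey(key)) throw new IllegalStateException("duplicate key " + key);
        rows.put(key, value);
        inserts++;
    }

    void update(Object key, Object value) {
        if (!rows.containsKey(key)) throw new IllegalStateException("no row for " + key);
        rows.put(key, value);
        updates++;
    }

    void delete(Object key) {
        rows.remove(key);
    }
}

// Hypothetical store following the store()/erase() contract described above.
class TableStore {
    private final InMemoryTable table;

    TableStore(InMemoryTable table) {
        this.table = table;
    }

    // Update the row if it already exists, otherwise insert it
    public void store(Object oKey, Object oValue) {
        if (table.rows.containsKey(oKey)) {
            table.update(oKey, oValue);
        } else {
            table.insert(oKey, oValue);
        }
    }

    // Remove the row; erasing a key that is not present is harmless
    public void erase(Object oKey) {
        table.delete(oKey);
    }
}

// Write-through: every change reaches the store before put()/remove() returns.
class WriteThroughCache {
    private final Map<Object, Object> entries = new HashMap<>();
    private final TableStore store;

    WriteThroughCache(TableStore store) {
        this.store = store;
    }

    void put(Object key, Object value) {
        store.store(key, value);
        entries.put(key, value);
    }

    void remove(Object key) {
        store.erase(key);
        entries.remove(key);
    }

    Object get(Object key) {
        return entries.get(key);
    }

    public static void main(String[] args) {
        InMemoryTable table = new InMemoryTable();
        WriteThroughCache cache = new WriteThroughCache(new TableStore(table));
        cache.put("emp-1", "Alice");    // row absent: INSERT
        cache.put("emp-1", "Alicia");   // row present: UPDATE
        cache.remove("emp-1");          // DELETE
        System.out.println(table.inserts + " insert(s), " + table.updates
                + " update(s), rows left: " + table.rows.size());
    }
}
```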

Once the CacheLoader/CacheStore is implemented, it can be connected easily via the coherence-cache-config.xml file.

3. Querying the Cache

Coherence provides the ability to execute search queries against cached data. With distributed caches, queries are indexed and parallelized. This means that adding servers to a distributed cache not only increases throughput (total queries per second) but also reduces latency, so each query takes less time. To query against a NamedCache, all objects should implement a common interface (or base class). Any field of an object can be queried; indexes are optional and are used to increase performance. With a replicated cache, queries are performed locally and do not use indexes. This works well because each node of a replicated cache holds a complete copy of the data, so a local query sees the entire data set.
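The reason adding servers can lower query latency is that each storage node scans only its own share of the data, in parallel with the others, after which the partial results are merged. The sketch below mimics this with one thread per simulated partition; the partition layout, the `ParallelQuery` class and the use of a java.util.function.Predicate as the filter are assumptions for illustration, not Coherence's Filter API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

class ParallelQuery {
    // Each map in 'partitions' stands in for the data owned by one storage node
    static <K, V> Set<K> keySet(List<Map<K, V>> partitions, Predicate<? super V> filter) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<Set<K>>> partials = new ArrayList<>();
            for (Map<K, V> partition : partitions) {
                // Each "node" evaluates the filter against its own entries only
                Callable<Set<K>> scan = () -> {
                    Set<K> matches = new HashSet<>();
                    for (Map.Entry<K, V> e : partition.entrySet()) {
                        if (filter.test(e.getValue())) {
                            matches.add(e.getKey());
                        }
                    }
                    return matches;
                };
                partials.add(pool.submit(scan));
            }
            // Merge step: combine the partial result sets
            Set<K> result = new HashSet<>();
            for (Future<Set<K>> partial : partials) {
                result.addAll(partial.get());
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("query failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < 4; p++) {
            partitions.add(new HashMap<>());
        }
        for (int i = 0; i < 100; i++) {
            partitions.get(i % 4).put("key" + i, i);
        }
        System.out.println(keySet(partitions, v -> v > 95));
    }
}
```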

To add an index to a NamedCache, you first need a value extractor (which accepts as input a value object and returns an attribute of that object). Indexes can be added blindly (duplicate indexes are ignored). Indexes can be added at any time, before or after inserting data into the cache.
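Conceptually, an ordered index maps each extracted attribute value to the set of keys whose values produce it, so a range query can be answered from the index instead of testing every entry. The sketch below follows the behavior described above (duplicate indexes are ignored; an index can be added before or after data is inserted), using a java.util.function.Function in place of a ValueExtractor. `IndexedCache`, its methods and the restriction to integer attributes are hypothetical simplifications; Coherence's actual index structures are not shown.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical cache with ordered secondary indexes on integer attributes.
class IndexedCache<K, V> {
    private final Map<K, V> entries = new HashMap<>();
    private final Map<String, Function<V, Integer>> extractors = new HashMap<>();
    // index name -> (attribute value -> keys whose value has that attribute)
    private final Map<String, TreeMap<Integer, Set<K>>> indexes = new HashMap<>();

    // Adding an existing index is ignored; entries already present are indexed immediately
    void addIndex(String name, Function<V, Integer> extractor) {
        if (extractors.containsKey(name)) {
            return;
        }
        extractors.put(name, extractor);
        TreeMap<Integer, Set<K>> index = new TreeMap<>();
        indexes.put(name, index);
        for (Map.Entry<K, V> e : entries.entrySet()) {
            index.computeIfAbsent(extractor.apply(e.getValue()), a -> new HashSet<>()).add(e.getKey());
        }
    }

    void put(K key, V value) {
        V old = entries.put(key, value);
        for (Map.Entry<String, Function<V, Integer>> ex : extractors.entrySet()) {
            TreeMap<Integer, Set<K>> index = indexes.get(ex.getKey());
            if (old != null) {   // move the key out of its previous attribute bucket
                Integer oldAttr = ex.getValue().apply(old);
                Set<K> bucket = index.get(oldAttr);
                bucket.remove(key);
                if (bucket.isEmpty()) {
                    index.remove(oldAttr);
                }
            }
            index.computeIfAbsent(ex.getValue().apply(value), a -> new HashSet<>()).add(key);
        }
    }

    // Answers "attribute > bound" from the index alone, without reading the values
    Set<K> keysGreaterThan(String indexName, int bound) {
        Set<K> result = new HashSet<>();
        for (Set<K> keys : indexes.get(indexName).tailMap(bound, false).values()) {
            result.addAll(keys);
        }
        return result;
    }

    public static void main(String[] args) {
        IndexedCache<String, String> cache = new IndexedCache<>();
        cache.put("a", "short");
        cache.addIndex("length", String::length);   // added after data is present
        cache.put("b", "considerably longer");
        System.out.println(cache.keysGreaterThan("length", 5));
    }
}
```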

It should be noted that queries apply only to cached data. For this reason, queries should not be used unless the entire data set has been loaded into the cache, or unless additional support has been added to manage partially loaded data sets.

Developers have the option of implementing additional custom filters for queries, thus taking advantage of query parallelization. For particularly performance-sensitive queries, developers may implement index-aware filters, which can access Coherence's internal indexing structures.

Coherence includes a built-in optimizer, and will apply indexes in the optimal order. Because of the focused nature of the queries, the optimizer is both effective and efficient. No maintenance is required.

Example code to create an index:

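import com.tangosol.util.ValueExtractor;
import com.tangosol.util.extractor.ReflectionExtractor;
...
NamedCache cache = CacheFactory.getCache("MyCache");
ValueExtractor extractor = new ReflectionExtractor("getAttribute"); // calls getAttribute() on each cached value
cache.addIndex(extractor, true, null); // true = ordered index; null Comparator = natural ordering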

Example code to query a NamedCache (returns the keys corresponding to all of the value objects with an "Attribute" greater than 5):

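import com.tangosol.util.Filter;
import com.tangosol.util.filter.GreaterFilter;
...
NamedCache cache = CacheFactory.getCache("MyCache");
Filter filter = new GreaterFilter("getAttribute", 5); // matches values whose getAttribute() > 5
Set keySet = cache.keySet(filter); // keys of all matching entries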

4. Transactions

Coherence supports local transactions against the cache both through a direct API and through J2CA adapters for J2EE containers. Transactions support either pessimistic or optimistic concurrency strategies, and the Read Committed, Repeatable Read and Serializable isolation levels.
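To illustrate the optimistic strategy only (this is not Coherence's transaction API), the sketch below records the version of each entry a transaction reads and refuses to commit if any of those entries has changed in the meantime; a pessimistic strategy would instead lock entries before using them. `VersionedStore` and `OptimisticTx` are hypothetical names, and the sketch makes no claim about which isolation level it provides.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store in which every entry carries a version number.
class VersionedStore {
    static final class Versioned {
        final Object value;
        final long version;

        Versioned(Object value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Versioned> data = new HashMap<>();

    synchronized Versioned read(String key) {
        return data.get(key);
    }

    private long versionOf(String key) {
        Versioned v = data.get(key);
        return v == null ? 0L : v.version;   // absent entries count as version 0
    }

    // Applies the writes only if nothing the transaction read has changed since
    synchronized boolean commit(Map<String, Long> readVersions, Map<String, Object> writes) {
        for (Map.Entry<String, Long> read : readVersions.entrySet()) {
            if (versionOf(read.getKey()) != read.getValue()) {
                return false;   // another transaction committed first; the caller may retry
            }
        }
        for (Map.Entry<String, Object> write : writes.entrySet()) {
            data.put(write.getKey(), new Versioned(write.getValue(), versionOf(write.getKey()) + 1));
        }
        return true;
    }
}

// Hypothetical optimistic transaction: holds no locks while it runs.
class OptimisticTx {
    private final VersionedStore store;
    private final Map<String, Long> readVersions = new HashMap<>();
    private final Map<String, Object> writes = new HashMap<>();

    OptimisticTx(VersionedStore store) {
        this.store = store;
    }

    Object get(String key) {
        if (writes.containsKey(key)) {
            return writes.get(key);   // see this transaction's own uncommitted writes
        }
        VersionedStore.Versioned v = store.read(key);
        readVersions.putIfAbsent(key, v == null ? 0L : v.version);
        return v == null ? null : v.value;
    }

    void put(String key, Object value) {
        writes.put(key, value);
    }

    boolean commit() {
        return store.commit(readVersions, writes);
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        OptimisticTx setup = new OptimisticTx(store);
        setup.put("balance", 100);
        setup.commit();
        OptimisticTx t1 = new OptimisticTx(store);
        OptimisticTx t2 = new OptimisticTx(store);
        t1.put("balance", (Integer) t1.get("balance") + 10);
        t2.put("balance", (Integer) t2.get("balance") - 20);
        System.out.println("t1 committed: " + t1.commit() + ", t2 committed: " + t2.commit());
    }
}
```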

5. HTTP Session Management

Coherence*Web is an HTTP session-management module (shipped with Coherence) with support for a wide range of application servers. Using Coherence session management does not require any changes to the application.

Coherence*Web uses the NearCache technology to provide fully fault-tolerant caching, with almost unlimited scalability (to several hundred cluster nodes without issue).

Heterogeneous applications running on mixed hardware/OS/application servers can share common user session data. This dramatically simplifies supporting Single-Sign-On across applications.

6. Invocation Service

The Coherence Invocation service can be used to deploy computational agents to various nodes within the cluster. These agents can be either execute-style (deploy and asynchronously listen) or query-style (deploy and wait for results).

The Invocation service is accessed through the following two methods of the com.tangosol.net.InvocationService interface:

public void execute(Invocable task, Set setMembers, InvocationObserver observer);
public Map query(Invocable task, Set setMembers);

An instance of the service can be retrieved from the com.tangosol.net.CacheFactory class.
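The difference between the two invocation styles can be mimicked locally with a thread pool standing in for cluster members: query-style submits the task for every target member and blocks until all results are in, while execute-style returns at once and reports each result to an observer. The `Task` interface, the use of a BiConsumer as the observer and the `LocalInvocation` class are hypothetical simplifications of Invocable and InvocationObserver, meant only to show the control flow; the real service sends tasks to other JVMs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Hypothetical local model of the two invocation styles; "members" are just names.
class LocalInvocation {
    // Hypothetical task type: receives the name of the member it runs on
    interface Task {
        Object run(String member);
    }

    private final ExecutorService pool = Executors.newCachedThreadPool();

    // Query-style: run the task for every member and block until all results are in
    Map<String, Object> query(Task task, Set<String> members) {
        Map<String, Future<Object>> pending = new HashMap<>();
        for (String member : members) {
            Callable<Object> call = () -> task.run(member);
            pending.put(member, pool.submit(call));
        }
        Map<String, Object> results = new HashMap<>();
        try {
            for (Map.Entry<String, Future<Object>> entry : pending.entrySet()) {
                results.put(entry.getKey(), entry.getValue().get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("invocation failed", e);
        }
        return results;
    }

    // Execute-style: return immediately; the observer hears from each member as it finishes
    void execute(Task task, Set<String> members, BiConsumer<String, Object> observer) {
        for (String member : members) {
            pool.execute(() -> observer.accept(member, task.run(member)));
        }
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        LocalInvocation service = new LocalInvocation();
        Set<String> members = new TreeSet<>(Arrays.asList("node1", "node2"));
        System.out.println(service.query(m -> "hello from " + m, members));
        service.shutdown();
    }
}
```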

7. Listeners

All NamedCache instances in Coherence implement the com.tangosol.util.ObservableMap interface, which allows a cache listener (an implementation of com.tangosol.util.MapListener) to be attached. Applications observe events as logical concepts, regardless of which physical machine caused the event. Customizable server-side filters and lightweight events can be used to minimize network traffic and processing. Cache listeners follow the JavaBean paradigm and can distinguish between system cache events (e.g., eviction) and application cache events (e.g., get/put operations).
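A minimal local model of this listener mechanism is sketched below: the map classifies each change as an insert, update or delete and notifies registered listeners, each of which receives only the events that pass its own filter (loosely analogous to the filtered listeners described above). `ObservableCache`, its nested `Event` type and the filter-per-listener registration are hypothetical and far simpler than ObservableMap and MapListener.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical observable cache; each listener is registered with an event filter.
class ObservableCache<K, V> {
    enum Type { INSERTED, UPDATED, DELETED }

    static final class Event<K, V> {
        final Type type;
        final K key;
        final V oldValue;
        final V newValue;

        Event(Type type, K key, V oldValue, V newValue) {
            this.type = type;
            this.key = key;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    private final Map<K, V> entries = new HashMap<>();
    private final List<Consumer<Event<K, V>>> listeners = new ArrayList<>();
    private final List<Predicate<Event<K, V>>> filters = new ArrayList<>();

    // The filter decides which events this listener receives
    void addListener(Consumer<Event<K, V>> listener, Predicate<Event<K, V>> filter) {
        listeners.add(listener);
        filters.add(filter);
    }

    void put(K key, V value) {
        boolean existed = entries.containsKey(key);
        V old = entries.put(key, value);
        fire(new Event<>(existed ? Type.UPDATED : Type.INSERTED, key, old, value));
    }

    void remove(K key) {
        if (entries.containsKey(key)) {   // removing an absent key raises no event
            fire(new Event<>(Type.DELETED, key, entries.remove(key), null));
        }
    }

    private void fire(Event<K, V> event) {
        for (int i = 0; i < listeners.size(); i++) {
            if (filters.get(i).test(event)) {
                listeners.get(i).accept(event);
            }
        }
    }

    public static void main(String[] args) {
        ObservableCache<String, Integer> cache = new ObservableCache<>();
        cache.addListener(e -> System.out.println(e.type + " " + e.key), e -> true);
        cache.addListener(e -> System.out.println("deleted " + e.key), e -> e.type == Type.DELETED);
        cache.put("x", 1);
        cache.put("x", 2);
        cache.remove("x");
    }
}
```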

Similarly, any service can be watched for members joining and leaving, including the cluster service as well as the cache and invocation services.

8. JDO/JDBC Integration

The following products support Coherence as a clustered caching plug-in:

  • BEA SolarMetric Kodo (JDO)
  • Hemtech JDO Genie (JDO)
  • Riflexo JCredo (JDO)
  • Hibernate (O/R Mapping)
  • Isocra Livestore (transparent JDBC caching)

9. C++/.Net Integration

Coherence can be accessed by C++ applications (via CodeMesh JunC++ion) and by .Net applications (via JNBridge). The integration takes the form of local proxy objects that control remote Coherence objects within the JVM.

10. WAN Support

Coherence's TCMP clustering protocol is specifically designed to handle the unreliable, high-latency, low-bandwidth conditions typically found in WAN links. Distributed locking provides better performance by avoiding single-server bottlenecks. Tiered caching minimizes network traffic. Transactions and deterministic split-brain behavior ensure proper application function. Coherence supports wire compression for WAN environments.

Coherence 3.0 has expanded support for WAN environments, including WKA (well-known-address) support and improvements to the TCMP protocol specifically for geographically-distributed applications.

11. XmlBean

Coherence includes a helper class for managing XML-to-Object mapping. XmlBean (com.tangosol.run.xml.XmlBean) supports most common Java types, including primitives (in both intrinsic and object forms) as well as collections. An added benefit is that XmlBean provides an implementation of com.tangosol.io.ExternalizableLite for very fast serialization/deserialization (which also reduces both memory and network usage). XmlBean does not support cyclical object graphs (which generally do not appear in value objects anyway).

12. Manageability

Coherence offers almost transparent manageability. Coherence automatically maintains cluster membership and handles damaged nodes transparently. New machines can be added dynamically to increase cluster capacity.

Coherence is specifically designed to require minimal maintenance, as this is an implied part of the Reliability and Availability goals. Evidence of this can be found in the number of OEMs who embed Coherence within their products (including some vendors of high-volume packaged software).

Coherence offers extensive JMX instrumentation. For more details, please see com.tangosol.net.management.Registry.

Cluster your objects and your data


Overview

Coherence is an essential ingredient for building reliable, high-scale clustered applications. The term clustering refers to the use of more than one server to run an application, usually for reliability and scalability purposes. Coherence provides all of the necessary capabilities for applications to achieve the maximum possible availability, reliability, scalability and performance. Virtually any clustered application will benefit from using Coherence.

One of the primary uses of Coherence is to cluster an application's objects and data. In the simplest sense, this means that all of the objects and data that an application delegates to Coherence are automatically available to and accessible by all servers in the application cluster, and none of those objects and none of that data will be lost in the event of server failure.

By clustering the application's objects and data, Coherence solves many of the difficult problems related to achieving availability, reliability, scalability, performance, serviceability and manageability of clustered applications.

Availability

Availability refers to the percentage of time that an application is operating. High Availability refers to achieving availability close to 100%. Coherence is used to achieve High Availability in several different ways:

Supporting redundancy in Java applications

Coherence makes it possible for an application to run on more than one server, which means that the servers are redundant. Using a load balancer, for example, an application running on redundant servers will be available as long as one server is still operating. Coherence enables redundancy by allowing an application to share, coordinate access to, update and receive modification events for critical runtime information across all of the redundant servers. Most applications cannot operate in a redundant server environment unless they are architected to run in such an environment; Coherence is a key enabler of such an architecture.

Enabling dynamic cluster membership

Coherence tracks exactly which servers are available at any given moment. When the application is started on an additional server, Coherence is instantly aware of that server coming online and automatically joins it into the cluster. This allows redundancy (and thus availability) to be dynamically increased by adding servers.

Exposing knowledge of server failure

Coherence reliably detects most types of server failure in less than a second, and immediately fails over all of the responsibilities of the failed server without losing any data. As a result, server failure does not impact availability.

Part of availability management is Mean Time To Recovery (MTTR), a measure of how long it takes for an unavailable application to become available again. Since server failure is detected and handled in less than a second, and since redundancy means that the application is available even when that server goes down, the MTTR due to server failure is zero from the point of view of application availability, and typically sub-second from the point of view of a load balancer re-routing an incoming request.

Eliminating other Single Points Of Failure (SPOFs)

Coherence provides insulation against failures in other infrastructure tiers. For example, Coherence write-behind caching and Coherence distributed parallel queries can insulate an application from a database failure; in fact, using these capabilities, two different Coherence customers have experienced database failures during operational hours, yet their production Coherence-based applications maintained their availability and their operational status.

Providing support for Disaster Recovery (DR) and Continuity Planning

Coherence can even insulate against failure of an entire data center, by clustering across multiple data centers and failing over the responsibilities of an entire data center. Again, this capability has been proven in production, with a Coherence customer running a mission-critical real-time financial system surviving a complete data center outage.

Reliability

Reliability refers to the percentage of time that an application is able to process correctly. In other words, an application may be available, yet unreliable if it cannot correctly handle the application processing. An example that we use to illustrate high availability but low reliability is a mobile phone network: While most mobile phone networks have very high uptimes (referring to availability), dropped calls tend to be relatively common (referring to reliability).

Coherence is explicitly architected to achieve very high levels of reliability. For example, server failure does not impact "in flight" operations, since each operation is atomically protected from server failure, and will internally re-route to a secondary node based on a dynamic pre-planned recovery strategy. In other words, every operation has a backup plan ready to go!

Coherence is architected based on the assumption that failures are always about to occur. As a result, the algorithms employed by Coherence are carefully designed to assume that each step within an operation could fail due to a network, server, operating system, JVM or other resource outage. An example of how Coherence plans for these failures is the synchronous manner in which it maintains redundant copies of data; in other words, Coherence does not gamble with the application's data, and that ensures that the application will continue to work correctly, even during periods of server failure.

Scalability

Scalability refers to the ability of an application to predictably handle more load. An application exhibits linear scalability if the maximum amount of load that an application can sustain is directly proportional to the hardware resources that the application is running on. For example, if an application running on 2 servers can handle 2000 requests per second, then linear scalability would imply that 10 servers would handle 10000 requests per second.

Linear scalability is the goal of a scalable architecture, but it is difficult to achieve. The measurement of how well an application scales is called the scaling factor (SF). A scaling factor of 1.0 represents linear scalability, while a scaling factor of 0.0 represents no scalability. Coherence provides a number of capabilities designed to help applications achieve linear scalability.
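The text gives only the end points of the scaling factor (1.0 for linear scalability, 0.0 for none). One formula that matches both, offered here as an assumption rather than an official definition, compares the relative throughput gain with the relative hardware gain: SF = (T2/T1 - 1) / (N2/N1 - 1), where T is the sustained throughput on N servers. The `ScalingFactor` helper is hypothetical.

```java
// Hypothetical helper; the formula is one definition consistent with the
// end points given in the text (1.0 = linear, 0.0 = no scalability).
class ScalingFactor {
    static double of(int servers1, double throughput1, int servers2, double throughput2) {
        if (servers1 == servers2) {
            throw new IllegalArgumentException("server counts must differ");
        }
        double hardwareGain = (double) servers2 / servers1;
        double throughputGain = throughput2 / throughput1;
        return (throughputGain - 1.0) / (hardwareGain - 1.0);
    }

    public static void main(String[] args) {
        System.out.println(of(2, 2000, 10, 10000)); // linear, as in the example above: 1.0
        System.out.println(of(2, 2000, 10, 2000));  // no gain at all: 0.0
        System.out.println(of(2, 2000, 10, 6000));  // partial scalability: 0.5
    }
}
```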

When planning for extreme scale, the first thing to understand is that application scalability is limited by any necessary shared resource that does not exhibit linear scalability. The limiting element is referred to as a bottleneck, and in most applications, the bottleneck is the data source, such as a database or an EIS.

Coherence helps to solve the scalability problem by targeting obvious bottlenecks, and by completely eliminating bottlenecks whenever possible. It accomplishes this through a variety of capabilities, including:

Distributed Caching

Coherence uses a combination of replication, distribution, partitioning and invalidation to reliably maintain data in a cluster in such a way that regardless of which server is processing, the data that it obtains from Coherence is the same. In other words, Coherence provides a distributed shared memory implementation, also referred to as Single System Image (SSI) and Coherent Clustered Caching.

Any time that an application can obtain the data it needs from the application tier, it is eliminating the data source as the Single Point Of Bottleneck (SPOB).

Partitioning

Partitioning refers to the ability for Coherence to load-balance data storage, access and management across all of the servers in the cluster. For example, when using Coherence data partitioning, if there are four servers in a cluster then each will manage 25% of the data, and if another server is added, each server will dynamically adjust so that each of the five servers will manage 20% of the data, and this data load balancing will occur without any application interruption and without any lost data or operations. Similarly, if one of those five servers were to die, each of the remaining four servers would be managing 25% of the data, and this data load balancing will occur without any application interruption and without any lost data or operations – including the 20% of the data that was being managed on the failed server.
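The percentages above follow from splitting the data into many fixed partitions and spreading those partitions evenly over whichever servers are currently members. The sketch below maps a key to a partition by its hashCode() and assigns partitions to servers with a simple modulo rule; the partition count of 100, the `PartitionMap` class and the assignment rule are assumptions for illustration, not Coherence's distribution algorithm.

```java
import java.util.Arrays;

class PartitionMap {
    static final int PARTITIONS = 100;   // assumed fixed partition count

    // Stable key-to-partition mapping based on the key's hashCode()
    static int partitionOf(Object key) {
        return Math.floorMod(key.hashCode(), PARTITIONS);
    }

    // Hypothetical assignment rule: partition p is owned by server (p mod serverCount)
    static int ownerOf(int partition, int serverCount) {
        return partition % serverCount;
    }

    // Percentage of all partitions owned by each server
    static double[] shares(int serverCount) {
        int[] owned = new int[serverCount];
        for (int p = 0; p < PARTITIONS; p++) {
            owned[ownerOf(p, serverCount)]++;
        }
        double[] pct = new double[serverCount];
        for (int s = 0; s < serverCount; s++) {
            pct[s] = 100.0 * owned[s] / PARTITIONS;
        }
        return pct;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(shares(4)));   // [25.0, 25.0, 25.0, 25.0]
        System.out.println(Arrays.toString(shares(5)));   // [20.0, 20.0, 20.0, 20.0, 20.0]
    }
}
```

The modulo rule keeps the example short but is a poor rebalancing strategy: going from four to five servers changes the owner of 80% of the partitions, whereas the distribution algorithm described earlier shifts data incrementally.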

Coherence accomplishes failover without data loss by synchronously maintaining a configurable number of copies of the data within the cluster. Just as the data management responsibility is spread out over the cluster, so is the responsibility for backing up data, so in the previous example, each of the remaining four servers would have roughly 25% of the failed server's data backed up on it. This mesh architecture guarantees that on server failure, no particular remaining server is inundated with a massive amount of additional responsibility.
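Backup placement can be sketched in the same style. Below, every partition has a primary owner and one backup on a different server, and the backups of each server's partitions are rotated across all of the other servers, so that if one server fails its data is already backed up in roughly equal shares on the survivors. The rotation rule and the `BackupPlacement` class are assumptions made for the example; it requires at least two servers.

```java
import java.util.HashMap;
import java.util.Map;

class BackupPlacement {
    // Primary owner of partition p among n servers (simple even spread)
    static int primaryOf(int p, int n) {
        return p % n;
    }

    // Backup owner: rotate through the other n - 1 servers so that no single
    // server backs up a disproportionate share of any one server's partitions
    static int backupOf(int p, int n) {
        int offset = 1 + (p / n) % (n - 1);   // always in 1..n-1, never the primary
        return (primaryOf(p, n) + offset) % n;
    }

    // For a failed server: how many of its partitions each survivor holds as backup
    static Map<Integer, Integer> backupsOfFailedServer(int failed, int n, int partitions) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int p = 0; p < partitions; p++) {
            if (primaryOf(p, n) == failed) {
                counts.merge(backupOf(p, n), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 100 partitions on 5 servers; server 0 fails
        System.out.println(backupsOfFailedServer(0, 5, 100));
    }
}
```

With five servers and 100 partitions, the failed server owned 20 partitions, and each of the four survivors already holds backups for 5 of them, i.e. 25% of the failed server's data, matching the example in the text.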

Coherence prevents loss of data even when multiple instances of the application are running on a single physical server within the cluster. It does so by ensuring that backup copies of data are being managed on different physical servers, so that if a physical server fails or is disconnected, all of the data being managed by the failed server has backups ready to go on a different server.

Lastly, partitioning supports linear scalability of both data capacity and throughput. Data capacity scales because the data is evenly balanced across all servers, so four servers can naturally manage twice as much data as two servers. Throughput scales for the same reason: as servers are added, each server applies its full processing power to a smaller and smaller percentage of the overall data set.

For example, in a ten-server cluster each server manages 10% of the data operations, and, since Coherence uses a peer-to-peer architecture, 10% of those operations come from each server. With ten times as many servers (i.e. 100 servers), each server manages only 1% of the data operations, and only 1% of those operations come from each server; but because there are ten times as many servers, the cluster accomplishes ten times the total number of operations. In the 10-server example, if each of the ten servers issues 100 operations per second, each sends 10 of those operations to each of the ten servers (including itself), so each server receives 100 operations (10x10) that it is responsible for processing. In the 100-server example, each server still issues 100 operations per second but sends only one operation to each server, so each server again receives 100 operations (100x1) that it is responsible for processing.

This linear scalability is made possible by modern switched network architectures, whose backplanes scale linearly with the number of ports on the switch, providing each port with dedicated, full-duplex (upstream and downstream) bandwidth. Since each server sends and receives only 100 operations per second in both the 10-server and 100-server examples, network bandwidth utilization per port stays roughly constant regardless of the number of servers in the cluster.
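The arithmetic of the worked example generalizes. Assuming n servers that each issue R operations per second, spread evenly over the n servers that own the data, the load on each server and the throughput of the whole cluster are:

```latex
\text{operations received per server} = n \cdot \frac{R}{n} = R,
\qquad
\text{total cluster throughput} = n \cdot R
```

With R = 100, n = 10 gives 10 x 10 = 100 operations received per server and a cluster total of 1,000 per second; n = 100 gives 100 x 1 = 100 per server and 10,000 per second. The per-server (and per-port) load stays at R while total throughput grows linearly with n.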

Session Management

One common use case for Coherence clustering is to manage user sessions (conversational state) in the cluster. This capability is provided by the Coherence*Web module, which is a built-in feature of Coherence. Coherence*Web provides linear scalability for HTTP Session Management in clusters of hundreds of production servers. It can achieve this linear scalability because at its core it is built on Coherence dynamic partitioning.

Session management highlights the scalability problem that typifies shared data sources: If an application could not share data across the servers, it would have to delegate that data management entirely to the shared store, which is typically the application's database. If the HTTP session were stored in the database, each HTTP request (in the absence of sticky load-balancing) would require a read from the database, causing the desired reads-per-second from the database to increase linearly with the size of the server cluster. Further, each HTTP request causes an update of its corresponding HTTP session, so regardless of sticky load balancing, to ensure that HTTP session data is not lost when a server fails the desired writes-per-second to the database will also increase linearly with the size of the server cluster. In both cases, the actual reads and writes per second that a database is capable of does not scale in relation to the number of servers requesting those reads and writes, and the database quickly becomes a bottleneck, forcing availability, reliability (e.g. asynchronous writes) and performance compromises. Additionally, related to performance, each read from a database has an associated latency, and that latency increases dramatically as the database experiences increasing load.

Coherence*Web, on the other hand, has the same latency in a 2-server cluster as it has in a 200-server cluster, since all HTTP session read operations that cannot be handled locally (local handling being possible, for example, as a result of sticky load balancing) are spread out evenly across the rest of the cluster, and all update operations (which must be handled remotely to ensure survival of the HTTP sessions) are likewise spread out evenly across the rest of the cluster. The result is linear scalability with constant latency, regardless of the size of the cluster.

Performance

Performance is the inverse of latency, and latency is the measurement of how long something takes to complete. If increasing performance is the goal, then getting rid of anything that has any latency is the solution. Obviously, it is impossible to get rid of all latencies, since the High Availability and reliability aspects of an application are counting on the underlying infrastructure, such as Coherence, to maintain reliable up-to-date back-ups of important information, which means that some operations (such as data modifications and pessimistic transactions) have unavoidable latencies. On the other hand, every remaining operation that could possibly have any latency needs to be targeted for elimination, and Coherence provides a large number of capabilities designed to do just that:

Replication

Just as partitioning dynamically load-balances data evenly across the entire server cluster, replication ensures that a desired set of data is up-to-date on every single server in the cluster at all times. Replication allows operations running on any server to obtain the data that they need locally, at basically no cost, because that data has already been replicated to that server. In other words, replication is a tool to guarantee locality of reference, and the end result is zero-latency access to replicated data.
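To make the locality argument concrete, the following hypothetical single-JVM sketch models replication as one full copy of the data per node: a write is applied to every copy, and a read never leaves the reading node. The class and method names are illustrative, not Coherence API, and the sketch ignores network delivery, ordering and failure handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical single-JVM model of replication: each node holds a full copy,
 * writes are applied to every copy, and reads are served from the local copy.
 * Network delivery, ordering and failure handling are deliberately ignored.
 */
final class ReplicatedSketch
    {
    private final List<Map<String, String>> copies = new ArrayList<>();

    ReplicatedSketch(int nodeCount)
        {
        for (int i = 0; i < nodeCount; ++i)
            {
            copies.add(new HashMap<>());
            }
        }

    /** A write is pushed to the copy held by every node. */
    void put(String key, String value)
        {
        for (Map<String, String> copy : copies)
            {
            copy.put(key, value);
            }
        }

    /** A read never leaves the reading node. */
    String getLocal(int node, String key)
        {
        return copies.get(node).get(key);
        }
    }
```

The cost is paid on the write path (one update per node), which is why replication suits data that is read far more often than it changes.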

Near Caching

Since replication works best for data that should be on all servers, it is inefficient for data that an application would want to avoid copying to all servers. For example, data that changes all of the time and very large data sets are both poorly suited to replication, but both are excellently suited to partitioning, since partitioning scales linearly in both data capacity and throughput.

The only downside of partitioning is that it introduces latency for data access, and in most applications the data access rate far outweighs the data modification rate. To eliminate the latency associated with partitioned data access, near caching maintains frequently- and recently-used data from the partitioned cache on the specific servers that are accessing that data, and keeps that data coherent by means of event-based invalidation. In other words, near caching keeps the most-likely-to-be-needed data near to where it will be used, providing good locality of access while remaining backed by the linear scalability of partitioning.
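The combination of a local front map and event-based invalidation can be sketched in a few lines (Java 8 or later). Everything here is hypothetical: BackTier stands in for the partitioned cache, a Consumer callback stands in for a MapListener, and the code is single-threaded, so it does not deal with the races a real near cache must handle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for the partitioned back tier; it reports every changed key. */
final class BackTier
    {
    private final Map<String, String>    data      = new HashMap<>();
    private final List<Consumer<String>> listeners = new ArrayList<>();
    int remoteReads; // reads that, in a real cluster, would cross the network

    void addInvalidationListener(Consumer<String> listener)
        {
        listeners.add(listener);
        }

    String get(String key)
        {
        remoteReads++;
        return data.get(key);
        }

    void put(String key, String value)
        {
        data.put(key, value);
        for (Consumer<String> listener : listeners)
            {
            listener.accept(key); // event-based invalidation
            }
        }
    }

/** Hypothetical near cache: a local front map kept coherent by invalidation events. */
final class NearCacheSketch
    {
    private final Map<String, String> front = new HashMap<>();
    private final BackTier            back;

    NearCacheSketch(BackTier back)
        {
        this.back = back;
        back.addInvalidationListener(front::remove); // discard the stale local copy
        }

    String get(String key)
        {
        String value = front.get(key);
        if (value == null)
            {
            value = back.get(key); // miss: pay the remote latency once
            if (value != null)
                {
                front.put(key, value);
                }
            }
        return value;
        }
    }
```

Reading the same key twice through NearCacheSketch costs one remote read; after the back tier changes that key, the next read goes remote again and returns the new value.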

Write-Behind, Write-Coalescing and Write-Batching

Since the transactional throughput in the cluster is linearly scalable, the cost associated with data changes can be a fixed latency, typically in the range of a few milliseconds, and the total number of transactions per second is limited only by the size of the cluster. In one application, Coherence was able to achieve transaction rates close to a half-million transactions per second – and that on a cluster of commodity two-CPU servers.

Often, the data being managed by Coherence is actually a temporary copy of data that exists in an official System Of Record (SOR), such as a database. To avoid having the database become a transaction bottleneck, and to eliminate the latency of database updates, Coherence provides a Write-Behind capability, which allows the application to change data in the cluster and have those changes asynchronously replayed to the application's database (or EIS). By managing the changes in a clustered cache (which has all of the High Availability, reliability and scalability attributes described previously), the pending changes are immune to server failure and the total rate of changes scales linearly with the size of the cluster.

The Write-Behind functionality is implemented by queueing each data change; the queue contains a list of what changes need to be written to the System Of Record. The duration of an item within the queue is configurable, and is referred to as the Write-Behind Delay. When data changes, it is added to the write-behind queue (if it is not already in the queue), and the queue entry is set to ripen after the configured Write-Behind Delay has passed. When the queue entry has ripened, the latest copy of the corresponding data is written to the System Of Record.

To avoid overwhelming the System Of Record, Coherence will replay only the latest copies of data to the database, thus coalescing many updates that occur to the same piece of data into a single database operation. The longer the Write-Behind Delay, the more coalescing may occur. Additionally, if many different pieces of data have changed, all of those updates can be batched (e.g. using JDBC statement batching) into a single database operation. In this way, a massive breadth of changes (number of pieces of data changed) and depth of changes (number of times each was changed) can be bundled into a single database operation, which results in dramatically reduced load on the database. The batching is also fully configurable; one option, called the Write Batch Factor, even allows some of the queue entries that have not yet ripened to be included in the batched update.
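The queueing, coalescing and batching rules above can be modelled as follows. This is a hypothetical sketch rather than Coherence's write-behind implementation: time is passed in explicitly, the System Of Record is represented by a list of written batches, and the Write Batch Factor is assumed to be a fraction of the Write-Behind Delay, so that entries due to ripen within that window join a batch that a ripe entry has triggered.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical write-behind sketch, not Coherence's implementation. Changes are
 * queued per key, later changes to a queued key are coalesced, and ripe entries
 * (plus "soft-ripe" ones, per the assumed batch factor) are written as one batch.
 */
final class WriteBehindSketch
    {
    private final long   delayMillis;  // the Write-Behind Delay
    private final double batchFactor;  // assumed: fraction of the delay used for soft-ripe entries
    private final Map<String, String> latest  = new LinkedHashMap<>();
    private final Map<String, Long>   ripenAt = new LinkedHashMap<>();
    final List<Map<String, String>>   batchesWritten = new ArrayList<>(); // stands in for the SOR

    WriteBehindSketch(long delayMillis, double batchFactor)
        {
        this.delayMillis = delayMillis;
        this.batchFactor = batchFactor;
        }

    /** Records a change; a key that is already queued keeps its original ripen time. */
    void put(String key, String value, long now)
        {
        latest.put(key, value);                       // coalesce: only the newest value is kept
        ripenAt.putIfAbsent(key, now + delayMillis);  // enqueue only if not already queued
        }

    /** If any entry is ripe, writes one batch and returns its size; otherwise returns 0. */
    int flush(long now)
        {
        boolean anyRipe = false;
        for (long t : ripenAt.values())
            {
            if (t <= now)
                {
                anyRipe = true;
                break;
                }
            }
        if (!anyRipe)
            {
            return 0;
            }

        long softRipeLimit = now + (long) (batchFactor * delayMillis);
        Map<String, String> batch = new LinkedHashMap<>();
        for (Iterator<Map.Entry<String, Long>> it = ripenAt.entrySet().iterator(); it.hasNext(); )
            {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= softRipeLimit)
                {
                batch.put(entry.getKey(), latest.remove(entry.getKey()));
                it.remove();
                }
            }
        batchesWritten.add(batch); // one batched operation against the system of record
        return batch.size();
        }
    }
```

For example, with a 1000 ms delay and a factor of 0.25, three updates to the same key within the delay produce a single write of the last value, and an entry due to ripen 200 ms after the triggering entry is carried along in the same batch.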

Serviceability

Serviceability refers to the ease and extent of changes that can be made without affecting availability. Coherence helps to increase an application's serviceability by allowing servers to be taken off-line without impacting application availability. Those servers can be serviced and brought back online without any end-user or processing interruptions. Many configuration changes related to Coherence can also be made on a node-by-node basis in the same manner. With careful planning, even major application changes can be rolled into production, again one node at a time, without interrupting the application.

Manageability

Manageability refers to the level of information that a running system provides, and the capability to tweak settings related to that information. For example, Coherence provides a cluster-wide view of management information via the standard JMX API, so that the entire cluster can be managed from a single server. The information provided includes hit and miss rates, cache sizes, read, write and write-behind statistics, and detailed information all the way down to the network packet level.

Additionally, Coherence allows applications to place their own management information – and expose their own tweakable settings – through the same clustered JMX implementation. The result is an application infrastructure that makes managing and monitoring a clustered application as simple as managing and monitoring a single server, and all through Java's standard management API.
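As an illustration of placing application settings behind the standard JMX API, the following sketch uses only javax.management classes from the JDK. It registers with the local platform MBean server; how Coherence aggregates such MBeans into its clustered view is not shown, and the object name example:type=AppSettings and the attribute names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

/**
 * Hypothetical example built only on the standard JMX API in the JDK: the
 * application exposes a read-only statistic and a tweakable setting.
 */
final class AppManagement
    {
    /** Management interface; JMX requires it to be public. */
    public interface SettingsMBean
        {
        long getRequestCount();        // read-only attribute "RequestCount"
        int  getBatchSize();           // read/write attribute "BatchSize"
        void setBatchSize(int size);
        }

    /** Application-side implementation of the management interface. */
    public static final class Settings
            implements SettingsMBean
        {
        private final AtomicLong requestCount = new AtomicLong();
        private volatile int     batchSize    = 100;

        public long getRequestCount()      { return requestCount.get(); }
        public int  getBatchSize()         { return batchSize; }
        public void setBatchSize(int size) { batchSize = size; }

        /** Called by application code; not part of the management interface. */
        void recordRequest()               { requestCount.incrementAndGet(); }
        }

    /** Registers the settings with the local platform MBean server under the given name. */
    static ObjectName register(Settings settings, String name)
            throws Exception
        {
        MBeanServer server     = ManagementFactory.getPlatformMBeanServer();
        ObjectName  objectName = new ObjectName(name);
        server.registerMBean(new StandardMBean(settings, SettingsMBean.class), objectName);
        return objectName;
        }
    }
```

Once registered, a JMX console such as JConsole attached to the same JVM should list RequestCount and BatchSize under the example domain, and changing BatchSize there updates the running application.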

Summary

There are a lot of challenges in building a highly available application that exhibits scalable performance and is both serviceable and manageable. While there are many ways to build distributed applications, only Coherence reliably clusters objects and data. Once objects and data are clustered by Coherence, all the servers in the cluster can access and modify those objects and that data, and the objects and data managed by Coherence will not be affected if and when servers fail. Because Coherence provides a variety of advanced capabilities, each of which is configurable, an application can achieve the optimal balance of redundancy, scalability and performance, and do so within a manageable and serviceable environment.

Deliver events for changes as they occur


Overview

Coherence provides cache events using the JavaBean Event model. It is extremely simple to receive the events that you need, where you need them, regardless of where the changes are actually occurring in the cluster. Developers with any experience with the JavaBean model will have no difficulties working with events, even in a complex cluster.

Listener interface and Event object

In the JavaBeans Event model, there is an EventListener interface that all listeners must extend. Coherence provides a MapListener interface, which allows application logic to receive events when data in a Coherence cache is added, modified or removed:

excerpt from com.tangosol.util.MapListener
public interface MapListener
        extends EventListener
    {
    /**
    * Invoked when a map entry has been inserted.
    *
    * @param evt  the MapEvent carrying the insert information
    */
    public void entryInserted(MapEvent evt);

    /**
    * Invoked when a map entry has been updated.
    *
    * @param evt  the MapEvent carrying the update information
    */
    public void entryUpdated(MapEvent evt);

    /**
    * Invoked when a map entry has been removed.
    *
    * @param evt  the MapEvent carrying the delete information
    */
    public void entryDeleted(MapEvent evt);
    }

An application object that implements the MapListener interface can sign up for events from any Coherence cache or class that implements the ObservableMap interface, simply by passing an instance of the application's MapListener implementation to one of the addMapListener() methods.

The MapEvent object that is passed to the MapListener carries all of the necessary information about the event that has occurred, including the source (ObservableMap) that raised the event, the identity (key) that the event is related to, what the action was against that identity (insert, update or delete), what the old value was and what the new value is:

excerpt from com.tangosol.util.MapEvent
public class MapEvent
        extends EventObject
    {
    /**
    * Return an ObservableMap object on which this event has actually
    * occurred.
    *
    * @return an ObservableMap object
    */
    public ObservableMap getMap()

    /**
    * Return this event's id. The event id is one of the ENTRY_*
    * enumerated constants.
    *
    * @return an id
    */
    public int getId()

    /**
    * Return a key associated with this event.
    *
    * @return a key
    */
    public Object getKey()

    /**
    * Return an old value associated with this event.
    * <p>
    * The old value represents a value deleted from or updated in a map.
    * It is always null for "insert" notifications.
    *
    * @return an old value
    */
    public Object getOldValue()

    /**
    * Return a new value associated with this event.
    * <p>
    * The new value represents a new value inserted into or updated in
    * a map. It is always null for "delete" notifications.
    *
    * @return a new value
    */
    public Object getNewValue()


    // ----- Object methods -------------------------------------------------

    /**
    * Return a String representation of this MapEvent object.
    *
    * @return a String representation of this MapEvent object
    */
    public String toString()


    // ----- constants ------------------------------------------------------

    /**
    * This event indicates that an entry has been added to the map.
    */
    public static final int ENTRY_INSERTED = 1;

    /**
    * This event indicates that an entry has been updated in the map.
    */
    public static final int ENTRY_UPDATED  = 2;

    /**
    * This event indicates that an entry has been removed from the map.
    */
    public static final int ENTRY_DELETED  = 3;
    }

Caches and classes that support events

All Coherence caches implement ObservableMap; in fact, the NamedCache interface that is implemented by all Coherence caches extends the ObservableMap interface. That means that an application can sign up to receive events from any cache, regardless of whether that cache is local, partitioned, near, replicated, using read-through, write-through, write-behind, overflow, disk storage, etc.

Regardless of the cache topology and the number of servers, and even if the modifications are being made by other servers, the events will be delivered to the application's listeners.

In addition to the Coherence caches (those objects obtained through a Coherence cache factory), several other supporting classes in Coherence also implement the ObservableMap interface:

  • ObservableHashMap
  • LocalCache
  • OverflowMap
  • NearCache
  • ReadWriteBackingMap
  • AbstractSerializationCache, SerializationCache and SerializationPagedCache
  • WrapperObservableMap, WrapperConcurrentMap and WrapperNamedCache

For a full list of published implementing classes, see the Coherence JavaDoc for ObservableMap.

Signing up for all events

To sign up for events, simply pass an object that implements the MapListener interface to one of the addMapListener methods on ObservableMap:

ObservableMap methods
public void addMapListener(MapListener listener);
public void addMapListener(MapListener listener, Object oKey, boolean fLite);
public void addMapListener(MapListener listener, Filter filter, boolean fLite);

Let's create an example MapListener implementation:

Example MapListener implementation
/**
* A MapListener implementation that prints each event as it receives
* them.
*/
public static class EventPrinter
        extends Base
        implements MapListener
    {
    public void entryInserted(MapEvent evt)
        {
        out(evt);
        }

    public void entryUpdated(MapEvent evt)
        {
        out(evt);
        }

    public void entryDeleted(MapEvent evt)
        {
        out(evt);
        }
    }

Using this implementation, it is extremely simple to print out all events from any given cache (since all caches implement the ObservableMap interface):

cache.addMapListener(new EventPrinter());

Of course, to be able to later remove the listener, it is necessary to hold on to a reference to the listener:

MapListener listener = new EventPrinter();
cache.addMapListener(listener);
m_listener = listener; // store the listener in a field

Later, to remove the listener:

MapListener listener = m_listener;
if (listener != null)
    {
    cache.removeMapListener(listener);
    m_listener = null; // clean up the listener field
    }

Each addMapListener method on the ObservableMap interface has a corresponding removeMapListener method. To remove a listener, use the removeMapListener method that corresponds to the addMapListener method that was used to add the listener.

Using an inner class as a MapListener

When creating an inner class to use as a MapListener, or when implementing a MapListener that only listens to one or two types of events (inserts, updates or deletes), you can use the AbstractMapListener base class. For example, the following anonymous inner class prints out only the insert events for the cache:

cache.addMapListener(new AbstractMapListener()
    {
    public void entryInserted(MapEvent evt)
        {
        out(evt);
        }
    });

Another helpful base class for creating a MapListener is the MultiplexingMapListener, which routes all events to a single method for handling. For example, the EventPrinter example could be simplified to:

public static class EventPrinter
        extends MultiplexingMapListener
    {
    public void onMapEvent(MapEvent evt)
        {
        out(evt);
        }
    }

Since only one method needs to be implemented to capture all events, the MultiplexingMapListener can also be very useful when creating an inner class to use as a MapListener.
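Both base classes are instances of a familiar adapter pattern. The sketch below is hypothetical: it uses a simplified three-callback interface with String events in place of MapListener and MapEvent, and shows one adapter whose callbacks are all no-ops (the role AbstractMapListener plays) and one that routes every callback to a single method (the role MultiplexingMapListener plays).

```java
/** Hypothetical three-callback listener interface shaped like MapListener (events are Strings here). */
interface EntryListener
    {
    void entryInserted(String evt);
    void entryUpdated(String evt);
    void entryDeleted(String evt);
    }

/** Every callback is a no-op, so subclasses override only the events they care about. */
abstract class NoOpEntryListener
        implements EntryListener
    {
    public void entryInserted(String evt) {}
    public void entryUpdated(String evt)  {}
    public void entryDeleted(String evt)  {}
    }

/** All three callbacks are routed to one method. */
abstract class MultiplexingEntryListener
        implements EntryListener
    {
    public final void entryInserted(String evt) { onEvent(evt); }
    public final void entryUpdated(String evt)  { onEvent(evt); }
    public final void entryDeleted(String evt)  { onEvent(evt); }

    /** The single method a subclass implements to see every kind of event. */
    protected abstract void onEvent(String evt);
    }
```

Making the routing methods final in the multiplexing adapter keeps a subclass from accidentally bypassing the single handler.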

Configuring a MapListener for a Cache

If the listener should always be on a particular cache, then place it into the cache configuration using the listener element and Coherence will automatically add the listener when it configures the cache.

Signing up for Events on specific identities

Signing up for events that occur against specific identities (keys) is just as simple. For example, to print all events that occur against the Integer key "5":

cache.addMapListener(new EventPrinter(), new Integer(5), false);

With that registration in place, the following code triggers an event only when the Integer key "5" is inserted or updated:

for (int i = 0; i < 10; ++i)
    {
    Integer key   = new Integer(i);
    String  value = "test value for key " + i;
    cache.put(key, value);
    }

Filtering Events

Similar to listening to a particular key, it is possible to listen to particular events. Consider the following example:

public class DeletedFilter
        implements Filter, Serializable
    {
    public boolean evaluate(Object o)
        {
        MapEvent evt = (MapEvent) o;
        return evt.getId() == MapEvent.ENTRY_DELETED;
        }
    }

cache.addMapListener(new EventPrinter(), new DeletedFilter(), false);

Filtering events versus filtering cached data

When building a Filter for querying, the object that will be passed to the evaluate method of the Filter will be a value from the cache, or – if the Filter implements the EntryFilter interface – the entire Map.Entry from the cache. When building a Filter for filtering events for a MapListener, the object that will be passed to the evaluate method of the Filter will always be of type MapEvent.

For more information on how to use a query filter to listen to cache events, see the section below titled Advanced: Listening to Queries.

The listener is added to the cache with a filter that allows it to receive only delete events. For example, if the following sequence of calls were made:

cache.put("hello", "world");
cache.put("hello", "again");
cache.remove("hello");

The result would be:

CacheEvent{LocalCache deleted: key=hello, value=again}

For more information, see the Advanced: Listening to Queries section below.
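The registration behaviors described so far (all events, a single key, or a filter, with removal through the call matching the original registration) can be modelled with a small in-memory map (Java 8 or later). MiniObservableMap and its nested types are hypothetical and single-threaded; they illustrate the semantics, not the Coherence classes, and a java.util.function.Predicate stands in for Filter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

/** Hypothetical, single-threaded model of listener registration; not the Coherence classes. */
final class MiniObservableMap
    {
    static final int INSERTED = 1, UPDATED = 2, DELETED = 3; // same ids as MapEvent.ENTRY_*

    /** Illustrative event: what happened, to which key, and the old and new values. */
    static final class Event
        {
        final int    id;
        final Object key, oldValue, newValue;

        Event(int id, Object key, Object oldValue, Object newValue)
            {
            this.id = id; this.key = key; this.oldValue = oldValue; this.newValue = newValue;
            }
        }

    interface Listener
        {
        void onEvent(Event evt);
        }

    /** One registration: a listener with an optional key and an optional filter. */
    private static final class Registration
        {
        final Listener         listener;
        final Object           key;     // null means "any key"
        final Predicate<Event> filter;  // null means "any event"

        Registration(Listener listener, Object key, Predicate<Event> filter)
            {
            this.listener = listener; this.key = key; this.filter = filter;
            }

        boolean accepts(Event evt)
            {
            return (key == null || key.equals(evt.key)) && (filter == null || filter.test(evt));
            }
        }

    private final Map<Object, Object> data          = new HashMap<>();
    private final List<Registration>  registrations = new ArrayList<>();

    void addListener(Listener l)                          { registrations.add(new Registration(l, null, null)); }
    void addListener(Listener l, Object key)              { registrations.add(new Registration(l, key, null)); }
    void addListener(Listener l, Predicate<Event> filter) { registrations.add(new Registration(l, null, filter)); }

    /** Undoes addListener(l) when key is null, or addListener(l, key) otherwise. */
    void removeListener(Listener l, Object key)
        {
        registrations.removeIf(r -> r.listener == l && Objects.equals(r.key, key) && r.filter == null);
        }

    Object put(Object key, Object value)
        {
        boolean existed = data.containsKey(key);
        Object  old     = data.put(key, value);
        fire(new Event(existed ? UPDATED : INSERTED, key, old, value));
        return old;
        }

    Object remove(Object key)
        {
        if (!data.containsKey(key))
            {
            return null;
            }
        Object old = data.remove(key);
        fire(new Event(DELETED, key, old, null));
        return old;
        }

    private void fire(Event evt)
        {
        // iterate over a copy so that a listener may add or remove registrations
        for (Registration r : new ArrayList<>(registrations))
            {
            if (r.accepts(evt))
                {
                r.listener.onEvent(evt);
                }
            }
        }
    }
```

In this model, registering a listener for key 5 and then running the ten-key loop shown earlier delivers exactly one event, and a listener registered with a delete-only filter sees only the final remove of "hello", with "again" as the old value.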

"Lite" Events

By default, Coherence provides both the old and the new value as part of an event. Consider the following example:

MapListener listener = new MultiplexingMapListener()
    {
    public void onMapEvent(MapEvent evt)
        {
        out("event has occurred: " + evt);
        out("(the wire-size of the event would have been "
            + ExternalizableHelper.toBinary(evt).length()
            + " bytes.)");
        }
    };
cache.addMapListener(listener);

// insert a 1KB value
cache.put("test", new byte[1024]);

// update with a 2KB value
cache.put("test", new byte[2048]);

// remove the 2KB value
cache.remove("test");

The output from running the test shows that the first event carries the 1KB inserted value, the second event carries both the replaced 1KB value and the new 2KB value, and the third event carries the removed 2KB value:

event has occurred: CacheEvent{LocalCache added: key=test, value=[B@a470b8}
(the wire-size of the event would have been 1283 bytes.)
event has occurred: CacheEvent{LocalCache updated: key=test, old value=[B@a470b8, new value=[B@1c6f579}
(the wire-size of the event would have been 3340 bytes.)
event has occurred: CacheEvent{LocalCache deleted: key=test, value=[B@1c6f579}
(the wire-size of the event would have been 2307 bytes.)

When an application does not require the old and the new value to be included in the event, it can indicate that by requesting only "lite" events. When adding a listener, you can request lite events by using one of the two addMapListener methods that take an additional boolean fLite parameter. In the above example, the only change would be:

cache.addMapListener(listener, (Filter) null, true);

Obviously, a lite event's old value and new value may be null. However, even if you request lite events, the old and the new value may be included if there is no additional cost to generate and deliver the event. In other words, requesting that a MapListener receive lite events is simply a hint to the system that the MapListener does not need to know the old and new values for the event.

Advanced: Listening to Queries

All Coherence caches support querying by any criteria. When an application queries for data from a cache, the result is a point-in-time snapshot, either as a set of identities ("keySet") or a set of identity/value pairs ("entrySet"). The mechanism for determining the contents of the resulting set is referred to as filtering, and it allows an application developer to construct queries of arbitrary complexity using a rich set of out-of-the-box filters (e.g. equals, less-than, like, between, etc.), or to provide their own custom filters (e.g. XPath).

The same filters that are used to query a cache can be used to listen to events from a cache. For example, in a trading system it is possible to query for all open "Order" objects for a particular trader:

NamedCache trades = ...
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTrades = trades.entrySet(filter);

To receive notifications of new trades being opened for that trader, closed by that trader or reassigned to or from another trader, the application can use the same filter:

// receive events for all trade IDs that this trader is interested in
trades.addMapListener(listener, new MapEventFilter(filter), true);

The MapEventFilter converts a query filter into an event filter.

The MapEventFilter converts a Filter that is used to do a query into a Filter that is used to filter events for a MapListener. In other words, the MapEventFilter is constructed from a Filter that queries a cache, and the resulting MapEventFilter is a filter that evaluates MapEvent objects by converting them into the objects that a query Filter would expect.

The MapEventFilter has a number of very powerful options, allowing an application listener to receive only the events that it is specifically interested in. More importantly for scalability and performance, only the desired events have to be communicated over the network, and they are communicated only to the servers and clients that have expressed interest in those specific events! For example:

// receive all events for all trades that this trader is interested in
trades.addMapListener(listener, new MapEventFilter(filter,
    MapEventFilter.E_ALL), true);

// receive events for all this trader's trades that are closed or
// re-assigned to a different trader
trades.addMapListener(listener, new MapEventFilter(filter,
    MapEventFilter.E_UPDATED_LEFT | MapEventFilter.E_DELETED), true);

// receive events for all trades as they are assigned to this trader
trades.addMapListener(listener, new MapEventFilter(filter,
    MapEventFilter.E_INSERTED | MapEventFilter.E_UPDATED_ENTERED), true);

// receive events only for new trades assigned to this trader
trades.addMapListener(listener, new MapEventFilter(filter,
    MapEventFilter.E_INSERTED), true);

For more information on the various options supported, see the API documentation for MapEventFilter.
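Conceptually, a query filter becomes an event filter by evaluating it against both the old and the new value of a change. The following hypothetical sketch (Java 8 or later) shows that classification. The constant names echo MapEventFilter, but their values here are arbitrary bit flags, combinations such as E_ALL are omitted, and a null old or new value is used to stand for an insert or a delete, so null cache values cannot be represented.

```java
import java.util.function.Predicate;

/**
 * Hypothetical sketch: turning a query filter into an event filter by testing
 * the query against an event's old and new values. Constant names echo
 * MapEventFilter; the values are arbitrary and only used as bit flags here.
 */
final class EventMaskSketch
    {
    static final int E_INSERTED        = 1;
    static final int E_DELETED         = 2;
    static final int E_UPDATED_ENTERED = 4;   // old value did not match, new value does
    static final int E_UPDATED_LEFT    = 8;   // old value matched, new value does not
    static final int E_UPDATED_WITHIN  = 16;  // old and new values both match

    /**
     * Classifies a change relative to the query. A null old value stands for an
     * insert and a null new value for a delete; 0 means the change is irrelevant.
     */
    static <V> int classify(Predicate<V> query, V oldValue, V newValue)
        {
        boolean wasIn = oldValue != null && query.test(oldValue);
        boolean isIn  = newValue != null && query.test(newValue);

        if (oldValue == null)
            {
            return isIn ? E_INSERTED : 0;
            }
        if (newValue == null)
            {
            return wasIn ? E_DELETED : 0;
            }
        if (wasIn && isIn)
            {
            return E_UPDATED_WITHIN;
            }
        return wasIn ? E_UPDATED_LEFT : isIn ? E_UPDATED_ENTERED : 0;
        }

    /** A listener sees the event only if its classification is in the listener's mask. */
    static <V> boolean deliver(int mask, Predicate<V> query, V oldValue, V newValue)
        {
        return (classify(query, oldValue, newValue) & mask) != 0;
        }
    }
```

For a query matching Alice's open trades, reassigning an open trade from Alice to Bob classifies as E_UPDATED_LEFT and therefore passes an E_UPDATED_LEFT | E_DELETED mask, while a brand-new trade for Alice classifies as E_INSERTED and does not.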

Advanced: Synthetic Events

Events usually reflect the changes being made to a cache. For example, one server may be modifying one entry in a cache while another server is adding several items to the same cache and a third server is removing an item from it, all while fifty threads on each and every server in the cluster are accessing data from that cache. All the modifying actions will produce events that any server within the cluster can choose to receive. We refer to these actions as client actions, and the events as being dispatched to clients, even though the "clients" in this case are actually servers. This is a natural concept in a true peer-to-peer architecture, such as a Coherence cluster: each and every peer is both a client and a server, both consuming services from its peers and providing services to its peers. In a typical Java Enterprise application, a "peer" is an application server instance that is acting as a container for the application, and the "client" is that part of the application that is directly accessing and modifying the caches and listening to events from the caches.

Some events originate from within a cache itself. There are many examples, but the most common cases are:

  • When entries automatically expire from a cache;
  • When entries are evicted from a cache because the maximum size of the cache has been reached;
  • When entries are transparently added to a cache as the result of a Read-Through operation;
  • When entries in a cache are transparently updated as the result of a Read-Ahead or Refresh-Ahead operation.

Each of these represents a modification, but the modifications represent natural (and typically automatic) operations from within a cache. These events are referred to as synthetic events.

When necessary, an application can differentiate between client-induced and synthetic events simply by asking the event if it is synthetic. This information is carried on a sub-class of the MapEvent, called CacheEvent. Using the previous EventPrinter example, it is possible to print only the synthetic events:

public static class EventPrinter
        extends MultiplexingMapListener
    {
    public void onMapEvent(MapEvent evt)
        {
        if (evt instanceof CacheEvent && ((CacheEvent) evt).isSynthetic())
            {
            out(evt);
            }
        }
    }

For more information on this feature, see the API documentation for CacheEvent.
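The difference between client-induced and synthetic events can be illustrated with a hypothetical expiring map. In this sketch an explicit remove() produces a regular deleted event, while removal by expiry happens inside the cache and is flagged as synthetic; the Event class and its fields are illustrative only and are not CacheEvent, and time is supplied by the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical expiring map: an explicit remove() is a client action, while a
 * removal caused by expiry originates inside the cache and is marked synthetic.
 * Time is supplied by the caller so that the behavior is deterministic.
 */
final class ExpiringCacheSketch
    {
    /** Illustrative event record; the synthetic flag plays the role of CacheEvent.isSynthetic(). */
    static final class Event
        {
        final String  kind;
        final Object  key;
        final boolean synthetic;

        Event(String kind, Object key, boolean synthetic)
            {
            this.kind = kind; this.key = key; this.synthetic = synthetic;
            }
        }

    private final long                ttlMillis;
    private final Map<Object, Object> values    = new HashMap<>();
    private final Map<Object, Long>   expiresAt = new HashMap<>();
    final List<Event>                 events    = new ArrayList<>();

    ExpiringCacheSketch(long ttlMillis)
        {
        this.ttlMillis = ttlMillis;
        }

    void put(Object key, Object value, long now)
        {
        boolean existed = values.containsKey(key);
        values.put(key, value);
        expiresAt.put(key, now + ttlMillis);
        events.add(new Event(existed ? "updated" : "inserted", key, false));
        }

    void remove(Object key)
        {
        if (values.containsKey(key))
            {
            values.remove(key);
            expiresAt.remove(key);
            events.add(new Event("deleted", key, false)); // client-induced
            }
        }

    /** Evicts every entry whose time-to-live has passed. */
    void expire(long now)
        {
        List<Object> expired = new ArrayList<>();
        for (Map.Entry<Object, Long> entry : expiresAt.entrySet())
            {
            if (entry.getValue() <= now)
                {
                expired.add(entry.getKey());
                }
            }
        for (Object key : expired)
            {
            values.remove(key);
            expiresAt.remove(key);
            events.add(new Event("deleted", key, true)); // synthetic
            }
        }
    }
```

A listener that, like the EventPrinter variant above, cares only about synthetic events would simply skip every event whose synthetic flag is false.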

Advanced: Backing Map Events

While it is possible to listen to events from Coherence caches, each of which presents a local view of distributed, partitioned, replicated, near-cached, continuously-queried, read-through/write-through and/or write-behind data, it is also possible to peek behind the curtains, so to speak. Normally, the advice from the Wizard of Oz is sufficient:

"Pay no attention to the man behind the curtain!"

For some advanced use cases, it may be necessary to pay attention to the man behind the curtain, or more correctly, to "listen to" the "map" behind the "service". Replication, partitioning and other approaches to managing data in a distributed environment are all distribution services. The service still has to have something in which to actually manage the data, and that something is called a "backing map".

Backing maps are configurable. If all the data for a particular cache should be kept in object form on the heap, then use an unlimited and non-expiring LocalCache (or a SafeHashMap if statistics are not required). If only a small number of items should be kept in memory, use a LocalCache. If data are to be read on demand from a database, then use a ReadWriteBackingMap (which knows how to read and write through an application's DAO implementation), and in turn give the ReadWriteBackingMap a backing map such as a SafeHashMap or a LocalCache to store its data in.

Some backing maps are observable. The events coming from these backing maps are not usually of direct interest to the application. Instead, Coherence translates them into actions that must be taken (by Coherence) to keep data in sync and properly backed up, and it also translates them when appropriate into clustered events that are delivered throughout the cluster as requested by application listeners. For example, if a partitioned cache has a LocalCache as its backing map, and the local cache expires an entry, that event causes Coherence to expire all of the backup copies of that entry. Furthermore, if any listeners have been registered on the partitioned cache, and if the event matches their event filter(s), then that event will be delivered to those listeners on the servers where those listeners were registered.

In some advanced use cases, an application needs to process events on the server where the data are being maintained, and it needs to do so on the structure (backing map) that is actually managing the data. In these cases, if the backing map is an observable map, a listener can be configured on the backing map or one can be programmatically added to the backing map. (If the backing map is not observable, it can be made observable by wrapping it in a WrapperObservableMap.)

For more information on this feature, see the API documentation for BackingMapManager.

Advanced: Synchronous Event Listeners

Some events are delivered asynchronously, so that application listeners do not disrupt the cache services that are generating the events. In some rare scenarios, asynchronous delivery can cause ambiguity of the ordering of events compared to the results of ongoing operations. To guarantee that the cache API operations and the events are ordered as if the local view of the clustered system were single-threaded, a MapListener must implement the SynchronousListener marker interface.

One example in Coherence itself that uses synchronous listeners is the Near Cache, which can use events to invalidate locally cached data ("Seppuku").

For more information on this feature, see the API documentation for SynchronousListener.

Summary

Coherence provides an extremely rich event model for caches, providing the means for an application to request the specific events it requires, and the means to have those events delivered only to those parts of the application that require them.

Automatically manage dynamic cluster membership


Overview

Coherence manages cluster membership, automatically adding new servers to the cluster when they start up and automatically detecting their departure when they are shut down or fail. Applications have full access to this information, and can sign up to receive event notifications when members join and leave the cluster. Coherence also tracks all the services that each member is providing and consuming, and uses this information to plan for service resiliency in case of server failure, and to load-balance data management and other responsibilities across all members of the cluster.

Cluster and Service objects

From any cache, the application can obtain a reference to the local representation of a cache's service. From any service, the application can obtain a reference to the local representation of the cluster.

CacheService service = cache.getCacheService();
Cluster      cluster = service.getCluster();

From the Cluster object, the application can determine the set of services that are running in the cluster:

for (Enumeration enumNames = cluster.getServiceNames(); enumNames.hasMoreElements(); )
    {
    String      sName = (String) enumNames.nextElement();
    ServiceInfo info  = cluster.getServiceInfo(sName);
    // ...
    }

The ServiceInfo object provides information about the service, including its name, type, version and membership.

For more information on this feature, see the API documentation for NamedCache, CacheService, Service, ServiceInfo and Cluster.

Member object

The primary information that an application can determine about each member in the cluster is:

  • The Member's IP address
  • What date/time the Member joined the cluster

As an example, if there are four servers in the cluster with each server running one copy ("instance") of the application and all four instances of the application are clustered together, then the cluster is composed of four Members. From the Cluster object, the application can determine what the local Member is:

Member memberThis = cluster.getLocalMember();

From the Cluster object, the application can also determine the entire set of cluster members:

Set setMembers = cluster.getMemberSet();

From the ServiceInfo object, the application can determine the set of cluster members that are participating in that service:

ServiceInfo info = cluster.getServiceInfo(sName);
Set setMembers = info.getMemberSet();

For more information on this feature, see the API documentation for Member.

Listener interface and Event object

To listen to cluster and/or service membership changes, the application places a listener on the desired Service. As discussed before, the Service can come from a cache:

Service service = cache.getCacheService();

The Service can also be looked up by its name:

Service service = cluster.getService(sName);

To receive membership events, the application implements a MemberListener. For example, the following listener prints all the membership events that it receives:

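import com.tangosol.net.MemberEvent;
import com.tangosol.net.MemberListener;
import com.tangosol.util.Base;

public class MemberEventPrinter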
        extends Base
        implements MemberListener
    {
    public void memberJoined(MemberEvent evt)
        {
        out(evt);
        }

    public void memberLeaving(MemberEvent evt)
        {
        out(evt);
        }

    public void memberLeft(MemberEvent evt)
        {
        out(evt);
        }
    }

The MemberEvent object carries information about the event type (joined / leaving / left), the member that generated the event, and the service that acts as the source of the event. Additionally, the event provides a method, isLocal(), that indicates whether the member joining or leaving the cluster is the local member. This is useful for recognizing soft restarts in which an application automatically rejoins a cluster after a failure occurs. For example:

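import com.tangosol.net.MemberEvent;
import com.tangosol.net.MemberListener;
import com.tangosol.util.Base;

public class RejoinEventPrinter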
        extends Base
        implements MemberListener
    {
    public void memberJoined(MemberEvent evt)
        {
        if (evt.isLocal())
            {
            out("this member just rejoined the cluster: " + evt);
            }
        }

    public void memberLeaving(MemberEvent evt)
        {
        }

    public void memberLeft(MemberEvent evt)
        {
        }
    }

For more information on these features, see the API documentation for Service, MemberListener and MemberEvent.

Provide a Queryable Data Fabric


Overview

Tangosol invented the concept of a data fabric with the introduction of the Coherence partitioned data management service in 2002. Since then, Forrester Research has labeled the combination of data virtualization, transparent and distributed EIS integration, queryability and uniform accessibility found in Coherence as an information fabric. The term fabric comes from a two-dimensional illustration of interconnects, as in a switched fabric. The defining property of a fabric architecture is that every point within the fabric has a direct interconnect with every other point.

Data Fabric

An information fabric, or the simpler form called a data fabric or data grid, uses a switched fabric concept as the basis for managing data in a distributed environment. Using this approach, also referred to as a dynamic mesh architecture, Coherence automatically and dynamically forms a reliable, increasingly resilient switched fabric composed of any number of servers within a grid environment. Consider the attributes and benefits of this architecture:

  • The aggregate data throughput of the fabric is linearly proportional to the number of servers;
  • The in-memory data capacity and data-indexing capacity of the fabric is linearly proportional to the number of servers;
  • The aggregate I/O throughput for disk-based overflow and disk-based storage of data is linearly proportional to the number of servers;
  • The resiliency of the fabric increases with the extent of the fabric, resulting in each server being responsible for only 1/n of the failover responsibility for a fabric with an extent of n servers;
  • If the fabric is servicing clients, such as trading systems, the aggregate maximum number of clients that can be served is linearly proportional to the number of servers.

Coherence accomplishes these technical feats through a variety of algorithms:

  • Coherence dynamically partitions data across all data fabric nodes;
  • Since each data fabric node has a configurable maximum amount of data that it will manage, the capacity of the data fabric is linearly proportional to the number of data fabric nodes;
  • Since the partitioning is automatic and load-balancing, each data fabric node ends up with its fair share of the data management responsibilities, allowing the throughput (in terms of network throughput, disk I/O throughput, query throughput, etc.) to scale linearly with the number of data fabric nodes;
  • Coherence maintains a configurable level of redundancy of data, automatically eliminating single points of failure (SPOFs) by ensuring that data is kept synchronously up-to-date in multiple data fabric nodes;
  • Coherence spreads out the responsibility for data redundancy in a dynamically load-balanced manner so that each server backs up a small amount of data from many other servers, instead of backing up all of the data from one particular server, thus amortizing the impact of a server failure across the entire data fabric;
  • Each data fabric node can handle a large number of client connections, which can be load-balanced by a hardware load balancer.
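The effect of spreading backup responsibility across many servers can be checked with a small placement sketch. The assignment rule below (primary = p mod n, with a rotating backup offset) is a hypothetical scheme chosen for illustration; it is not Coherence's actual partition-assignment algorithm.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical placement scheme illustrating spread-out backups; it is not
// Coherence's real partition-assignment algorithm.
class PartitionPlacement {
    final int[] primary;
    final int[] backup;

    PartitionPlacement(int partitionCount, int nodeCount) {
        if (nodeCount < 2) {
            throw new IllegalArgumentException("need at least two nodes for a backup");
        }
        primary = new int[partitionCount];
        backup = new int[partitionCount];
        for (int p = 0; p < partitionCount; p++) {
            int owner = p % nodeCount;
            // Rotate the backup offset through 1..n-1 so one node's partitions
            // are backed up by many different nodes, not by a single "buddy".
            int offset = 1 + (p / nodeCount) % (nodeCount - 1);
            primary[p] = owner;
            backup[p] = (owner + offset) % nodeCount;
        }
    }

    // For a failed node, count how many of its partitions each surviving node
    // would take over by promoting its backup copy.
    Map<Integer, Integer> failoverLoad(int failedNode) {
        Map<Integer, Integer> load = new TreeMap<>();
        for (int p = 0; p < primary.length; p++) {
            if (primary[p] == failedNode) {
                load.merge(backup[p], 1, Integer::sum);
            }
        }
        return load;
    }
}
```

With 12 partitions on 4 nodes, node 0 owns partitions 0, 4 and 8, and their backups land on nodes 1, 2 and 3, so a failure of node 0 is absorbed evenly by all survivors rather than by one server.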

EIS and Database Integration

The Coherence information fabric can automatically load data on demand from an underlying database or EIS using automatic read-through functionality. If data in the fabric are modified, the same functionality allows that data to be synchronously updated in the database, or queued for asynchronous write-behind.

Coherence automatically partitions data access across the data fabric, resulting in load-balanced data accesses and efficient use of database and EIS connectivity. Furthermore, the read-ahead and write-behind capabilities can cut data access latencies to near-zero levels and insulate the application from temporary database and EIS failures.
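The read-through and write-behind behavior described above can be sketched with an ordinary Map standing in for the database. ReadThroughCache and its methods are hypothetical names for illustration; in Coherence the corresponding extension points are the CacheLoader and CacheStore interfaces, and the flush would run on a schedule rather than on demand.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical read-through / write-behind cache. The "database" is just a
// Map standing in for the system of record.
class ReadThroughCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final Map<K, V> database;
    private final Queue<K> dirtyKeys = new ArrayDeque<>();
    int loads = 0;                         // database reads, for illustration

    ReadThroughCache(Map<K, V> database) {
        this.database = database;
    }

    V get(K key) {
        V value = cache.get(key);
        if (value == null) {
            value = database.get(key);     // read-through on a cache miss
            loads++;
            if (value != null) {
                cache.put(key, value);
            }
        }
        return value;
    }

    void put(K key, V value) {
        cache.put(key, value);             // the cache is updated immediately
        if (!dirtyKeys.contains(key)) {
            dirtyKeys.add(key);            // the database write is deferred
        }
    }

    // Stands in for the periodic write-behind flush. Repeated updates to the
    // same key are coalesced into a single database write.
    int flush() {
        int writes = 0;
        while (!dirtyKeys.isEmpty()) {
            K key = dirtyKeys.poll();
            database.put(key, cache.get(key));
            writes++;
        }
        return writes;
    }
}
```

Because two updates to the same key before a flush produce one database write, write-behind also reduces database load, and the cache keeps serving reads while the database is temporarily unavailable.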

Coherence solves the data bottleneck for large-scale compute grids

In large-scale compute grids, such as in DataSynapse financial grids and biotech grids, the bottleneck for most compute processes is in loading a data set and making it available to the compute engines that require it. By layering a Coherence data fabric onto (or beside) a compute grid, these data sets can be maintained in memory at all times, and Coherence can feed the data in parallel at close to wire speed to all of the compute nodes. In a large-scale deployment, Coherence can provide several thousand times the aggregate data throughput of the underlying data source.

Queryable

The Coherence information fabric supports querying from any server in the fabric or any client of the fabric. The queries can be performed using any criteria, including custom criteria such as XPath queries and full text searches. When Coherence partitioning is used to manage the data, the query is processed in parallel across the entire fabric (i.e., the query is also partitioned), resulting in a data query engine that can scale its throughput up to fabrics of thousands of servers. For example, in a trading system it is possible to query for all open "Order" objects for a particular trader:

NamedCache mapTrades = ...
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTrades = mapTrades.entrySet(filter);
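How a partitioned query fans out and merges its results can be sketched in plain Java, with each partition modeled as a Map in one JVM and scanned by its own task. PartitionedQuery is a hypothetical name, and a java.util.function.Predicate stands in for a Coherence Filter.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical scatter/gather query over partitions held in one JVM.
class PartitionedQuery {
    static <K, V> Set<K> keySet(List<Map<K, V>> partitions, Predicate<V> filter) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            // Scatter: one scan task per partition, running in parallel.
            List<Future<Set<K>>> partials = new ArrayList<>();
            for (Map<K, V> partition : partitions) {
                partials.add(pool.submit(() -> {
                    Set<K> matches = new HashSet<>();
                    for (Map.Entry<K, V> e : partition.entrySet()) {
                        if (filter.test(e.getValue())) {
                            matches.add(e.getKey());
                        }
                    }
                    return matches;
                }));
            }
            // Gather: merge the partial key sets into one result.
            Set<K> result = new HashSet<>();
            for (Future<Set<K>> f : partials) {
                result.addAll(f.get());
            }
            return result;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("partitioned query interrupted", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("partitioned query failed", ex);
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real data fabric each scan runs on the node that owns the partition, so adding nodes adds scanning capacity; the sketch only mirrors the scatter/gather shape.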

When an application queries for data from the fabric, the result is a point-in-time snapshot. Additionally, the query results can be kept up-to-date by placing a listener on the query itself or by using the Coherence Continuous Query feature.

Continuous Query

While it is possible to obtain a point-in-time query result from a Coherence data fabric, and it is possible to receive events that would change the result of that query, Coherence provides a feature that combines a query result with a continuous stream of related events that maintain the query result in real time. This capability is called Continuous Query, because it has the same effect as if the desired query had zero latency and were repeated several times every millisecond!

Coherence implements Continuous Query using a combination of its data fabric parallel query capability and its real-time event-filtering and streaming. The result is support for thousands of client application instances, such as trading desktops. The previous trading system example can be converted to a Continuous Query by changing only a single line of code:

NamedCache mapTrades = ...
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
NamedCache mapOpenTrades = new ContinuousQueryCache(mapTrades, filter);

The result of the Continuous Query is maintained locally, and optionally all of the corresponding data can be cached locally as well.
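The principle behind a continuously maintained query result (an initial snapshot plus every later change event applied against the filter) can be sketched in plain Java. ContinuousView is a hypothetical class that illustrates the idea behind ContinuousQueryCache; it is not its implementation, and a null new value is assumed to represent a delete.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical local view kept current by change events.
class ContinuousView<K, V> {
    private final Predicate<V> filter;
    private final Map<K, V> view = new HashMap<>();

    ContinuousView(Map<K, V> source, Predicate<V> filter) {
        this.filter = filter;
        // 1. Start from a point-in-time query result.
        for (Map.Entry<K, V> e : source.entrySet()) {
            if (filter.test(e.getValue())) {
                view.put(e.getKey(), e.getValue());
            }
        }
    }

    // 2. Apply each change so the view always equals "re-run the query now".
    //    A null newValue represents a delete.
    void onChange(K key, V newValue) {
        if (newValue != null && filter.test(newValue)) {
            view.put(key, newValue);   // entered the result, or changed within it
        } else {
            view.remove(key);          // deleted, or no longer matches the filter
        }
    }

    Map<K, V> snapshot() {
        return new HashMap<>(view);
    }
}
```

Each event either adds, updates or evicts a single key, so keeping the view current costs work proportional to the change rate rather than to re-running the full query.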

Summary

Coherence is successfully deployed as a large-scale data fabric for many of the world's largest financial, telecommunications, logistics, travel and media organizations. With unlimited scalability, the highest levels of availability, close to zero latency, an incredibly rich set of capabilities and a sterling reputation for quality, Coherence is the Information Fabric of choice.

Provide a Data Grid


Overview

Coherence provides the ideal infrastructure for building Data Grid services, as well as the client and server-based applications that utilize a Data Grid. At a basic level, Coherence can manage an immense amount of data across a large number of servers in a grid; it can provide close to zero latency access for that data; it supports parallel queries across that data; and it supports integration with database and EIS systems that act as the system of record for that data. For more information on the infrastructure for the Data Grid features in Coherence, refer to the discussion on Data Fabric capabilities. Additionally, Coherence provides a number of services that are ideal for building effective data grids.

All of the Data Grid capabilities described below are features of the Coherence Enterprise Edition.

Targeted Execution

Coherence provides the ability to execute an agent against an entry in any map of data managed by the Data Grid:

map.invoke(key, agent);

In the case of partitioned data, the agent executes on the grid node that owns the data it operates on. This means that queueing, concurrency management, agent execution, data access by the agent and data modification by the agent all occur on that grid node. (Only the synchronous backup of the resulting data modification, if any, requires additional network traffic.) For many processing purposes, it is much more efficient to move the serialized form of the agent (usually only a few hundred bytes, at most) than to handle distributed concurrency control, coherency and data updates.

For request/response processing, the agent returns a result:

Object oResult = map.invoke(key, agent);

In other words, Coherence as a Data Grid will determine the location to execute the agent based on the configuration for the data topology, move the agent there, execute the agent (automatically handling concurrency control for the item while executing the agent), back up the modifications if any, and return a result.
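The single-node half of targeted execution (run the agent where the entry lives, with per-entry concurrency control, then store the result) can be sketched with ConcurrentHashMap.compute, which applies its function atomically per key. Agent, AgentEntry and OwnerNode are hypothetical names; Coherence's real agent type is EntryProcessor, and this sketch omits moving the agent and backing up the change.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical agent type; Coherence's real type is EntryProcessor.
interface Agent<K, V, R> {
    R process(AgentEntry<K, V> entry);
}

// Hypothetical mutable view of one entry handed to the agent.
class AgentEntry<K, V> {
    final K key;
    V value;                        // the agent may change this; null removes the entry

    AgentEntry(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical owner node holding its share of the data.
class OwnerNode<K, V> {
    private final ConcurrentHashMap<K, V> data = new ConcurrentHashMap<>();

    <R> R invoke(K key, Agent<K, V, R> agent) {
        Object[] result = new Object[1];
        // compute() runs atomically for this key, standing in for the
        // per-entry concurrency control performed on the owning node.
        data.compute(key, (k, current) -> {
            AgentEntry<K, V> entry = new AgentEntry<>(k, current);
            result[0] = agent.process(entry);
            return entry.value;     // stored back; null removes the mapping
        });
        @SuppressWarnings("unchecked")
        R r = (R) result[0];
        return r;
    }
}
```

Many threads can invoke an incrementing agent on the same key without any client-side locking, because every invocation is serialized on the entry itself.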

Parallel Execution

Coherence also provides the ability to execute an agent against an entire collection of entries. In a partitioned Data Grid, the execution occurs in parallel, so the more nodes there are in the grid, the more broadly the work is load-balanced across the Data Grid:

map.invokeAll(collectionKeys, agent);

For request/response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(collectionKeys, agent);

In other words, Coherence determines the optimal location(s) to execute the agent based on the configuration for the data topology, moves the agent there, executes the agent (automatically handling concurrency control for the item(s) while executing the agent), backs up the modifications if any, and returns the coalesced results.
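The group-by-owner, execute-in-parallel, coalesce-results pattern can be sketched in one JVM. ParallelInvoker is a hypothetical class; partitions are chosen by key hash here purely for illustration, the agent is a read-only java.util.function.Function for brevity, and null agent results are skipped because ConcurrentHashMap rejects null values.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: keys are grouped by owning partition, each group is
// processed in parallel, and per-key results are coalesced into one Map.
class ParallelInvoker<K, V> {
    private final List<Map<K, V>> partitions = new ArrayList<>();

    ParallelInvoker(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ConcurrentHashMap<>());
        }
    }

    private int partitionOf(K key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    void put(K key, V value) {
        partitions.get(partitionOf(key)).put(key, value);
    }

    <R> Map<K, R> invokeAll(Collection<K> keys, Function<V, R> agent) {
        // Group the requested keys by owning partition.
        Map<Integer, List<K>> byPartition = new HashMap<>();
        for (K key : keys) {
            byPartition.computeIfAbsent(partitionOf(key), p -> new ArrayList<>()).add(key);
        }
        // Process each group in parallel and coalesce the per-key results.
        Map<K, R> results = new ConcurrentHashMap<>();
        byPartition.entrySet().parallelStream().forEach(group -> {
            Map<K, V> partition = partitions.get(group.getKey());
            for (K key : group.getValue()) {
                V value = partition.get(key);
                if (value != null) {
                    R r = agent.apply(value);
                    if (r != null) {
                        results.put(key, r);
                    }
                }
            }
        });
        return results;
    }
}
```

Keys that are absent produce no result entry, so the returned Map contains one result per key that was actually processed.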

Query-Based Execution

As discussed in the queryable data fabric topic, Coherence supports the ability to query across the entire data grid. For example, in a trading system it is possible to query for all open "Order" objects for a particular trader:

NamedCache map    = CacheFactory.getCache("trades");
Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                                  new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = map.keySet(filter);

By combining this feature with Parallel Execution in the data grid, Coherence provides the ability to execute an agent against a query. As in the previous section, the execution occurs in parallel, and instead of returning the identities or entries that match the query, Coherence executes the agents against the entries:

map.invokeAll(filter, agent);

For request/response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(filter, agent);

In other words, Coherence combines its Parallel Query and its Parallel Execution together to achieve query-based agent invocation against a Data Grid.

Data-Grid-Wide Execution

Passing an instance of AlwaysFilter (or a null) to the invokeAll method will cause the passed agent to be executed against all entries in the InvocableMap:

map.invokeAll((Filter) null, agent);

As with the other types of agent invocation, request/response processing is supported:

Map mapResults = map.invokeAll((Filter) null, agent);

In other words, with a single line of code, an application can process all the data spread across a particular map in the Data Grid.
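Query-based and data-grid-wide invocation differ only in which entries the filter selects. The single-node sketch below uses hypothetical names (QueryInvoker, with a java.util.function.Predicate standing in for a Coherence Filter) and treats a null filter as "select everything", mirroring invokeAll((Filter) null, agent); the parallel distribution is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for query-based invocation: the filter selects
// entries, the agent runs against each match, and results are keyed by entry key.
class QueryInvoker {
    static <K, V, R> Map<K, R> invokeAll(Map<K, V> data, Predicate<V> filter, Function<V, R> agent) {
        Map<K, R> results = new HashMap<>();
        for (Map.Entry<K, V> e : data.entrySet()) {
            // A null filter selects every entry (data-grid-wide execution).
            if (filter == null || filter.test(e.getValue())) {
                results.put(e.getKey(), agent.apply(e.getValue()));
            }
        }
        return results;
    }
}
```

Passing a filter returns results only for matching entries; passing null returns one result for every entry in the map.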

Agents for Targeted, Parallel and Query-Based Execution

An agent implements the EntryProcessor interface, typically by extending the AbstractProcessor class.

A number of agents are included with Coherence, including:

  • AbstractProcessor - an abstract base class for building an EntryProcessor
  • ExtractorProcessor - extracts and returns a specific value (such as a property value) from an object stored in an InvocableMap
  • CompositeProcessor - bundles together a collection of EntryProcessor objects that are invoked sequentially against the same Entry
  • ConditionalProcessor - conditionally invokes an EntryProcessor if a Filter against the Entry-to-process evaluates to true
  • PropertyProcessor - an abstract base class for EntryProcessor implementations that depend on a PropertyManipulator
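The roles of CompositeProcessor and ConditionalProcessor can be illustrated with simplified stand-in types. DemoEntry, DemoProcessor, CompositeSketch and ConditionalSketch are hypothetical names with deliberately reduced signatures; they are not the Coherence classes and do not show how the real ones are constructed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified stand-ins for an entry and an entry processor.
class DemoEntry {
    int value;
    DemoEntry(int value) { this.value = value; }
}

interface DemoProcessor {
    Object process(DemoEntry entry);
}

// Composite: invokes each processor in order against the same entry and
// returns their results as an array.
class CompositeSketch implements DemoProcessor {
    private final List<DemoProcessor> steps;

    CompositeSketch(DemoProcessor... steps) {
        this.steps = Arrays.asList(steps);
    }

    public Object process(DemoEntry entry) {
        Object[] results = new Object[steps.size()];
        for (int i = 0; i < results.length; i++) {
            results[i] = steps.get(i).process(entry);
        }
        return results;
    }
}

// Conditional: invokes the wrapped processor only when the filter accepts the
// entry; otherwise the entry is left untouched and null is returned.
class ConditionalSketch implements DemoProcessor {
    private final Predicate<DemoEntry> filter;
    private final DemoProcessor processor;

    ConditionalSketch(Predicate<DemoEntry> filter, DemoProcessor processor) {
        this.filter = filter;
        this.processor = processor;
    }

    public Object process(DemoEntry entry) {
        return filter.test(entry) ? processor.process(entry) : null;
    }
}
```

Nesting a composite inside a conditional lets a whole sequence of steps be applied to an entry only when it matches, all within a single invocation against that entry.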