PRODUCTS AND SERVICES INDUSTRIES SUPPORT PARTNERS COMMUNITIES ABOUT
  Coherence 3.3 User Guide
  Coherence User Guide (Full)
Added by Rob Misek, last edited by Jon Purdy on Oct 17, 2006  (view change)

Labels

 
(None)

Table of Contents

Overview: What Coherence can do for you...

Defining a Data Grid Start Here!
Cluster your objects and your data
Deliver events for changes as they occur
Automatically manage dynamic cluster membership
Provide a Queryable Data Fabric
Provide a Data Grid
Real Time Client - RTC
Using Coherence and BEA WebLogic Portal
Using Coherence and JPA
Using Coherence and TopLink Essentials
Using Coherence and Hibernate

Architecture

Overview for Implementors
Managing an Object Model

Getting Started

Introduction
Terms
Installing Oracle Coherence
Installing Coherence*Web Session Management Module
Using the Coherence*Web Installer Ant Task
Types of Caches in Coherence
Cache Semantics
Creating and Using Coherence Caches
Configuring and Using Coherence*Extend

Code Examples and FAQ

Technical Overview

Clustering
Cache Topologies
Cluster Services Overview
Replicated Cache Service
Partitioned Cache Service
Local Storage
Local Cache
Best Practices
Network Protocols

Patterns

Bulk Loading and Processing with Coherence
CacheFactory Spring Integration

Production Planning and Troubleshooting

Production Checklist
Multicast Test (verifying multicast support)
Datagram Test (network performance testing)
Performance Tuning

Management and Monitoring

Managing Coherence using JMX
Manage Custom MBeans within the Coherence Cluster

Operational Configuration

Operational Configuration Elements
Parameter Setttings
Element Attributes
Command Line Setting Override Feature
Operational Configuration Elements Reference

Cache Configuration

Cache Configuration Elements
Parameter Macros
Sample Cache Configurations
Cache Configuration Elements Reference

Data Management Features

Read-Through, Write-Through, Refresh-Ahead and Write-Behind Caching
Querying the Cache
Continuous Query
Transactions, Locks and Concurrency

Advanced Features

Specifying a Custom Eviction Policy
Partition Affinity
Serialization Paged Cache
Security Framework
Network Filters
Priority Tasks

Testing

Evaluating Performance and Scalability
Scaling Out Your Data Grid Aggregations Linearly

Release Notes

Coherence 3.3 Release Notes
Coherence 3.3.1 Release Notes

Feature Listing

Coherence Features by Edition

Overview: What Coherence can do for you...


Defining a Data Grid


The Oracle Coherence In-Memory Data Grid is a data management system for application objects that are shared across multiple servers, require low response time, very high throughput, predictable scalability, continuous availability and information reliability. For clarity, each of these terms and claims is explained:

  • A Data Grid is a system composed of multiple servers that work together to manage information and related operations - such as computations - in a distributed environment.

  • An In-Memory Data Grid is a Data Grid that stores the information in memory in order to achieve very high performance, and uses redundancy - by keeping copies of that information synchronized across multiple servers - in order to ensure the resiliency of the system and the availability of the data in the event of server failure.

  • The application objects are the actual components of the application that contain information that is shared across multiple servers and that must survive server failure in order for the application to be continuously available. These objects are typically built in an object-oriented language such as Java (e.g. POJOs), C++, C#, VB.NET or Ruby. Unlike a relational schema, application objects are often hierarchical in nature, and may contain information that is pulled from any database.

  • The application objects must be shared across multiple servers because a middleware application (such as eBay and Amazon.com) is horizontally scaled by adding servers, with each server running an instance of that application. Since the application instance running on one server may read and write some of the same information that an application instance running on another server reads and writes, that information must be shared. The alternative is to always access that information from a shared resource, such as a database, which will lower performance by requiring both remote coordinated access and Object/Relational Mapping (ORM), and decrease scalability by making that shared resource a bottleneck.

  • Because an application object is not relational, in order to retrieve it from a relational database the information must be mapped from a relational query into the object; this is known as Object/Relational Mapping (ORM). Examples include Java's EJB 3.0 and JPA, and ADO.NET. The same technology allows that object to be stored in a relational database by deconstructing the object (or changes to the object) into a series of SQL inserts, updates and deletes. Since a single object may be composed of information from many tables, the cost of accessing objects from a database using Object/Relational Mapping can be significant, both in terms of the load on the database and the latency of the data access.

  • An In-Memory Data Grid achieves low response times for data access by keeping the information in-memory and in the application object form, and by sharing that information across multiple servers. In other words, applications may be able to access the information that they require without any network communication and without any data transformation step such as ORM. In cases where network communication is required, the Oracle Coherence avoids introducing a Single Point of Bottleneck (SPOB) by partitioning - spreading out - information across the grid, with each server being responsible for managing its own fair share of the total set of information.

  • High throughput of information access and change is achieved through four different aspects of the In-Memory Data Grid. First, Oracle Coherence employs an extremely sophisticated clustering protocol that can achieve wire speed throughput of information on each server, allowing the aggregate flow of information to increase linearly with the number of servers. Second, by partitioning the information, as servers are added each one assumes responsibility for its fair share of the total set of information, thus load-balancing the data management responsibilities into smaller and smaller portions. Third, by combining the wire speed throughput and the partitioning with automatic knowledge of the location of information within the Data Grid, Oracle Coherence routes all read and write requests directly to the servers that manage the targeted information, resulting in true linear scalability of both read and write operations; in other words, high throughput of information access and change. Fourth, for queries, transactions and calculations, particularly those that operate against large sets of data, Oracle Coherence can route those operations to the servers that manage the target data and execute them in parallel.

  • By using dynamic partitioning to eliminate bottlenecks and achieving predictably low latency regardless of the number of servers in the Data Grid, Oracle Coherence provides predictable scalability of applications. While certain applications can use Coherence to achieve linear scalability, that is largely determined by the nature of the application, and thus varies from application to application. More important is the ability of a customer to examine the nature of their application and to be able to predict how many servers will be required in order to achieve a certain level of scale, such as supporting a specified number of concurrent users on a system or completing a complex financial calculation within a certain number of minutes. One way that Coherence accomplishes this is by executing large-scale operations - such as queries, transactions and calculations - in parallel using all of the servers in the Data Grid.

  • One of the ways that Coherence can eliminate bottlenecks is to queue up transactions that have occurred in memory, and asynchronously write the end result to a system of record, such as an Oracle database. This is particularly appropriate in systems that have extremely high rates of change due to the processing of many small transactions, particularly when only the end result needs to be made persistent. Coherence both (i) coalesces multiple changes to a single application object and (ii) batches multiple modified application objects into a single database transaction, meaning that a hundred different changes to each of a hundred different application objects could be persisted to a database in a single, large - and thus highly efficient - transaction. Application objects pending to be written are safeguarded from loss by being managed in a continuously available manner.

  • Continuous availability is achieved by a combination of four capabilities. First, the clustering protocol used by Oracle Coherence can rapidly detect server failure and achieve consensus across all the surviving servers about the detected failure. Second, information is synchronously replicated across multiple servers, so no Single Point of Failure (SPOF) exists. Third, each server knows where the synchronous replicas of each piece of information are located, and automatically re-routes information access and change operations to those replicas. Fourth, Oracle Coherence ensures that each operation executes in a Once-and-Only-Once manner, so that operations that are being executed when a server fails do not accidentally corrupt information during failover.

  • Failover is the process of switching over automatically to a redundant or standby computer server, system, or network upon the failure or abnormal termination of the previously active server, system, or network. Failover happens without human intervention and generally without warning. (As defined by Wikipedia: http://en.wikipedia.org/wiki/Failover)\\
  • Information reliability is achieved through a combination of four capabilities. First, Oracle Coherence uses cluster consensus to achieve unambiguous ownership of information within the Data Grid; in other words, at all times exactly one server is responsible for managing the master copy of each piece of information in the Data Grid. Second, because that master copy is owned by a specific server, that server can order the operations that are occurring to that information and synchronize the results of those operations with other servers. Third, because the information is continuously available, these qualities of service exist even during and after the failure of a server. Fourth, by ensuring Once-and-Only-Once operations, no operations are lost or accidentally repeated when server failure does occur. The combination of these four capabilities results is the information within the Data Grid being reliable for use by transactional applications.

As a result of these capabilities, Oracle Coherence is ideally suited for use in computationally intensive, stateful middle-tier applications. Coherence is targeted to run in the application tier, and is often run in-process with the application itself, for example in an Application Server Cluster.

Start Here!


Cluster your objects and your data


Overview

Coherence is an essential ingredient for building reliable, high-scale clustered applications. The term clustering refers to the use of more than one server to run an application, usually for reliability and scalability purposes. Coherence provides all of the necessary capabilities for applications to achieve the maximum possible availability, reliability, scalability and performance. Virtually any clustered application will benefit from using Coherence.

One of the primary uses of Coherence is to cluster an application's objects and data. In the simplest sense, this means that all of the objects and data that an application delegates to Coherence are automatically available to and accessible by all servers in the application cluster, and none of those objects and none of that data will be lost in the event of server failure.

By clustering the application's objects and data, Coherence solves many of the difficult problems related to achieving availability, reliability, scalability, performance, serviceability and manageability of clustered applications.

Availability

Availability refers to the percentage of time that an application is operating. High Availability refers to achieving availability close to 100%. Coherence is used to achieve High Availability in several different ways:

Supporting redundancy in Java applications

Coherence makes it possible for an application to run on more than one server, which means that the servers are redundant. Using a load balancer, for example, an application running on redundant servers will be available as long as one server is still operating. Coherence enables redundancy by allowing an application to share, coordinate access to, update and receive modification events for critical runtime information across all of the redundant servers. Most applications cannot operate in a redundant server environment unless they are architected to run in such an environment; Coherence is a key enabler of such an architecture.

Enabling dynamic cluster membership

Coherence tracks exactly what servers are available at any given moment. When the application is started on an additional server, Coherence is instantly aware of that server coming online, and automatically joins it into the cluster. This allows redundancy (and thus availability) to be dynamically increased by adding servers.

Exposing knowledge of server failure

Coherence reliably detects most types of server failure in less than a second, and immediately fails over all of the responsibilities of the failed server without losing any data. As a result, server failure does not impact availability.

Part of an availability management is Mean Time To Recovery (MTTR), which is a measurement of how much time it takes for an unavailable application to become available. Since server failure is detected and handled in less than a second, and since redundancy means that the application is available even when that server goes down, the MTTR due to server failure is zero from the point of view of application availability, and typically sub-second from the point of view of a load-balancer re-routing an incoming request.

Eliminating other Single Points Of Failure (SPOFs)

Coherence provides insulation against failures in other infrastructure tiers. For example, Coherence write-behind caching and Coherence distributed parallel queries can insulate an application from a database failure; in fact, using these capabilities, two different Coherence customers have had database failure during operational hours, yet their production Coherence-based applications maintained their availability and their operational status.

Providing support for Disaster Recovery (DR) and Continuancy Planning

Coherence can even insulate against failure of an entire data center, by clustering across multiple data centers and failing over the responsibilities of an entire data center. Again, this capability has been proven in production, with a Coherence customer running a mission-critical real-time financial system surviving a complete data center outage.

Reliability

Reliability refers to the percentage of time that an application is able to process correctly. In other words, an application may be available, yet unreliable if it cannot correctly handle the application processing. An example that we use to illustrate high availability but low reliability is a mobile phone network: While most mobile phone networks have very high uptimes (referring to availability), dropped calls tend to be relatively common (referring to reliability).

Coherence is explicitly architected to achieve very high levels of reliability. For example, server failure does not impact "in flight" operations, since each operation is atomically protected from server failure, and will internally re-route to a secondary node based on a dynamic pre-planned recovery strategy. In other words, every operation has a backup plan ready to go!

Coherence is architected based on the assumption that failures are always about to occur. As a result, the algorithms employed by Coherence are carefully designed to assume that each step within a operation could fail due to a network, server, operating system, JVM or other resource outage. An example of how Coherence plans for these failures is the synchronous manner in which it maintains redundant copies of data; in other words, Coherence does not gamble with the application's data, and that ensures that the application will continue to work correctly, even during periods of server failure.

Scalability

Scalability refers to the ability of an application to predictably handle more load. An application exhibits linear scalability if the maximum amount of load that an application can sustain is directly proportional to the hardware resources that the application is running on. For example, if an application running on 2 servers can handle 2000 requests per second, then linear scalability would imply that 10 servers would handle 10000 requests per second.

Linear scalability is the goal of a scalable architecture, but it is difficult to achieve. The measurement of how well an application scales is called the scaling factor (SF). A scaling factor of 1.0 represents linear scalability, while a scaling factor of 0.0 represents no scalability. Coherence provides a number of capabilities designed to help applications achieve linear scalability.

When planning for extreme scale, the first thing to understand is that application scalability is limited by any necessary shared resource that does not exhibit linear scalability. The limiting element is referred to as a bottleneck, and in most applications, the bottleneck is the data source, such as a database or an EIS.

Coherence helps to solve the scalability problem by targeting obvious bottlenecks, and by completely eliminating bottlenecks whenever possible. It accomplishes this through a variety of capabilities, including:

Distributed Caching

Coherence uses a combination of replication, distribution, partitioning and invalidation to reliably maintain data in a cluster in such a way that regardless of which server is processing, the data that it obtains from Coherence is the same. In other words, Coherence provides a distributed shared memory implementation, also referred to as Single System Image (SSI) and Coherent Clustered Caching.

Any time that an application can obtain the data it needs from the application tier, it is eliminating the data source as the Single Point Of Bottleneck (SPOB).

Partitioning

Partitioning refers to the ability for Coherence to load-balance data storage, access and management across all of the servers in the cluster. For example, when using Coherence data partitioning, if there are four servers in a cluster then each will manage 25% of the data, and if another server is added, each server will dynamically adjust so that each of the five servers will manage 20% of the data, and this data load balancing will occur without any application interruption and without any lost data or operations. Similarly, if one of those five servers were to die, each of the remaining four servers would be managing 25% of the data, and this data load balancing will occur without any application interruption and without any lost data or operations – including the 20% of the data that was being managed on the failed server.

Coherence accomplishes failover without data loss by synchronously maintaining a configurable number of copies of the data within the cluster. Just as the data management responsibility is spread out over the cluster, so is the responsibility for backing up data, so in the previous example, each of the remaining four servers would have roughly 25% of the failed server's data backed up on it. This mesh architecture guarantees that on server failure, no particular remaining server is inundated with a massive amount of additional responsibility.

Coherence prevents loss of data even when multiple instances of the application are running on a single physical server within the cluster. It does so by ensuring that backup copies of data are being managed on different physical servers, so that if a physical server fails or is disconnected, all of the the data being managed by the failed server has backups ready to go on a different server.

Lastly, partitioning supports linear scalability of both data capacity and throughput. It accomplishes the scalability of data capacity by evenly balancing the data across all servers, so four servers can naturally manage two times as much data as two servers. Scalability of throughput is also a direct result of load-balancing the data across all servers, since as servers are added, each server is able to utilize its full processing power to manage a smaller and smaller percentage of the overall data set. For example, in a ten-server cluster each server has to manage 10% of the data operations, and – since Coherence uses a peer-to-peer architecture – 10% of those operations are coming from each server. With ten times that many servers (i.e. 100 servers), each server is managing only 1% of the data operations, and only 1% of those operations are coming from each server – but there are ten times as many servers, so the cluster is accomplishing ten times the total number of operations! In the 10-server example, if each of the ten servers was issuing 100 operations per second, they would each be sending 10 of those operations to each of the other servers, and the result would be that each server was receiving 100 operations (10x10) that it was responsible for processing. In the 100-server example, each would still be issuing 100 operations per second, but each would be sending only one operation to each of the other servers, so the result would be that each server was receiving 100 operations (100x1) that it was responsible for processing. This linear scalability is made possible by modern switched network architectures that provide backplanes that scale linearly to the number of ports on the switch, providing each port with dedicated fully-duplexed (upstream and downstream) bandwidth. Since each server is only sending and receiving 100 operations (in both the 10-server and 100-server examples), the network bandwidth utilization is roughly constant per port regardless of the number of servers in the cluster.

Session Management

One common use case for Coherence clustering is to manage user sessions (conversational state) in the cluster. This capability is provided by the Coherence*Web module, which is a built-in feature of Coherence. Coherence*Web provides linear scalability for HTTP Session Management in clusters of hundreds of production servers. It can achieve this linear scalability because at its core it is built on Coherence dynamic partitioning.

Session management highlights the scalability problem that typifies shared data sources: If an application could not share data across the servers, it would have to delegate that data management entirely to the shared store, which is typically the application's database. If the HTTP session were stored in the database, each HTTP request (in the absence of sticky load-balancing) would require a read from the database, causing the desired reads-per-second from the database to increase linearly with the size of the server cluster. Further, each HTTP request causes an update of its corresponding HTTP session, so regardless of sticky load balancing, to ensure that HTTP session data is not lost when a server fails the desired writes-per-second to the database will also increase linearly with the size of the server cluster. In both cases, the actual reads and writes per second that a database is capable of does not scale in relation to the number of servers requesting those reads and writes, and the database quickly becomes a bottleneck, forcing availability, reliability (e.g. asynchronous writes) and performance compromises. Additionally, related to performance, each read from a database has an associated latency, and that latency increases dramatically as the database experiences increasing load.

Coherence*Web, on the other hand, has the same latency in a 2-server cluster as it has in a 200-server cluster, since all HTTP session read operations that cannot be handled locally (e.g. locality as the result of the sticky load balancing) are spread out evenly across the rest of the cluster, and all update operations (which must be handled remotely to ensure survival of the HTTP sessions) are likewise spread out evently across the rest of the cluster. The result is linear scalability with constant latency, regardless of the size of the cluster.

Performance

Performance is the inverse of latency, and latency is the measurement of how long something takes to complete. If increasing performance is the goal, then getting rid of anything that has any latency is the solution. Obviously, it is impossible to get rid of all latencies, since the High Availability and reliability aspects of an application are counting on the underlying infrastructure, such as Coherence, to maintain reliable up-to-date back-ups of important information, which means that some operations (such as data modifications and pessimistic transactions) have unavoidable latencies. On the other hand, every remaining operation that could possibly have any latency needs to be targeted for elimination, and Coherence provides a large number of capabilities designed to do just that:

Replication

Just like partitioning dynamically load-balances data evenly across the entire server cluster, replication ensures that a desired set of data is up-to-date on every single server in the cluster at all times. Replication allows operations running on any server to obtain the data that they need locally, at basically no cost, because that data has already been replicated to that server. In other words, replication is a tool to guarantee locality of reference, and the end result is zero-latency access to replicated data.

Near Caching

Since replication works best for data that should be on all servers, it follows that replication is inefficient for data that an application would want to avoid copying to all servers. For example, data that changes all of the time and very large data sets are both poorly suited to replication, but both are excellently suited to partitioning, since it exhibits linear scale of data capacity and throughput.

The only downside of partitioning is that it introduces latency for data access, and in most applications the data access rate far out-weighs the data modification rate. To eliminate the latency associated with partitioned data access, near caching maintains frequently- and recently-used data from the partitioned cache on the specific servers that are accessing that data, and it keeps that data coherent by means of event-based invalidation. In other words, near caching keeps the most-likely-to-be-needed data near to where it will be used, thus providing good locality of access, yet backed up by the linear scalability of partitioning.

Write-Behind, Write-Coalescing and Write-Batching

Since the transactional throughput in the cluster is linearly scalable, the cost associated with data changes can be a fixed latency, typically in the range of a few milliseconds, and the total number of transactions per second is limited only by the size of the cluster. In one application, Coherence was able to achieve transaction rates close to a half-million transactions per second – and that on a cluster of commodity two-CPU servers.

Often, the data being managed by Coherence is actually a temporary copy of data that exists in an official System Of Record (SOR), such as a database. To avoid having the database become a transaction bottleneck, and to eliminate the latency of database updates, Coherence provides a Write-Behind capability, which allows the application to change data in the cluster, and those changes are asynchronously replayed to the application's database (or EIS). By managing the changes in a clustered cache (which has all of the High Availability, reliability and scalability attributes described previously,) the pending changes are immune to server failure and the total rate of changes scales linearly with the size of the cluster.

The Write-Behind functionality is implemented by queueing each data change; the queue contains a list of what changes needs to be written to the System Of Record. The duration of an item within the queue is configurable, and is referred to as the Write-Behind Delay. When data changes, it is added to the write-behind queue (if it is not already in the queue), and the queue entry is set to ripen after the configured Write-Behind Delay has passed. When the queue entry has ripened, the latest copy of the corresponding data is written to the System Of Record.

To avoid overwhelming the System Of Record, Coherence will replay only the latest copies of data to the database, thus coalescing many updates that occur to the same piece data into a single database operation. The longer the Write-Behind Delay, the more coalescing may occur. Additionally, if many different pieces of data have changed, all of those updates can be batched (e.g. using JDBC statement batching) into a single database operation. In this way, a massive breadth of changes (number of pieces of data changed) and depth of changes (number of times each was changed) can be bundled into a single database operation, which results in dramatically reduced load on the database. The batching is also fully configurable; one option, called the Write Batch Factor, even allows some of the queue entries that have not yet ripened to be included in the batched update.

Serviceability

Serviceability refers to the ease and extent of changes that can be affected without affecting availability. Coherence helps to increase an application's serviceability by allowing servers to be taken off-line without impacting the application availability. Those servers can be serviced and brought back online without any end-user or processing interruptions. Many configuration changes related to Coherence can also be made on a node-by-node basis in the same manner. With careful planning, even major application changes can be rolled into production – again, one node at a time – without interrupting the application.

Manageability

Manageability refers to the level of information that a running system provides, and the capability to tweak settings related to that information. For example, Coherence provides a cluster-wide view of management information via the standard JMX API, so that the entire cluster can be managed from a single server. The information provided includes hit and miss rates, cace sizes, read-, write- and write-behind statistics, and detailed information all the way down to the network packet level.

Additionally, Coherence allows applications to place their own management information – and expose their own tweakable settings – through the same clustered JMX implementation. The result is an application infrastructure that makes managing and monitoring a clustered application as simple as managing and monitoring a single server, and all through Java's standard management API.

Summary

There are a lot of challenges in building a highly available application that exhibits scalable performance and is both serviceable and manageable. While there are many ways to build distributed applications, only Coherence reliably clusters objects and data. Once objects and data are clustered by Coherence, all the servers in the cluster can access and modify those objects and that data, and the objects and data managed by Coherence will not be effected if and when servers fail. By providing a variety of advanced capabilities, each of which is configurable, and application can achieve the optimal balance of redundancy, scalability and performance, and do so within a manageable and serviceable environment.

Unknown macro: {rate-table}


Deliver events for changes as they occur


Overview

Coherence provides cache events using the JavaBean Event model. It is extremely simple to receive the events that you need, where you need them, regardless of where the changes are actually occurring in the cluster. Developers with any experience with the JavaBean model will have no difficulties working with events, even in a complex cluster.

Listener interface and Event object

In the JavaBeans Event model, there is an EventListener interface that all listeners must extend. Coherence provides a MapListener interface, which allows application logic to receive events when data in a Coherence cache is added, modified or removed:

excerpt from com.tangosol.util.MapListener
public interface MapListener
        extends EventListener
    {
    /**
    * Invoked when a map entry has been inserted.
    *
    * @param evt  the MapEvent carrying the insert information
    */
    public void entryInserted(MapEvent evt);

    /**
    * Invoked when a map entry has been updated.
    *
    * @param evt  the MapEvent carrying the update information
    */
    public void entryUpdated(MapEvent evt);

    /**
    * Invoked when a map entry has been removed.
    *
    * @param evt  the MapEvent carrying the delete information
    */
    public void entryDeleted(MapEvent evt);
    }

An application object that implements the MapListener interface can sign up for events from any Coherence cache or class that implements the ObservableMap interface, simply by passing an instance of the application's MapListener implementation to one of the addMapListener() methods.

The MapEvent object that is passed to the MapListener carries all of the necessary information about the event that has occurred, including the source (ObservableMap) that raised the event, the identity (key) that the event is related to, what the action was against that identity (insert, update or delete), what the old value was and what the new value is:

excerpt from com.tangosol.util.MapEvent
public class MapEvent
        extends EventObject
    {
    /**
    * Return an ObservableMap object on which this event has actually
    * occured.
    *
    * @return an ObservableMap object
    */
    public ObservableMap getMap()

    /**
    * Return this event's id. The event id is one of the ENTRY_*
    * enumerated constants.
    *
    * @return an id
    */
    public int getId()

    /**
    * Return a key associated with this event.
    *
    * @return a key
    */
    public Object getKey()

    /**
    * Return an old value associated with this event.
    * <p>
    * The old value represents a value deleted from or updated in a map.
    * It is always null for "insert" notifications.
    *
    * @return an old value
    */
    public Object getOldValue()

    /**
    * Return a new value associated with this event.
    * <p>
    * The new value represents a new value inserted into or updated in
    * a map. It is always null for "delete" notifications.
    *
    * @return a new value
    */
    public Object getNewValue()


    // ----- Object methods -------------------------------------------------

    /**
    * Return a String representation of this MapEvent object.
    *
    * @return a String representation of this MapEvent object
    */
    public String toString()


    // ----- constants ------------------------------------------------------

    /**
    * This event indicates that an entry has been added to the map.
    */
    public static final int ENTRY_INSERTED = 1;

    /**
    * This event indicates that an entry has been updated in the map.
    */
    public static final int ENTRY_UPDATED  = 2;

    /**
    * This event indicates that an entry has been removed from the map.
    */
    public static final int ENTRY_DELETED  = 3;
    }

Caches and classes that support events

All Coherence caches implement ObservableMap; in fact, the NamedCache interface that is implemented by all Coherence caches extends the ObservableMap interface. That means that an application can sign up to receive events from any cache, regardless of whether that cache is local, partitioned, near, replicated, using read-through, write-through, write-behind, overflow, disk storage, etc.

Regardless of the cache topology and the number of servers, and even if the modifications are being made by other servers, the events will be delivered to the application's listeners.

In addition to the Coherence caches (those objects obtained through a Coherence cache factory), several other supporting classes in Coherence also implement the ObservableMap interface:

  • ObservableHashMap
  • LocalCache
  • OverflowMap
  • NearCache
  • ReadWriteBackingMap
  • AbstractSerializationCache, SerializationCache and SerializationPagedCache
  • WrapperObservableMap, WrapperConcurrentMap and WrapperNamedCache

For a full list of published implementing classes, see the Coherence JavaDoc for ObservableMap.

Signing up for all events

To sign up for events, simply pass an object that implements the MapListener interface to one of the addMapListener methods on ObservableMap:

ObservableMap methods
public void addMapListener(MapListener listener);
public void addMapListener(MapListener listener, Object oKey, boolean fLite);
public void addMapListener(MapListener listener, Filter filter, boolean fLite);

Let's create an example MapListener implementation:

Example MapListener implementation
/**
* A MapListener implementation that prints each event as it receives
* them.
*/
public static class EventPrinter
        extends Base
        implements MapListener
    {
    public void entryInserted(MapEvent evt)
        {
        out(evt);
        }

    public void entryUpdated(MapEvent evt)
        {
        out(evt);
        }

    public void entryDeleted(MapEvent evt)
        {
        out(evt);
        }
    }

Using this implementation, it is extremely simple to print out all events from any given cache (since all caches implement the ObservableMap interface):

cache.addMapListener(new EventPrinter());

Of course, to be able to later remove the listener, it is necessary to hold on to a reference to the listener:

Listener listener = new EventPrinter();
cache.addMapListener(listener);
m_listener = listener; // store the listener in a field

Later, to remove the listener:

Listener listener = m_listener;
if (listener != null)
    {
    cache.removeMapListener(listener);
    m_listener = null; // clean up the listener field
    }

Each addMapListener method on the ObservableMap interface has a corresponding removeMapListener method. To remove a listener, use the removeMapListener method that corresponds to the addMapListener method that was used to add the listener.

Using an inner class as a MapListener

When creating an an inner class to use as a MapListener, or when implementing a MapListener that only listens to one or two types of events (inserts, updates or deletes), you can use the AbstractMapListener base class. For example, the following anonymous inner class prints out only the insert events for the cache:

cache.addMapListener(new AbstractMapListener()
    {
    public void entryInserted(MapEvent evt)
        {
        out(evt);
        }
    });

Another helpful base class for creating a MapListener is the MultiplexingMapListener, which routes all events to a single method for handling. For example, the EventPrinter example could be simplified to:

public static class EventPrinter
        extends MultiplexingMapListener
    {
    public void onMapEvent(MapEvent evt)
        {
        out(evt);
        }
    }

Since only one method needs to be implemented to capture all events, the MultiplexingMapListener can also be very useful when creating an an inner class to use as a MapListener.

Configuring a MapListener for a Cache

If the listener should always be on a particular cache, then place it into the cache configuration using the listener element and Coherence will automatically add the listener when it configures the cache.

Signing up for Events on specific identities

Signing up for events that occur against specific identities (keys) is just as simple. For example, to print all events that occur against the Integer key "5":

cache.addMapListener(new EventPrinter(), new Integer(5), false);

So the following code would only trigger an event when the Integer key "5" is inserted or updated:

for (int i = 0; i < 10; ++i)
    {
    Integer key   = new Integer(i);
    String  value = "test value for key " + i;
    cache.put(key, value);
    }

Filtering Events

Similar to listening to a particular key, it is possible to listen to particular events. Consider the following example:

// Filters used with partitioned caches must be Serializable, Externalizable or ExternalizableLite
public class DeletedFilter
        implements Filter, Serializable
    {
    public boolean evaluate(Object o)
        {
        MapEvent evt = (MapEvent) o;
        return evt.getId() == MapEvent.ENTRY_DELETED;
        }
    }

cache.addMapListener(new EventPrinter(), new DeletedFilter(), false);
Filtering events versus filtering cached data

When building a Filter for querying, the object that will be passed to the evaluate method of the Filter will be a value from the cache, or – if the Filter implements the EntryFilter interface – the entire Map.Entry from the cache. When building a Filter for filtering events for a MapListener, the object that will be passed to the evaluate method of the Filter will always be of type MapEvent.

For more information on how to use a query filter to listen to cache events, see the section below titled Advanced: Listening to Queries.

The listener is added to the cache with a filter that allows the listener to only receive delete events. For example, if the following sequence of calls were made:

cache.put("hello", "world");
cache.put("hello", "again");
cache.remove("hello");

The result would be:

CacheEvent{LocalCache deleted: key=hello, value=again}

For more information, see the Advanced: Listening to Queries section below.

"Lite" Events

By default, Coherence provides both the old and the new value as part of an event. Consider the following example:

MapListener listener = new MultiplexingMapListener()
    {
    public void onMapEvent(MapEvent evt)
        {
        out("event has occurred: " + evt);
        out("(the wire-size of the event would have been "
            + ExternalizableHelper.toBinary(evt).length()
            + " bytes.)");
        }
    };
cache.addMapListener(listener);

// insert a 1KB value
cache.put("test", new byte[1024]);

// update with a 2KB value
cache.put("test", new byte[2048]);

// remove the 2KB value
cache.remove("test");

The output from running the test shows that the first event carries the 1KB inserted value, the second event carries both the replaced 1KB value and the new 2KB value, and the third event carries the removed 2KB value:

event has occurred: CacheEvent{LocalCache added: key=test, value=[B@a470b8}
(the wire-size of the event would have been 1283 bytes.)
event has occurred: CacheEvent{LocalCache updated: key=test, old value=[B@a470b8, new value=[B@1c6f579}
(the wire-size of the event would have been 3340 bytes.)
event has occurred: CacheEvent{LocalCache deleted: key=test, value=[B@1c6f579}
(the wire-size of the event would have been 2307 bytes.)

When an application does not require the old and the new value to be included in the event, it can indicate that by requesting only "lite" events. When adding a listener, you can request lite events by using one of the two addMapListener methods that takes an additional boolean fLite parameter. In the above example, the only change would be:

cache.addMapListener(listener, (Filter) null, true);

Obviously, a lite event's old value and new value may be null. However, even if you request lite events, the old and the new value may be included if there is no additional cost to generate and deliver the event. In other words, requesting that a MapListener receive lite events is simply a hint to the system that the MapListener does not need to know the old and new values for the event.

Advanced: Listening to Queries

All Coherence caches support querying by any criteria. When an application queries for data from a cache, the result is a point-in-time snapshot, either as a set of identities ("keySet") or a set of identity/value pairs ("entrySet"). The mechanism for determining the contents of the resulting set is referred to as filtering, and it allows an application developer to construct queries of arbitrary complexity using a rich set of out-of-the-box filters (e.g. equals, less-than, like, between, etc.), or to provide their own custom filters (e.g. XPath).

The same filters that are used to query a cache can be used to listen to events from a cache. For example, in a trading system it is possible to query for all open "Order" objects for a particular trader:

NamedCache mapTrades = ...
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTrades = mapTrades.entrySet(filter);

To receive notifications of new trades being opened for that trader, closed by that trader or reassigned to or from another trader, the application can use the same filter:

// receive events for all trade IDs that this trader is interested in
trades.addMapListener(listener, new MapEventFilter(filter), true);

The MapEventFilter converts a query filter into an event filter.

Filtering events versus filtering cached data

When building a Filter for querying, the object that will be passed to the evaluate method of the Filter will be a value from the cache, or – if the Filter implements the EntryFilter interface – the entire Map.Entry from the cache. When building a Filter for filtering events for a MapListener, the object that will be passed to the evaluate method of the Filter will always be of type MapEvent.

The MapEventFilter converts a Filter that is used to do a query into a Filter that is used to filter events for a MapListener. In other words, the MapEventFilter is constructed from a Filter that queries a cache, and the resulting MapEventFilter is a filter that evaluates MapEvent objects by converting them into the objects that a query Filter would expect.

The MapEventFilter has a number of very powerful options, allowing an application listener to receive only the events that it is specifically interested in. More importantly for scalability and performance, only the desired events have to be communicated over the network, and they are communicated only to the servers and clients that have expressed interest in those specific events! For example:

// receive all events for all trades that this trader is interested in
nMask = MapEventFilter.E_ALL;
trades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events for all this trader's trades that are closed or
// re-assigned to a different trader
nMask = MapEventFilter.E_UPDATED_LEFT | MapEventFilter.E_DELETED;
trades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events for all trades as they are assigned to this trader
nMask = MapEventFilter.E_INSERTED | MapEventFilter.E_UPDATED_ENTERED;
trades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events only for new trades assigned to this trader
nMask = MapEventFilter.E_INSERTED;
trades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

For more information on the various options supported, see the API documentation for MapEventFilter.

Advanced: Synthetic Events

Events usually reflect the changes being made to a cache. For example, one server is modifying one entry in a cache while another server is adding several items to a cache while a third server is removing an item from the same cache, all while fifty threads on each and every server in the cluster is accessing data from the same cache! All the modifying actions will produce events that any server within the cluster can choose to receive. We refer to these actions as client actions, and the events as being dispatched to clients, even though the "clients" in this case are actually servers. This is a natural concept in a true peer-to-peer architecture, such as a Coherence cluster: Each and every peer is both a client and a server, both consuming services from its peers and providing services to its peers. In a typical Java Enterprise application, a "peer" is an application server instance that is acting as a container for the application, and the "client" is that part of the application that is directly accessing and modifying the caches and listening to events from the caches.

Some events originate from within a cache itself. There are many examples, but the most common cases are:

  • When entries automatically expire from a cache;
  • When entries are evicted from a cache because the maximum size of the cache has been reached;
  • When entries are transparently added to a cache as the result of a Read-Through operation;
  • When entries in a cache are transparently updated as the result of a Read-Ahead or Refresh-Ahead operation.

Each of these represents a modification, but the modifications represent natural (and typically automatic) operations from within a cache. These events are referred to as synthetic events.

When necessary, an application can differentiate between client-induced and synthetic events simply by asking the event if it is synthetic. This information is carried on a sub-class of the MapEvent, called CacheEvent. Using the previous EventPrinter example, it is possible to print only the synthetic events:

public static class EventPrinter
        extends MultiplexingMapListener
    {
    public void onMapEvent(MapEvent evt)
        {
        if (evt instanceof CacheEvent && ((CacheEvent) evt).isSynthetic())
            {
            out(evt);
            )
        }
    }

For more information on this feature, see the API documentation for CacheEvent.

Advanced: Backing Map Events

While it is possible to listen to events from Coherence caches, each of which presents a local view of distributed, partitioned, replicated, near-cached, continuously-queried, read-through/write-through and/or write-behind data, it is also possible to peek behind the curtains, so to speak. Normally, the advice from the Wizard of Oz is sufficient:

"Pay no attention to the man behind the curtain!"

For some advanced use cases, it may be necessary to pay attention the man behind the curtain – or more correctly, to "listen to" the "map" behind the "service". Replication, partitioning and other approaches to managing data in a distributed environment are all distribution services. The service still has to have something in which to actually manage the data, and that something is called a "backing map".

Backing maps are configurable. If all the data for a particular cache should be kept in object form on the heap, then use an unlimited and non-expiring LocalCache (or a SafeHashMap if statistics are not required). If only a small number of items should be kept in memory, use a LocalCache. If data are to be read on demand from a database, then use a ReadWriteBackingMap (which knows how to read and write through an application's DAO implementation), and in turn give the ReadWriteBackingMap a backing map such as a SafeHashMap or a LocalCache to store its data in.

Some backing maps are observable. The events coming from these backing maps are not usually of direct interest to the appication. Instead, Coherence translates them into actions that must be taken (by Coherence) to keep data in sync and properly backed up, and it also translates them when appropriate into clustered events that are delivered throughout the cluster as requested by application listeners. For example, if a partitioned cache has a LocalCache as its backing map, and the local cache expires an entry, that event causes Coherence to expire all of the backup copies of that entry. Furthermore, if any listeners have been registered on the partitioned cache, and if the event matches their event filter(s), then that event will be delivered to those listeners on the servers where those listeners were registered.

In some advanced use cases, an application needs to process events on the server where the data are being maintained, and it needs to do so on the structure (backing map) that is actually managing the data. In these cases, if the backing map is an observable map, a listener can be configured on the backing map or one can be programmatically added to the backing map. (If the backing map is not observable, it can be made observable by wrapping it in an WrapperObservableMap.)

For more information on this feature, see the API documentation for BackingMapManager.

Advanced: Synchronous Event Listeners

Some events are delivered asynchronously, so that application listeners do not disrupt the cache services that are generating the events. In some rare scenarios, asynchronous delivery can cause ambiguity of the ordering of events compared to the results of ongoing operations. To guarantee that the cache API operations and the events are ordered as if the local view of the clustered system were single-threaded, a MapListener must implement the SynchronousListener marker interface.

One example in Coherence itself that uses synchronous listeners is the Near Cache, which can use events to invalidate locally cached data ("Seppuku").

For more information on this feature, see the API documentation for SynchronousListener.

Summary

Coherence provides an extremely rich event model for caches, providing the means for an application to request the specific events it requires, and the means to have those events delivered only to those parts of the application that require them.

Unknown macro: {rate-table}


Automatically manage dynamic cluster membership


Overview

Coherence manages cluster membership, automatically adding new servers to the cluster when they start up and automatically detecting their departure when they are shut down or fail. Applications have full access to this information, and can sign up to receive event notifications when members join and leave the cluster. Coherence also tracks all the services that each member is providing and consuming, and uses this information to plan for service resiliency in case of server failure, and to load-balance data management and other responsibilities across all members of the cluster.

Cluster and Service objects

From any cache, the application can obtain a reference to the local representation of a cache's service. From any service, the application can obtain a reference to the local representation of the cluster.

CacheService service = cache.getCacheService();
Cluster      cluster = service.getCluster();

From the Cluster object, the application can determine the set of services that are running in the cluster:

for (Enumeration enum = cluster.getServiceNames(); enum.hasMoreElements(); )
    {
    String sName = (String) enum.nextElement();
    ServiceInfo info = cluster.getServiceInfo(sName);
    // ...
    }

The ServiceInfo object provides information about the service, including its name, type, version and membership.

For more information on this feature, see the API documentation for NamedCache, CacheService, Service, ServiceInfo and Cluster.

Member object

The primary information that an application can determine about each member in the cluster is:

  • The Member's IP address
  • What date/time the Member joined the cluster

As an example, if there are four servers in the cluster with each server running one copy ("instance") of the application and all four instances of the application are clustered together, then the cluster is composed of four Members. From the Cluster object, the application can determine what the local Member is:

Member memberThis = cluster.getLocalMember();

From the Cluster object, the application can also determine the entire set of cluster members:

Set setMembers = cluster.getMemberSet();

From the ServiceInfo object, the application can determine the set of cluster members that are participating in that service:

ServiceInfo info = cluster.getServiceInfo(sName);
Set setMembers = info.getMemberSet();

For more information on this feature, see the [API documentation for Member.

Listener interface and Event object

To listen to cluster and/or service membership changes, the application places a listener on the desired Service. As discussed before, the Service can come from a cache:

Service service = cache.getCacheService();

The Service can also be looked up by its name:

Service service = cluster.getService(sName);

To receive membership events, the application implements a MemberListener. For example, the following listener example prints out all the membership events that it receives:

public class MemberEventPrinter
        extends Base
        implements MemberListener
    {
    public void memberJoined(MemberEvent evt)
        {
        out(evt);
        }

    public void memberLeaving(MemberEvent evt)
        {
        out(evt);
        }

    public void memberLeft(MemberEvent evt)
        {
        out(evt);
        }
    }

The MemberEvent object carries information about the event type (joined / leaving / left), the member that generated the event, and the service that acts as the source of the event. Additionally, the event provides a method, isLocal(), that indicates to the application that it is this member that is joining or leaving the cluster. This is useful for recognizing soft restarts in which an application automatically rejoins a cluster after a failure occurs. For example:

public class RejoinEventPrinter
        extends Base
        implements MemberListener
    {
    public void memberJoined(MemberEvent evt)
        {
        if (evt.isLocal())
            {
            out("this member just rejoined the cluster: " + evt);
            }
        }

    public void memberLeaving(MemberEvent evt)
        {
        }

    public void memberLeft(MemberEvent evt)
        {
        }
    }

For more information on these feature, see the [API documentation for Service, MemberListener and MemberEvent.

Unknown macro: {rate-table}


Provide a Queryable Data Fabric


Overview

Oracle invented the concept of a data fabric with the introduction of the Coherence partitioned data management service in 2002. Since then, Forrester Research has labeled the combination of data virtualization, transparent and distributed EIS integration, queryability and uniform accessibility found in Coherence as an information fabric. The term fabric comes from a 2-dimensional illustration of interconnects, as in a switched fabric. The purpose of a fabric architecture is that all points within a fabric have a direct interconnect with all other points.

Data Fabric

An information fabric, or the more simple form called a data fabric or data grid, uses a switched fabric concept as the basis for managing data in a distributed environment. Also referred to as a dynamic mesh architecture, Coherence automatically and dynamically forms a reliable, increasingly resilient switched fabric composed of any number of servers within a grid environment. Consider the attributes and benefits of this architecture:

  • The aggregate data throughput of the fabric is linearly proportional to the number of servers;
  • The in-memory data capacity and data-indexing capacity of the fabric is linearly proportional to the number of servers;
  • The aggregate I/O throughput for disk-based overflow and disk-based storage of data is linearly proportional to the number of servers;
  • The resiliency of the fabric increases with the extent of the fabric, resulting in each server being responsible for only 1/n of the failover responsibility for a fabric with an extent of n servers;
  • If the fabric is servicing clients, such as trading systems, the aggregage maximum number of clients that can be served is linearly proportional to the number of servers.

Coherence accomplishes these technical feats through a variety of algorithms:

  • Coherence dynamically partitions data across all data fabric nodes;
  • Since each data fabric node has a configurable maximum amount of data that it will manage, the capacity of the data fabric is linearly proportional to the number of data fabric nodes;
  • Since the partitioning is automatic and load-balancing, each data fabric node ends up with its fair share of the data management responsibilities, allowing the throughput (in terms of network throughput, disk I/O throughput, query throughput, etc.) to scale linearly with the number of data fabric nodes;
  • Coherence maintains a configurable level of redundancy of data, automatically eliminating single points of failure (SPOFs) by ensuring that data is kept synchronously up-to-date in multiple data fabric nodes;
  • Coherence spreads out the responsibility for data redundancy in a dynamically load-balanced manner so that each server backs up a small amount of data from many other servers, instead of backing up all of the data from one particular server, thus amortizing the impact of a server failure across the entire data fabric;
  • Each data fabric node can handle a large number of client connections, which can be load-balanced by a hardware load balancer.

EIS and Database Integration

The Coherence information fabric can automatically load data on demand from an underlying database or EIS using automatic read-through functionality. If data in the fabric are modified, the same functionality allows that data to be synchronously updated in the database, or queued for asynchronous write-behind.

Coherence automatically partitions data access across the data fabric, resulting in load-balanced data accesses and efficient use of database and EIS connectivity. Furthermore, the read-ahead and write-behind capabilities can cut data access latencies to near-zero levels and insulate the application from temporary database and EIS failures.

Coherence solves the data bottleneck for large-scale compute grids

In large-scale compute grids, such as in DataSynapse financial grids and biotech grids, the bottleneck for most compute processes is in loading a data set and making it available to the compute engines that require it. By layering a Coherence data fabric onto (or beside) a compute grid, these data sets can be maintained in memory at all times, and Coherence can feed the data in parallel at close to wire speed to all of the compute nodes. In a large-scale deployment, Coherence can provide several thousand times the aggregate data throughput of the underlying data source.

Queryable

The Coherence information fabric supports querying from any server in the fabric or any client of the fabric. The queries can be performed using any criteria, including custom criteria such as XPath queries and full text searches. When Coherence partitioning is used to manage the data, the query is processed in parallel across the entire fabric (i.e. the query is also partitioned), resulting in an data query engine that can scale its throughput up to fabrics of thousands of servers. For example, in a trading system it is possible to query for all open "Order" objects for a particular trader:

NamedCache mapTrades = ...
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTrades = mapTrades.entrySet(filter);

When an application queries for data from the fabric, the result is a point-in-time snapshot. Additionally, the query results can be kept up-to-date by placing a listener on the query itself or by using the Coherence Continuous Query feature.

Continuous Query

While it is possible to obtain a point in time query result from a Coherence data fabric, and it is possible to receive events that would change the result of that query, Coherence provides a feature that combines a query result with a continuous stream of related events that maintain the query result in a real-time fashion. This capability is called Continuous Query, because it has the same effect as if the desired query had zero latency and the query were repeated several times every millisecond!

Coherence implements Continuous Query using a combination of its data fabric parallel query capability and its real-time event-filtering and streaming. The result is support for thousands of client application instances, such as trading desktops. Using the previous trading system example, it can be converted to a Continuous Query with only one a single line of code changed:

NamedCache mapTrades = ...
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
NamedCache mapOpenTrades = new ContinuousQueryCache(mapTrades, filter);

The result of the Continuous Query is maintained locally, and optionally all of corresponding data can be cached locally as well.

Summary

Coherence is successfully deployed as a large-scale data fabric for many of the world's largest financial, telecommunications, logistics, travel and media organizations. With unlimited scalability, the highest levels of availability, close to zero latency, an incredibly rich set of capabilities and a sterling reputation for quality, Coherence is the Information Fabric of choice.

Unknown macro: {rate-table}


Provide a Data Grid


Overview

Coherence provides the ideal infrastructure for building Data Grid services, as well as the client and server-based applications that utilize a Data Grid. At a basic level, Coherence can manage an immense amount of data across a large number of servers in a grid; it can provide close to zero latency access for that data; it supports parallel queries across that data; and it supports integration with database and EIS systems that act as the system of record for that data. For more information on the infrastructure for the Data Grid features in Coherence, refer to the discussion on Data Fabric capabilities. Additionally, Coherence provides a number of services that are ideal for building effective data grids.

All of the Data Grid capabilities described below are features of Coherence Enterprise Edition and higher.

Targeted Execution

Coherence provides for the ability to execute an agent against an entry in any map of data managed by the Data Grid:

map.invoke(key, agent);

In the case of partitioned data, the agent executes on the grid node that owns the data to execute against. This means that the queueing, concurrency management, agent execution, data access by the agent and data modification by the agent all occur on that grid node. (Only the synchronous backup of the resultant data modification, if any, requires additional network traffic.) For many processing purposes, it is much more efficient to move the serialized form of the agent (usually only a few hundred bytes, at most) than to handle distributed concurrency control, coherency and data updates.

For request/response processing, the agent returns a result:

Object oResult = map.invoke(key, agent);

In other words, Coherence as a Data Grid will determine the location to execute the agent based on the configuration for the data topology, move the agent there, execute the agent (automatically handling concurrency control for the item while executing the agent), back up the modifications if any, and return a result.

Parallel Execution

Coherence additionally provides for the ability to execute an agent against an entire collection of entries. In a partitioned Data Grid, the execution occurs in parallel, meaning that the more nodes that are in the grid, the broader the work is load-balanced across the Data Grid:

map.invokeAll(collectionKeys, agent);

For request/response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(collectionKeys, agent);

In other words, Coherence determines the optimal location(s) to execute the agent based on the configuration for the data topology, moves the agent there, executes the agent (automatically handling concurrency control for the item(s) while executing the agent), backing up the modifications if any, and returning the coalesced results.

Query-Based Execution

As discussed in the queryable data fabric topic, Coherence supports the ability to query across the entire data grid. For example, in a trading system it is possible to query for all open "Order" objects for a particular trader:

NamedCache map    = CacheFactory.getCache("trades");
Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                                  new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = mapTrades.keySet(filter);

By combining this feature with Parallel Execution in the data grid, Coherence provides for the ability to execute an agent against a query. As in the previous section, the execution occurs in parallel, and instead of returning the identities or entries that match the query, Coherence executes the agents against the entries:

map.invokeAll(filter, agent);

For request/response processing, the agent returns one result for each key processed:

Map mapResults = map.invokeAll(filter, agent);

In other words, Coherence combines its Parallel Query and its Parallel Execution together to achieve query-based agent invocation against a Data Grid.

Data-Grid-Wide Execution

Passing an instance of AlwaysFilter (or a null) to the invokeAll method will cause the passed agent to be executed against all entries in the InvocableMap:

map.invokeAll((Filter) null, agent);

As with the other types of agent invocation, request/response processing is supported:

Map mapResults = map.invokeAll((Filter) null, agent);

In other words, with a single line of code, an application can process all the data spread across a particular map in the Data Grid.

Agents for Targeted, Parallel and Query-Based Execution

An agent implements the EntryProcessor interface, typically by extending the AbstractProcessor class.

A number of agents are included with Coherence, including:

  • AbstractProcessor - an abstract base class for building an EntryProcessor
  • ExtractorProcessor - extracts and returns a specific value (such as a property value) from an object stored in an InvocableMap
  • CompositeProcessor - bundles together a collection of EntryProcessor objects that are invoked sequentially against the same Entry
  • ConditionalProcessor - conditionally invokes an EntryProcessor if a Filter against the Entry-to-process evaluates to true
  • PropertyProcessor - an abstract base class for EntryProcessor implementations that depend on a PropertyManipulator
  • NumberIncrementor - pre- or post-increments any property of a primitive integral type, as well as Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal
  • NumberMultiplier - multiplies any property of a primitive integral type, as well as Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal, and returns either the previous or new value

The EntryProcessor interface (contained within the InvocableMap interface) contains only two methods:

/**
* An invocable agent that operates against the Entry objects within a
* Map.
*/
public interface EntryProcessor
        extends Serializable
    {
    /**
    * Process a Map Entry.
    *
    * @param entry  the Entry to process
    *
    * @return the result of the processing, if any
    */
    public Object process(Entry entry);

    /**
    * Process a Set of InvocableMap Entry objects. This method is
    * semantically equivalent to:
    * <pre>
    *   Map mapResults = new ListMap();
    *   for (Iterator iter = setEntries.iterator(); iter.hasNext(); )
    *       {
    *       Entry entry = (Entry) iter.next();
    *       mapResults.put(entry.getKey(), process(entry));
    *       }
    *   return mapResults;
    * </pre>
    *
    * @param setEntries  a read-only Set of InvocableMap Entry objects to
    *                    process
    *
    * @return a Map containing the results of the processing, up to one
    *         entry for each InvocableMap Entry that was processed, keyed
    *         by the keys of the Map that were processed, with a
    *         corresponding value being the result of the processing for
    *         each key
    */
    public Map processAll(Set setEntries);
    }

(The AbstractProcessor implements the processAll method as described in the JavaDoc above.)

The InvocableMap.Entry that is passed to an EntryProcessor is an extension of the Map.Entry interface that allows an EntryProcessor implementation to obtain the necessary information about the entry and to make the necessary modifications in the most efficient manner possible:

/**
* An InvocableMap Entry contains additional information and exposes
* additional operations that the basic Map Entry does not. It allows
* non-existent entries to be represented, thus allowing their optional
* creation. It allows existent entries to be removed from the Map. It
* supports a number of optimizations that can ultimately be mapped
* through to indexes and other data structures of the underlying Map.
*/
public interface Entry
        extends Map.Entry
    {
    // ----- Map Entry interface ------------------------------------

    /**
    * Return the key corresponding to this entry. The resultant key does
    * not necessarily exist within the containing Map, which is to say
    * that <tt>InvocableMap.this.containsKey(getKey)</tt> could return
    * false. To test for the presence of this key within the Map, use
    * {@link #isPresent}, and to create the entry for the key, use
    * {@link #setValue}.
     *
    * @return the key corresponding to this entry; may be null if the
    *         underlying Map supports null keys
    */
    public Object getKey();

    /**
    * Return the value corresponding to this entry. If the entry does
    * not exist, then the value will be null. To differentiate between
    * a null value and a non-existent entry, use {@link #isPresent}.
    * <p/>
    * <b>Note:</b> any modifications to the value retrieved using this
    * method are not guaranteed to persist unless followed by a
    * {@link #setValue} or {@link #update} call.
    *
    * @return the value corresponding to this entry; may be null if the
    *         value is null or if the Entry does not exist in the Map
    */
    public Object getValue();

    /**
    * Store the value corresponding to this entry. If the entry does
    * not exist, then the entry will be created by invoking this method,
    * even with a null value (assuming the Map supports null values).
    *
    * @param oValue  the new value for this Entry
    *
    * @return the previous value of this Entry, or null if the Entry did
    *         not exist
    */
    public Object setValue(Object oValue);

    // ----- InvocableMap Entry interface ---------------------------

    /**
    * Store the value corresponding to this entry. If the entry does
    * not exist, then the entry will be created by invoking this method,
    * even with a null value (assuming the Map supports null values).
    * <p/>
    * Unlike the other form of {@link #setValue(Object) setValue}, this
    * form does not return the previous value, and as a result may be
    * significantly less expensive (in terms of cost of execution) for
    * certain Map implementations.
    *
    * @param oValue      the new value for this Entry
    * @param fSynthetic  pass true only if the insertion into or
    *                    modification of the Map should be treated as a
    *                    synthetic event
    */
    public void setValue(Object oValue, boolean fSynthetic);

    /**
    * Extract a value out of the Entry's value. Calling this method is
    * semantically equivalent to
    * <tt>extractor.extract(entry.getValue())</tt>, but this method may
    * be significantly less expensive because the resultant value may be
    * obtained from a forward index, for example.
    *
    * @param extractor  a ValueExtractor to apply to the Entry's value
    *
    * @return the extracted value
    */
    public Object extract(ValueExtractor extractor);

    /**
    * Update the Entry's value. C