Tuesday, January 12, 2010

Notes on Oracle Coherence

Oracle Coherence is a distributed cache that is functionally comparable to Memcached. On top of the basic cache API functions, it has additional capabilities that are attractive for building large-scale enterprise applications.

The API is based on the Java Map (Hashtable) interface, which provides key/value store semantics where the value can be any Java Serializable object. Coherence allows data to be stored in multiple caches, each identified by a unique name (which they call a "named cache").

The code examples below are extracted from a great presentation by Brian Oliver of Oracle.

The common usage pattern is to first locate a cache by its name, and then act on the cache.

Basic cache function (Map, JCache)
  • Get data by key
  • Update data by key
  • Remove data by key
import java.util.Set;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

// Locate the named cache, then use it through the familiar Map API
NamedCache nc = CacheFactory.getCache("mine");

// put() returns the previous value mapped to the key (or null)
Object previous = nc.put("key", "hello world");

Object current = nc.get("key");

int size = nc.size();

// remove() returns the value that was removed (or null)
Object value = nc.remove("key");

Set keys = nc.keySet();

Set entries = nc.entrySet();

boolean exists = nc.containsKey("key");

Cache Modification Event Listener (ObservableMap)

You can register an event listener on a cache to catch change events that happen within the cache.
  • New cache item is inserted
  • Existing cache item is deleted
  • Existing cache item is updated
NamedCache nc = CacheFactory.getCache("stocks");

nc.addMapListener(new MapListener() {
    public void entryInserted(MapEvent mapEvent) {
        // ...
    }

    public void entryUpdated(MapEvent mapEvent) {
        // ...
    }

    public void entryDeleted(MapEvent mapEvent) {
        // ...
    }
});


View of Filtered Cache (QueryMap)

You can also define a "view" by providing a "filter", which is basically a boolean function. Only items that evaluate to true under this function are visible in the view.

NamedCache nc = CacheFactory.getCache("people");

Set keys = nc.keySet(new LikeFilter("getLastName", "%Stone%"));

Set entries = nc.entrySet(new EqualsFilter("getAge", 35));


Continuous Query Support (ContinuousQueryCache)

The view can also be used as a "continuous query". All newly arriving data that fulfills the filter criteria will automatically be included in the view.

NamedCache nc = CacheFactory.getCache("stocks");

NamedCache expensiveItems =
    new ContinuousQueryCache(nc, new GreaterFilter("getPrice", 1000));


Parallel Query Support (InvocableMap)

We can also spread query execution and partial aggregation across all nodes, have them execute in parallel, and then perform the final aggregation.
NamedCache nc = CacheFactory.getCache("stocks");

Double total =
    (Double) nc.aggregate(AlwaysFilter.INSTANCE, new DoubleSum("getQuantity"));

Set symbols =
    (Set) nc.aggregate(new EqualsFilter("getOwner", "Larry"),
                       new DistinctValues("getSymbol"));


Parallel Execution Processing Support (InvocableMap)

We can ship a piece of processing logic to all nodes, which will execute it in parallel against their local data.
NamedCache nc = CacheFactory.getCache("stocks");

nc.invokeAll(new EqualsFilter("getSymbol", "ORCL"),
             new StockSplitProcessor());

class StockSplitProcessor extends AbstractProcessor {
    public Object process(InvocableMap.Entry entry) {
        Stock stock = (Stock) entry.getValue();
        stock.quantity *= 2;       // double the holding on a 2-for-1 split
        entry.setValue(stock);
        return null;
    }
}


Implementation Architecture

Oracle Coherence runs on a cluster of identical server machines connected via a network. Within each server, multiple layers of software provide a unified data storage and processing abstraction over the distributed environment.


Smart Data Proxy

The application typically runs within a node of the cluster as well. The cache interface is implemented by a set of smart data proxies, which know the location of the master (primary) and slave (backup) copies of each piece of data based on its key.

Read through with 2 level cache

When the client "reads" data through the proxy, the proxy first tries to find the data in a local cache (also called the "near cache", which lives on the same machine). If it is not found there, the smart proxy locates the corresponding copy in the distributed cache (also called the L2 cache). Since this is a read, either the master or a slave copy will do. If the smart proxy doesn't find the data in the distributed cache either, it looks the data up in the backend DB. The returned data then propagates back to the client, and the caches are populated along the way.
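As an illustration, this read-through behavior is typically plugged in by implementing Coherence's CacheLoader interface and wiring it into the cache configuration. Below is a minimal sketch; the StockDbDao helper is hypothetical and stands in for whatever JDBC/ORM lookup your backend DB requires.

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import com.tangosol.net.cache.CacheLoader;

// Sketch only: Coherence calls load() on a cache miss and inserts the
// returned value into the cache before handing it back to the client.
public class StockCacheLoader implements CacheLoader {

    public Object load(Object key) {
        // StockDbDao is a hypothetical data-access helper
        return StockDbDao.findBySymbol((String) key);
    }

    public Map loadAll(Collection keys) {
        Map results = new HashMap();
        for (Object key : keys) {
            results.put(key, load(key));
        }
        return results;
    }
}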

Master/Slave data partitioning

Updating data (insert, update, delete) works in the reverse direction. Under the master/slave architecture, every update goes to the master node that owns that piece of data. Coherence supports two modes of update: "write-through" and "write-behind". "Write-through" updates the backend DB immediately after updating the master copy (but before updating the slave copy), and therefore keeps the DB always up to date. "Write-behind" updates the slave copy first and then the DB in an asynchronous fashion. Data loss is possible in "write-behind" mode, but it offers higher throughput because multiple writes can be merged into a single write, resulting in fewer DB writes.
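The write side can be sketched the same way: a CacheStore (which extends CacheLoader) receives the updates, and whether Coherence invokes it synchronously on each update (write-through) or asynchronously in batches (write-behind) is decided by the cache configuration's write-delay setting, not by this code. This builds on the hypothetical StockCacheLoader and StockDbDao from the sketch above.

import java.util.Collection;
import java.util.Map;
import com.tangosol.net.cache.CacheStore;

// Sketch only: store()/erase() push cache changes out to the backend DB.
public class StockCacheStore extends StockCacheLoader implements CacheStore {

    public void store(Object key, Object value) {
        // In write-through mode this runs synchronously on every update;
        // in write-behind mode Coherence calls it later, off the write path.
        StockDbDao.save((String) key, (Stock) value);
    }

    public void storeAll(Map entries) {
        // Write-behind may batch several pending updates into one call,
        // which is where the "merge multiple writes" throughput win comes from.
        for (Object o : entries.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            store(e.getKey(), e.getValue());
        }
    }

    public void erase(Object key) {
        StockDbDao.delete((String) key);
    }

    public void eraseAll(Collection keys) {
        for (Object key : keys) {
            erase(key);
        }
    }
}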

Moving processing logic towards data

While extracting data from the cache into the application is the typical way of processing data, it is not very scalable when a large volume of data needs to be processed. Instead of shipping the data to the processing logic, a much more efficient approach is to ship the processing logic to where the data resides. This is exactly why Oracle Coherence provides the InvocableMap interface, where the client supplies a "processor" class that gets shipped to every node and executed against the local data. Moving code towards data distributed across many nodes also enables parallel processing, because every node can conduct its local processing in parallel.

The processor logic is shipped into the processing queue of the executing node, where an active processor dequeues the processor object and executes it. Notice that this execution is performed in a serial manner; in other words, the processor completely finishes one processing job before proceeding to the next. There is no need to worry about multi-threading issues or locks, and therefore no deadlock issues.
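To make the lock-free point concrete, here is a hedged sketch (the "counters" cache and the IncrementProcessor are hypothetical): because process() runs serially on the node that owns the entry, the read-modify-write below is atomic without any explicit locking.

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.processor.AbstractProcessor;

// Sketch only: an atomic increment with no locks. No other update can
// interleave between getValue() and setValue() on the owning node.
class IncrementProcessor extends AbstractProcessor {
    public Object process(InvocableMap.Entry entry) {
        int count = entry.isPresent() ? ((Integer) entry.getValue()).intValue() : 0;
        entry.setValue(Integer.valueOf(count + 1));
        return entry.getValue();
    }
}

// Usage: ship the processor to whichever node owns the "page-hits" key
NamedCache counters = CacheFactory.getCache("counters");
Integer hits = (Integer) counters.invoke("page-hits", new IncrementProcessor());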

8 comments:

J Chris A said...

the processor logic sounds a lot like map reduce

thanks for the informative write up!

Ricky Ho said...

I think they are more primitive than map/reduce. They are equivalent to a (map + combiner) but with no reducer. I don't see any shuffling/sorting.

Unknown said...

Say in a situation where a "job" on some data gets distributed across many nodes, how can I ensure that the originally invoking thread won't proceed to the next line of code just because it has already finished with all its own data (the data on this particular node)? I assume InvocableMap works asynchronously, and what I'm asking is how I can implement something like CyclicBarrier in Java.

Ricky Ho said...

InvocableMap is synchronous from the caller's perspective. Although the processing happens in parallel across many nodes, it won't return until all nodes have completed the processing.

StorageCraft said...

These notes are of great help to me. I was actually making a project on it and now I think that I will get good appreciation for it.

Suramya said...

Hi,
I'm using the extended client architecture for CRUD operations. Data addition/modification can happen at any time, and many readers will read data while we are doing additions/updates. Therefore we need to use locking. But for extended clients, coherence cache->lock(key,-1) returns true every time, even when another process thread has already acquired the lock. Please share your comments on it.

Sanjay Garothaya said...

Hi,
I have some doubt. Do you mean to say:
1. The local cache (near cache) has A. a master (primary) and B. a slave (backup) copy,
as well as
2. The distributed cache (L2 cache) has A. a master (primary) and B. a slave (backup) copy?

i.e. there are four copies of each object: 2 copies in master and 2 copies in slave.

If that is the case, don't you feel that if I have lots of data I need 4X the space?

Manh-Kiet Yap said...

@Sanjay - The local cache used as the front-scheme in the near-cache model does not have primary/backup copies. As the name states, it's only local, and if the local JVM fails, you lose the object.... But when you start it back up, the local cache will be re-populated (when you issue the .get(key) method) from the distributed cache in the back-scheme. So you have 3 copies, not 4.

The idea is to limit the network hops when you know the object is being read/mutated by yourself (I mean your local JVM). The typical situation is when you maintain your HTTP session with a sticky load-balancer, where all the requests related to the same session always go to the same app server.

So basically Coherence gives you the option to optimize either your network usage or your memory usage. It's your choice. But the bottom line is: you don't need to take care of it while you code, as it just works in the background transparently.