EhCache + JGroups give “Exception on flushing of replication queue: null”
29-04-2021
Question
I'm trying to configure EhCache with JGroups-based replication, but the log is flooded with the following exception as soon as the first element is added to the cache:
12061 [Replication Thread] ERROR net.sf.ehcache.distribution.RMIAsynchronousCacheReplicator - Exception on flushing of replication queue: null. Continuing...
java.lang.NullPointerException
    at net.sf.ehcache.distribution.RMISynchronousCacheReplicator.listRemoteCachePeers(RMISynchronousCacheReplicator.java:335)
    at net.sf.ehcache.distribution.RMIAsynchronousCacheReplicator.flushReplicationQueue(RMIAsynchronousCacheReplicator.java:299)
    at net.sf.ehcache.distribution.RMIAsynchronousCacheReplicator.replicationThreadMain(RMIAsynchronousCacheReplicator.java:119)
    at net.sf.ehcache.distribution.RMIAsynchronousCacheReplicator.access$100(RMIAsynchronousCacheReplicator.java:57)
    at net.sf.ehcache.distribution.RMIAsynchronousCacheReplicator$ReplicationThread.run(RMIAsynchronousCacheReplicator.java:371)
ehcache.xml is like this:
<?xml version="1.0" encoding="UTF-8"?>
<ehcache
    updateCheck="true"
    monitoring="autodetect"
    defaultTransactionTimeoutInSeconds="30"
    dynamicConfig="true">
    <cacheManagerPeerProviderFactory
        class="net.sf.ehcache.distribution.jgroups.JGroupsCacheManagerPeerProviderFactory"
        properties="jgroups.xml"
    />
    <defaultCache
        maxElementsInMemory="200"
        eternal="false"
        statistics="true"
        timeToIdleSeconds="86400"
        timeToLiveSeconds="86400"
        overflowToDisk="false">
        <cacheEventListenerFactory
            class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"
            properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true"
        />
        <bootstrapCacheLoaderFactory class="net.sf.ehcache.distribution.RMIBootstrapCacheLoaderFactory" />
    </defaultCache>
</ehcache>
jgroups.xml is like this:
<?xml version='1.0'?>
<config>
    <TCP start_port="7800" />
    <TCPPING
        timeout="3000"
        initial_hosts="localhost[7800],localhost[7800]"
        port_range="10"
        num_initial_members="2" />
    <VERIFY_SUSPECT timeout="1500" />
    <pbcast.NAKACK
        use_mcast_xmit="false"
        gc_lag="100"
        retransmit_timeout="300,600,1200,2400,4800"
        discard_delivered_msgs="true" />
    <pbcast.STABLE
        stability_delay="1000"
        desired_avg_gossip="50000"
        max_bytes="400000" />
    <pbcast.GMS
        print_local_addr="true"
        join_timeout="5000"
        shun="false"
        view_bundling="true" />
</config>
Using jgroups version 2.8.1.GA, ehcache-core version 2.5.1, ehcache-jgroupsreplication version 1.5.
What am I doing wrong?
UPDATE: When I change to replicateAsynchronously=false, I get the following exception:
Exception in thread "main" java.lang.NullPointerException
    at net.sf.ehcache.distribution.RMISynchronousCacheReplicator.listRemoteCachePeers(RMISynchronousCacheReplicator.java:335)
    at net.sf.ehcache.distribution.RMISynchronousCacheReplicator.replicatePutNotification(RMISynchronousCacheReplicator.java:145)
    at net.sf.ehcache.distribution.RMISynchronousCacheReplicator.notifyElementPut(RMISynchronousCacheReplicator.java:132)
    at net.sf.ehcache.event.RegisteredEventListeners.notifyListener(RegisteredEventListeners.java:294)
    at net.sf.ehcache.event.RegisteredEventListeners.invokeListener(RegisteredEventListeners.java:284)
    at net.sf.ehcache.event.RegisteredEventListeners.internalNotifyElementPut(RegisteredEventListeners.java:144)
    at net.sf.ehcache.event.RegisteredEventListeners.notifyElementPut(RegisteredEventListeners.java:122)
    at net.sf.ehcache.Cache.notifyPutInternalListeners(Cache.java:1515)
    at net.sf.ehcache.Cache.putInternal(Cache.java:1490)
    at net.sf.ehcache.Cache.put(Cache.java:1417)
    at net.sf.ehcache.Cache.put(Cache.java:1382)
UPDATE 2: An issue has been created in Terracotta's JIRA: https://jira.terracotta.org/jira/browse/EHC-927
Solution
As pointed out by Chris in EHC-927, I was using the wrong cacheEventListenerFactory class. It should be net.sf.ehcache.distribution.jgroups.JGroupsCacheReplicatorFactory instead of net.sf.ehcache.distribution.RMICacheReplicatorFactory.
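The mismatch described above (a JGroups peer provider combined with an RMI replicator factory) is the kind of thing a small sanity check could catch before the cache starts. The following is only a sketch under stated assumptions: ReplicatorConfigCheck and findNonJGroupsReplicators are hypothetical names, not part of Ehcache, and the check relies purely on the package names that appear in this thread.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical helper for illustration; it is not part of Ehcache.
class ReplicatorConfigCheck {

    // Package of the JGroups replication classes named in this thread.
    private static final String JGROUPS_PACKAGE = "net.sf.ehcache.distribution.jgroups.";
    // Parent package; the RMI classes named in this thread live directly in it.
    private static final String DISTRIBUTION_PACKAGE = "net.sf.ehcache.distribution.";

    /**
     * When the peer provider factory is a JGroups one, returns the class names of
     * cacheEventListenerFactory elements that belong to the distribution package
     * but not to its jgroups sub-package. Returns an empty list otherwise.
     */
    static List<String> findNonJGroupsReplicators(String ehcacheXml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(ehcacheXml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse ehcache.xml content", e);
        }

        List<String> suspects = new ArrayList<>();
        NodeList providers = doc.getElementsByTagName("cacheManagerPeerProviderFactory");
        if (providers.getLength() == 0
                || !classOf(providers.item(0)).startsWith(JGROUPS_PACKAGE)) {
            return suspects; // not a JGroups setup, nothing to compare
        }

        NodeList listeners = doc.getElementsByTagName("cacheEventListenerFactory");
        for (int i = 0; i < listeners.getLength(); i++) {
            String className = classOf(listeners.item(i));
            // Custom listeners outside the distribution package are left alone.
            if (className.startsWith(DISTRIBUTION_PACKAGE)
                    && !className.startsWith(JGROUPS_PACKAGE)) {
                suspects.add(className);
            }
        }
        return suspects;
    }

    // Reads the "class" attribute of a factory element.
    private static String classOf(Node node) {
        return ((Element) node).getAttribute("class");
    }
}
```

Passing the ehcache.xml from the question to findNonJGroupsReplicators would report net.sf.ehcache.distribution.RMICacheReplicatorFactory; after switching to JGroupsCacheReplicatorFactory the list comes back empty.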
Other tips
I've checked the source code of the RMIAsynchronousCacheReplicator class. Something is not right when flushReplicationQueue() is called: it should also check for replicationQueue != null, not just replicationQueue.size() == 0, just as it tests the thread's alive() in the while loop.
It cannot flush an object that does not exist or is not initialized. How can it know whether the object is empty if it does not even exist? Simply catching NullPointerException is not a nice way to tell the user about it!
/**
 * RemoteDebugger method for the replicationQueue thread.
 * <p/>
 * Note that the replicationQueue thread locks the cache for the entire time it is writing elements to the disk.
 */
private void replicationThreadMain() {
    while (true) {
        // Wait for elements in the replicationQueue
        while (alive() && replicationQueue != null && replicationQueue.size() == 0) {
            try {
                Thread.sleep(asynchronousReplicationInterval);
            } catch (InterruptedException e) {
                LOG.debug("Spool Thread interrupted.");
                return;
            }
        }
        if (notAlive()) {
            return;
        }
        try {
            if (replicationQueue.size() != 0) {
                flushReplicationQueue();
            }
        } catch (Throwable e) {
            LOG.error("Exception on flushing of replication queue: " + e.getMessage() + ". Continuing...", e);
        }
    }
}
The intent of the sleep in the inner while loop is simply to keep the thread from busy-waiting. Without it, CPU usage could hover around 50% while the thread does nothing, which might lead the user to believe something is wrong with Ehcache.
Probably, you need to add the property asynchronousReplicationInterval with a small value (100 ms to 150 ms) so that the replication queue can be built. Append it as follows:
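The null check argued for above, placed before the size test, can be sketched in isolation as follows. This is a standalone illustration rather than Ehcache code: ReplicationQueueGuard and hasPendingElements are hypothetical names, and a plain java.util.Queue stands in for the replicator's internal queue.

```java
import java.util.Queue;

// Standalone illustration; the class and method names are hypothetical.
class ReplicationQueueGuard {

    /** Returns true only when the queue exists and holds at least one element. */
    static boolean hasPendingElements(Queue<?> replicationQueue) {
        // The null test must come first: && short-circuits, so isEmpty()
        // is never invoked on a queue that was not initialized.
        return replicationQueue != null && !replicationQueue.isEmpty();
    }
}
```

A flush guarded by hasPendingElements(queue) would be skipped for both a missing queue and an empty one, instead of relying on a caught NullPointerException.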
properties="replicateAsynchronously=true,
replicatePuts=true,
replicateUpdates=true,
replicateUpdatesViaCopy=true,
replicateRemovals=true,
asynchronousReplicationInterval=100"
It may be needed in the RMIAsynchronousCacheReplicator constructor below:
/**
 * Constructor for internal and subclass use
 */
public RMIAsynchronousCacheReplicator(
        boolean replicatePuts,
        boolean replicatePutsViaCopy,
        boolean replicateUpdates,
        boolean replicateUpdatesViaCopy,
        boolean replicateRemovals,
        int asynchronousReplicationInterval) {
    super(replicatePuts,
            replicatePutsViaCopy,
            replicateUpdates,
            replicateUpdatesViaCopy,
            replicateRemovals);
    this.asynchronousReplicationInterval = asynchronousReplicationInterval;
    status = Status.STATUS_ALIVE;
    replicationThread.start();
}
Maybe you can just ignore the problem for the time being and let someone else report the bug, if it is even considered a bug. I wonder why it says "Continuing..." afterwards.