Skip to content

Commit

Permalink
Fixed ehcache#41 Race Condition in JCacheManager
Browse files Browse the repository at this point in the history
Using Futures to reserve cache names before configuring them to prevent
a different thread from creating it at the same time, possibly with
broken configuration
  • Loading branch information
Yogu committed Mar 5, 2015
1 parent 4320c1c commit 29bc560
Showing 1 changed file with 105 additions and 40 deletions.
145 changes: 105 additions & 40 deletions ehcache-jcache/src/main/java/org/ehcache/jcache/JCacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import javax.cache.Cache;
import javax.cache.CacheException;
Expand Down Expand Up @@ -60,7 +64,7 @@ public class JCacheManager implements javax.cache.CacheManager {
private final CacheManager cacheManager;
private final URI uri;
private final Properties props;
private final ConcurrentHashMap<String, JCache> allCaches = new ConcurrentHashMap<String, JCache>();
private final ConcurrentHashMap<String, Future<JCache>> allCaches = new ConcurrentHashMap<String, Future<JCache>>();
private volatile boolean closed = false;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ConcurrentMap<JCache, JCacheManagementMXBean> cfgMXBeans = new ConcurrentHashMap<JCache, JCacheManagementMXBean>();
Expand Down Expand Up @@ -100,25 +104,47 @@ public <K, V, C extends Configuration<K, V>> Cache<K, V> createCache(final Strin
if(configuration == null) {
throw new NullPointerException();
}

return getOrPutCacheAtomically(cacheName, new Callable<JCache>() {
@Override
public JCache call() throws Exception {
return createCache0(cacheName, configuration);
}
});
}

private <K, V> JCache<K, V> getOrPutCacheAtomically(String cacheName, Callable<JCache> creator) {
FutureTask<JCache> myFuture = new FutureTask<JCache>(creator);

// Only configure the cache if it is the one that has been added to the map
Future<JCache> previousFuture = allCaches.putIfAbsent(cacheName, myFuture);
if (previousFuture != null) {
throw new CacheException("A cache called " + cacheName + " already exists in this CacheManager");
}

myFuture.run();

JCache<K, V> jCache = allCaches.get(cacheName);
if (jCache != null) {
throw new CacheException();
try {
return myFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CacheException("This thread has been interrupted while the cache has been configured", e);
} catch (ExecutionException e) {
throw new CacheException("An exception has occurred while configuring the cache", e);
}
}

private <K, V, C extends Configuration<K, V>> JCache<K, V> createCache0(String cacheName, C configuration) {
cacheManager.addCacheIfAbsent(new net.sf.ehcache.Cache(toEhcacheConfig(cacheName, configuration)));
Ehcache ehcache = cacheManager.getEhcache(cacheName);
final JCacheConfiguration<K, V> cfg = new JCacheConfiguration<K, V>(configuration);
jCache = new JCache<K, V>(this, cfg, ehcache);
JCache<K, V> previous = allCaches.putIfAbsent(cacheName, jCache);
if(previous != null) {
// todo validate config
return previous;
}

JCache jCache = new JCache<K, V>(this, cfg, ehcache);
if(cfg.isStatisticsEnabled()) {
enableStatistics(cacheName, true);
enableStatistics(true, jCache);
}
if(cfg.isManagementEnabled()) {
enableManagement(cacheName, true);
enableManagement(true, jCache);
}
return jCache;
}
Expand All @@ -129,25 +155,14 @@ public <K, V> Cache<K, V> getCache(final String cacheName, final Class<K> keyTyp
if(valueType == null) {
throw new NullPointerException();
}
JCache<K, V> jCache = allCaches.get(cacheName);
if(jCache != null) {
if(!keyType.isAssignableFrom(jCache.getConfiguration(CompleteConfiguration.class).getKeyType())) {
throw new ClassCastException();

JCache jCache = getOrPutCacheAtomically(cacheName, new Callable<JCache>() {
@Override
public JCache call() throws Exception {
return getCache0(cacheName, keyType, valueType);
}
if(!valueType.isAssignableFrom(jCache.getConfiguration(CompleteConfiguration.class).getValueType())) {
throw new ClassCastException();
}
return jCache;
}
final net.sf.ehcache.Cache cache = cacheManager.getCache(cacheName);
if (cache == null) {
return null;
}
jCache = new JCache<K, V>(this, new JCacheConfiguration<K, V>(null, null, keyType, valueType), cache);
final JCache<K, V> previous = allCaches.putIfAbsent(cacheName, jCache);
if(previous != null) {
jCache = previous;
}
});

if(!keyType.isAssignableFrom(jCache.getConfiguration(CompleteConfiguration.class).getKeyType())) {
throw new ClassCastException();
}
Expand All @@ -156,13 +171,24 @@ public <K, V> Cache<K, V> getCache(final String cacheName, final Class<K> keyTyp
}
return jCache;
}

private <K, V> JCache<K, V> getCache0(final String cacheName, final Class<K> keyType, final Class<V> valueType) {
final net.sf.ehcache.Cache cache = cacheManager.getCache(cacheName);
if (cache == null) {
// Can't create the, so retract the promise of creating it.
allCaches.remove(cacheName);
return null;
}

return new JCache<K, V>(this, new JCacheConfiguration<K, V>(null, null, keyType, valueType), cache);
}

@Override
public <K, V> Cache<K, V> getCache(final String cacheName) {
final JCache<K, V> jCache = allCaches.get(cacheName);
final JCache<K, V> jCache = getCacheIfExists(cacheName);
if(jCache == null) {
refreshAllCaches();
return allCaches.get(cacheName);
return getCacheIfExists(cacheName);
}
if(jCache.getConfiguration(CompleteConfiguration.class).getKeyType() != Object.class ||
jCache.getConfiguration(CompleteConfiguration.class).getValueType() != Object.class) {
Expand All @@ -171,6 +197,23 @@ public <K, V> Cache<K, V> getCache(final String cacheName) {
return jCache;
}

private JCache getCacheIfExists(final String cacheName) {
Future<JCache> future = allCaches.get(cacheName);
if (future == null) {
return null;
}

try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CacheException("This thread has been interrupted while another thread configured the requested cache", e);
} catch (ExecutionException e) {
throw new CacheException("An exception has occurred while another thread was configuring the requested cache", e);
}
}


@Override
public Iterable<String> getCacheNames() {
return Collections.unmodifiableSet(new HashSet<String>(allCaches.keySet()));
Expand All @@ -179,7 +222,7 @@ public Iterable<String> getCacheNames() {
@Override
public void destroyCache(final String cacheName) {
checkNotClosed();
final JCache jCache = allCaches.get(cacheName);
final JCache jCache = getCacheIfExists(cacheName);
if (jCache != null) {
jCache.close();
}
Expand All @@ -189,7 +232,7 @@ public void destroyCache(final String cacheName) {
public void enableManagement(final String cacheName, final boolean enabled) {
checkNotClosed();
if(cacheName == null) throw new NullPointerException();
final JCache jCache = allCaches.get(cacheName);
final JCache jCache = getCacheIfExists(cacheName);
if(jCache == null) {
throw new NullPointerException();
}
Expand Down Expand Up @@ -221,7 +264,7 @@ private void enableManagement(final boolean enabled, final JCache jCache) {
public void enableStatistics(final String cacheName, final boolean enabled) {
checkNotClosed();
if(cacheName == null) throw new NullPointerException();
final JCache jCache = allCaches.get(cacheName);
final JCache jCache = getCacheIfExists(cacheName);
if(jCache == null) {
throw new NullPointerException();
}
Expand Down Expand Up @@ -298,8 +341,14 @@ public void close() {

void shutdown() {
closed = true;
for (JCache jCache : allCaches.values()) {
jCache.close();
for (Future<JCache> future : allCaches.values()) {
// This will wait until caches being added are configured, then remove them
// closed is true, so no new caches can be added when we are here.
try {
future.get().close();
} catch (Exception e) {
// ignore according to the spec of CacheManager#close()
}
}
cacheManager.shutdown();
allCaches.clear();
Expand All @@ -325,7 +374,12 @@ private void refreshAllCaches() {
for (String s : cacheManager.getCacheNames()) {
final net.sf.ehcache.Cache cache = cacheManager.getCache(s);
if(cache != null) {
allCaches.put(s, new JCache(this, new JCacheConfiguration(cache.getCacheConfiguration()), cache));
allCaches.putIfAbsent(s, new FutureTask<JCache>(new Callable<JCache>() {
@Override
public JCache call() throws Exception {
return new JCache(JCacheManager.this, new JCacheConfiguration(cache.getCacheConfiguration()), cache);
}
}));
}
}
}
Expand Down Expand Up @@ -353,8 +407,19 @@ private void checkNotClosed() {
}

void shutdown(final JCache jCache) {
final JCache r = allCaches.remove(jCache.getName());
if (r == jCache) {
final Future<JCache> removedFuture = allCaches.remove(jCache.getName());
JCache removedCache;
try {
removedCache = removedFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (ExecutionException e) {
// the cache failed to configure, but it should be shut down anyway, so just ignore it
// The thread that configured the cache will get the exception of course, so it is not lost
return;
}
if (removedCache == jCache) {
enableStatistics(false, jCache);
enableManagement(false, jCache);
cacheManager.removeCache(jCache.getName());
Expand Down

0 comments on commit 29bc560

Please sign in to comment.