diff --git a/pom.xml b/pom.xml
index 4b55e76..06baf5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
com.conversantmedia
disruptor
- 1.2.19
+ 1.2.15
true
@@ -113,6 +113,9 @@
org.apache.maven.plugins
maven-javadoc-plugin
2.9.1
+
+
+
attach-javadocs
diff --git a/src/main/java/cn/danielw/fop/Log.java b/src/main/java/cn/danielw/fop/Log.java
deleted file mode 100755
index 8f7a575..0000000
--- a/src/main/java/cn/danielw/fop/Log.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package cn.danielw.fop;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * @author Daniel
- */
-public class Log {
-
- private static Logger logger = Logger.getLogger("FOP");
-
- private static String getString(Throwable ex, Object ... objects) {
- StringBuilder sb = new StringBuilder();
- for (Object object : objects)
- sb.append(object);
- sb.append(", ");
- StringWriter writer = new StringWriter();
- ex.printStackTrace(new PrintWriter(writer));
- sb.append(writer.toString());
- return sb.toString();
- }
-
- public static boolean isDebug() {
- return logger.isLoggable(Level.FINE);
- }
-
- private static String getString(Object ... objects) {
- if (objects.length > 1) {
- StringBuilder sb = new StringBuilder();
- for (Object object : objects)
- sb.append(object);
- return sb.toString();
- } else {
- return objects[0].toString();
- }
- }
-
- public static void debug(Object ... objects) {
- if (logger.isLoggable(Level.FINE)) {
- logger.fine(getString(objects));
- }
- }
-
- public static void info(Object ... objects) {
- if (logger.isLoggable(Level.INFO)) {
- logger.info(getString(objects));
- }
- }
-
- public static void error(Object... objects) {
- if (logger.isLoggable(Level.SEVERE)) {
- logger.severe(getString(objects));
- }
- }
-
- public static void error(Exception ex, Object... objects) {
- if (logger.isLoggable(Level.SEVERE)) {
- logger.severe(getString(ex, objects));
- }
- }
-
-}
diff --git a/src/main/java/cn/danielw/fop/ObjectPool.java b/src/main/java/cn/danielw/fop/ObjectPool.java
index 1015090..9a04307 100755
--- a/src/main/java/cn/danielw/fop/ObjectPool.java
+++ b/src/main/java/cn/danielw/fop/ObjectPool.java
@@ -3,12 +3,16 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* @author Daniel
*/
public class ObjectPool {
+ private static final Logger logger = Logger.getLogger(ObjectPool.class.getCanonicalName());
+
private final PoolConfig config;
private final ObjectFactory factory;
private final ObjectPoolPartition[] partitions;
@@ -32,13 +36,25 @@ protected BlockingQueue> createBlockingQueue(PoolConfig poolConfig)
return new ArrayBlockingQueue<>(poolConfig.getMaxSize());
}
+ /**
+ * borrow an object from the pool. the call will be blocked for at most PoolConfig.maxWaitMilliseconds
+ * before throwing an Exception
+ * @return the object
+ */
public Poolable borrowObject() {
return borrowObject(true);
}
- public Poolable borrowObject(boolean blocking) {
+ /**
+ * borrow an object from the pool
+ * @param noTimeout if true, the call will be blocked until one is available;
+ * if false, the call will be blocked for at most PoolConfig.maxWaitMilliseconds
+ * before throwing an Exception
+ * @return the object
+ */
+ public Poolable borrowObject(boolean noTimeout) {
for (int i = 0; i < 3; i++) { // try at most three times
- Poolable result = getObject(blocking);
+ Poolable result = getObject(noTimeout);
if (factory.validate(result.getObject())) {
return result;
} else {
@@ -48,7 +64,7 @@ public Poolable borrowObject(boolean blocking) {
throw new RuntimeException("Cannot find a valid object");
}
- private Poolable getObject(boolean blocking) {
+ private Poolable getObject(boolean noTimeout) {
if (shuttingDown) {
throw new IllegalStateException("Your pool is shutting down");
}
@@ -59,7 +75,7 @@ private Poolable getObject(boolean blocking) {
// increase objects and return one, it will return null if reach max size
subPool.increaseObjects(1);
try {
- if (blocking) {
+ if (noTimeout) {
freeObject = subPool.getObjectQueue().take();
} else {
freeObject = subPool.getObjectQueue().poll(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
@@ -79,9 +95,9 @@ public void returnObject(Poolable obj) {
ObjectPoolPartition subPool = this.partitions[obj.getPartition()];
try {
subPool.getObjectQueue().put(obj);
- if (Log.isDebug())
- Log.debug("return object: queue size:", subPool.getObjectQueue().size(),
- ", partition id:", obj.getPartition());
+ if (logger.isLoggable(Level.FINE))
+ logger.fine("return object: queue size:" + subPool.getObjectQueue().size() +
+ ", partition id:" + obj.getPartition());
} catch (InterruptedException e) {
throw new RuntimeException(e); // impossible for now, unless there is a bug, e,g. borrow once but return twice.
}
@@ -115,9 +131,10 @@ public void run() {
int partition = 0;
while (!ObjectPool.this.shuttingDown) {
try {
+ //noinspection BusyWait
Thread.sleep(config.getScavengeIntervalMilliseconds());
partition = ++partition % config.getPartitionSize();
- Log.debug("scavenge sub pool ", partition);
+ logger.fine("scavenge sub pool " + partition);
partitions[partition].scavenge();
} catch (InterruptedException ignored) {
}
diff --git a/src/main/java/cn/danielw/fop/ObjectPoolPartition.java b/src/main/java/cn/danielw/fop/ObjectPoolPartition.java
index 0b5dbba..f034c3d 100755
--- a/src/main/java/cn/danielw/fop/ObjectPoolPartition.java
+++ b/src/main/java/cn/danielw/fop/ObjectPoolPartition.java
@@ -2,12 +2,16 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* @author Daniel
*/
public class ObjectPoolPartition {
+ private static final Logger logger = Logger.getLogger(ObjectPoolPartition.class.getName());
+
private final ObjectPool pool;
private final PoolConfig config;
private final int partition;
@@ -41,8 +45,8 @@ public synchronized int increaseObjects(int delta) {
objectQueue.put(new Poolable<>(objectFactory.create(), pool, partition));
}
totalCount += delta;
- if (Log.isDebug())
- Log.debug("increase objects: count=", totalCount, ", queue size=", objectQueue.size());
+ if (logger.isLoggable(Level.FINE))
+ logger.fine("increase objects: count=" + totalCount + ", queue size=" + objectQueue.size());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -69,8 +73,8 @@ public synchronized void scavenge() throws InterruptedException {
while (delta-- > 0 && (obj = objectQueue.poll()) != null) {
// performance trade off: delta always decrease even if the queue is empty,
// so it could take several intervals to shrink the pool to the configured min value.
- if (Log.isDebug())
- Log.debug("obj=", obj, ", now-last=", now - obj.getLastAccessTs(), ", max idle=",
+ if (logger.isLoggable(Level.FINE))
+ logger.fine("obj=" + obj + ", now-last=" + (now - obj.getLastAccessTs()) + ", max idle=" +
config.getMaxIdleMilliseconds());
if (now - obj.getLastAccessTs() > config.getMaxIdleMilliseconds() &&
ThreadLocalRandom.current().nextDouble(1) < config.getScavengeRatio()) {
@@ -80,7 +84,7 @@ public synchronized void scavenge() throws InterruptedException {
objectQueue.put(obj); //put it back
}
}
- if (removed > 0) Log.debug(removed, " objects were scavenged.");
+ if (removed > 0) logger.fine(removed + " objects were scavenged.");
}
public synchronized int shutdown() {
diff --git a/src/main/java/cn/danielw/fop/PoolConfig.java b/src/main/java/cn/danielw/fop/PoolConfig.java
index 8cb28f0..fc303cc 100755
--- a/src/main/java/cn/danielw/fop/PoolConfig.java
+++ b/src/main/java/cn/danielw/fop/PoolConfig.java
@@ -17,7 +17,15 @@ public int getMaxWaitMilliseconds() {
return maxWaitMilliseconds;
}
+ /**
+ * this is only used for blocking call to borrowObject(true)
.
+ * @param maxWaitMilliseconds how long to block
+ * @return the pool config
+ */
public PoolConfig setMaxWaitMilliseconds(int maxWaitMilliseconds) {
+ if (maxWaitMilliseconds <= 0) {
+ throw new IllegalArgumentException("Cannot set max wait time to a negative number " + maxWaitMilliseconds);
+ }
this.maxWaitMilliseconds = maxWaitMilliseconds;
return this;
}
@@ -65,8 +73,13 @@ public int getScavengeIntervalMilliseconds() {
/**
* @param scavengeIntervalMilliseconds set it to zero if you don't want to automatically shrink your pool.
* This is useful for fixed-size pool, or pools don't increase too much.
+ * @return the pool config
*/
public PoolConfig setScavengeIntervalMilliseconds(int scavengeIntervalMilliseconds) {
+ if (scavengeIntervalMilliseconds < 5000) {
+ throw new IllegalArgumentException("Cannot set interval too short (" + scavengeIntervalMilliseconds +
+ "), must be at least 5 seconds");
+ }
this.scavengeIntervalMilliseconds = scavengeIntervalMilliseconds;
return this;
}
@@ -78,6 +91,7 @@ public double getScavengeRatio() {
/**
* Each time we shrink a pool, we only scavenge some of the objects to avoid an empty pool
* @param scavengeRatio must be a double between (0, 1]
+ * @return the pool config
*/
public PoolConfig setScavengeRatio(double scavengeRatio) {
if (scavengeRatio <= 0 || scavengeRatio > 1) {
diff --git a/src/test/java/TestObjectPool.java b/src/test/java/TestObjectPool.java
index 122ffeb..c40a47d 100755
--- a/src/test/java/TestObjectPool.java
+++ b/src/test/java/TestObjectPool.java
@@ -14,12 +14,12 @@
public class TestObjectPool {
- public ObjectPool init(double scavengeRatio) {
+ public ObjectPool init(double scavengeRatio) {
Logger.getLogger("").getHandlers()[0].setLevel(Level.ALL);
Logger.getLogger("").setLevel(Level.ALL);
PoolConfig config = new PoolConfig();
config.setPartitionSize(2).setMinSize(2).setMaxSize(20).setMaxIdleMilliseconds(5000).
- setScavengeIntervalMilliseconds(5000).setScavengeRatio(scavengeRatio);
+ setMaxWaitMilliseconds(100).setScavengeIntervalMilliseconds(5000).setScavengeRatio(scavengeRatio);
ObjectFactory factory = new ObjectFactory() {
@Override
@@ -36,12 +36,12 @@ public boolean validate(StringBuilder o) {
return true;
}
};
- return new ObjectPool(config, factory);
+ return new ObjectPool<>(config, factory);
}
@Test
- public void testSimple() throws InterruptedException {
- ObjectPool pool = init(1.0);
+ public void testSimple() {
+ ObjectPool pool = init(1.0);
for (int i = 0; i < 100; i++) {
try (Poolable obj = pool.borrowObject()) {
obj.getObject().append("x");
@@ -53,7 +53,7 @@ public void testSimple() throws InterruptedException {
@Test
public void testShrink() throws InterruptedException {
- final ObjectPool pool = init(1.0);
+ final ObjectPool pool = init(1.0);
List> borrowed = new ArrayList<>();
for (int i = 0; i < 10; i++) {
System.out.println("test borrow");
@@ -75,19 +75,13 @@ public void testShrink() throws InterruptedException {
System.out.println("scavenged, pool size=" + pool.getSize());
// test return after shutdown
- Thread testThread = new Thread() {
- @Override
- public void run() {
- Poolable obj = pool.borrowObject();
- try {
- System.out.println("pool size:" + pool.getSize());
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- }
- pool.returnObject(obj);
- System.out.println("pool size:" + pool.getSize());
- }
- };
+ Thread testThread = new Thread(() -> {
+ Poolable obj = pool.borrowObject();
+ System.out.println("pool size:" + pool.getSize());
+ try { Thread.sleep(10000); } catch (InterruptedException ignored) { }
+ pool.returnObject(obj);
+ System.out.println("pool size:" + pool.getSize());
+ });
testThread.start();
testThread.join();
int removed = pool.shutdown(); // this will block 9 seconds