Skip to content

Commit

Permalink
remove Log to simplify code and add more docs
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielYWoo committed Oct 14, 2020
1 parent 8174d16 commit 9f8a23c
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 98 deletions.
5 changes: 4 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
<version>1.2.19</version>
<version>1.2.15</version>
<!-- This is optional, if you want the best performance you need to manually add disruptor into your dependency -->
<optional>true</optional>
</dependency>
Expand Down Expand Up @@ -113,6 +113,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<configuration>
<source>8</source>
</configuration>
<executions>
<execution>
<id>attach-javadocs</id>
Expand Down
65 changes: 0 additions & 65 deletions src/main/java/cn/danielw/fop/Log.java

This file was deleted.

33 changes: 25 additions & 8 deletions src/main/java/cn/danielw/fop/ObjectPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* @author Daniel
*/
public class ObjectPool<T> {

private static final Logger logger = Logger.getLogger(ObjectPool.class.getCanonicalName());

private final PoolConfig config;
private final ObjectFactory<T> factory;
private final ObjectPoolPartition<T>[] partitions;
Expand All @@ -32,13 +36,25 @@ protected BlockingQueue<Poolable<T>> createBlockingQueue(PoolConfig poolConfig)
return new ArrayBlockingQueue<>(poolConfig.getMaxSize());
}

/**
* borrow an object from the pool. the call will be blocked for at most <code>PoolConfig.maxWaitMilliseconds</code>
* before throwing an Exception
* @return the object
*/
public Poolable<T> borrowObject() {
return borrowObject(true);
}

public Poolable<T> borrowObject(boolean blocking) {
/**
* borrow an object from the pool
* @param noTimeout if true, the call will be blocked until one is available;
* if false, the call will be blocked for at most <code>PoolConfig.maxWaitMilliseconds</code>
* before throwing an Exception
* @return the object
*/
public Poolable<T> borrowObject(boolean noTimeout) {
for (int i = 0; i < 3; i++) { // try at most three times
Poolable<T> result = getObject(blocking);
Poolable<T> result = getObject(noTimeout);
if (factory.validate(result.getObject())) {
return result;
} else {
Expand All @@ -48,7 +64,7 @@ public Poolable<T> borrowObject(boolean blocking) {
throw new RuntimeException("Cannot find a valid object");
}

private Poolable<T> getObject(boolean blocking) {
private Poolable<T> getObject(boolean noTimeout) {
if (shuttingDown) {
throw new IllegalStateException("Your pool is shutting down");
}
Expand All @@ -59,7 +75,7 @@ private Poolable<T> getObject(boolean blocking) {
// increase objects and return one, it will return null if reach max size
subPool.increaseObjects(1);
try {
if (blocking) {
if (noTimeout) {
freeObject = subPool.getObjectQueue().take();
} else {
freeObject = subPool.getObjectQueue().poll(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
Expand All @@ -79,9 +95,9 @@ public void returnObject(Poolable<T> obj) {
ObjectPoolPartition<T> subPool = this.partitions[obj.getPartition()];
try {
subPool.getObjectQueue().put(obj);
if (Log.isDebug())
Log.debug("return object: queue size:", subPool.getObjectQueue().size(),
", partition id:", obj.getPartition());
if (logger.isLoggable(Level.FINE))
logger.fine("return object: queue size:" + subPool.getObjectQueue().size() +
", partition id:" + obj.getPartition());
} catch (InterruptedException e) {
throw new RuntimeException(e); // impossible for now, unless there is a bug, e,g. borrow once but return twice.
}
Expand Down Expand Up @@ -115,9 +131,10 @@ public void run() {
int partition = 0;
while (!ObjectPool.this.shuttingDown) {
try {
//noinspection BusyWait
Thread.sleep(config.getScavengeIntervalMilliseconds());
partition = ++partition % config.getPartitionSize();
Log.debug("scavenge sub pool ", partition);
logger.fine("scavenge sub pool " + partition);
partitions[partition].scavenge();
} catch (InterruptedException ignored) {
}
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/cn/danielw/fop/ObjectPoolPartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* @author Daniel
*/
public class ObjectPoolPartition<T> {

private static final Logger logger = Logger.getLogger(ObjectPoolPartition.class.getName());

private final ObjectPool<T> pool;
private final PoolConfig config;
private final int partition;
Expand Down Expand Up @@ -41,8 +45,8 @@ public synchronized int increaseObjects(int delta) {
objectQueue.put(new Poolable<>(objectFactory.create(), pool, partition));
}
totalCount += delta;
if (Log.isDebug())
Log.debug("increase objects: count=", totalCount, ", queue size=", objectQueue.size());
if (logger.isLoggable(Level.FINE))
logger.fine("increase objects: count=" + totalCount + ", queue size=" + objectQueue.size());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -69,8 +73,8 @@ public synchronized void scavenge() throws InterruptedException {
while (delta-- > 0 && (obj = objectQueue.poll()) != null) {
// performance trade off: delta always decrease even if the queue is empty,
// so it could take several intervals to shrink the pool to the configured min value.
if (Log.isDebug())
Log.debug("obj=", obj, ", now-last=", now - obj.getLastAccessTs(), ", max idle=",
if (logger.isLoggable(Level.FINE))
logger.fine("obj=" + obj + ", now-last=" + (now - obj.getLastAccessTs()) + ", max idle=" +
config.getMaxIdleMilliseconds());
if (now - obj.getLastAccessTs() > config.getMaxIdleMilliseconds() &&
ThreadLocalRandom.current().nextDouble(1) < config.getScavengeRatio()) {
Expand All @@ -80,7 +84,7 @@ public synchronized void scavenge() throws InterruptedException {
objectQueue.put(obj); //put it back
}
}
if (removed > 0) Log.debug(removed, " objects were scavenged.");
if (removed > 0) logger.fine(removed + " objects were scavenged.");
}

public synchronized int shutdown() {
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/cn/danielw/fop/PoolConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ public int getMaxWaitMilliseconds() {
return maxWaitMilliseconds;
}

/**
* this is only used for blocking call to <code>borrowObject(true)</code>.
* @param maxWaitMilliseconds how long to block
* @return the pool config
*/
public PoolConfig setMaxWaitMilliseconds(int maxWaitMilliseconds) {
if (maxWaitMilliseconds <= 0) {
throw new IllegalArgumentException("Cannot set max wait time to a negative number " + maxWaitMilliseconds);
}
this.maxWaitMilliseconds = maxWaitMilliseconds;
return this;
}
Expand Down Expand Up @@ -65,8 +73,13 @@ public int getScavengeIntervalMilliseconds() {
/**
* @param scavengeIntervalMilliseconds set it to zero if you don't want to automatically shrink your pool.
* This is useful for fixed-size pool, or pools don't increase too much.
* @return the pool config
*/
public PoolConfig setScavengeIntervalMilliseconds(int scavengeIntervalMilliseconds) {
if (scavengeIntervalMilliseconds < 5000) {
throw new IllegalArgumentException("Cannot set interval too short (" + scavengeIntervalMilliseconds +
"), must be at least 5 seconds");
}
this.scavengeIntervalMilliseconds = scavengeIntervalMilliseconds;
return this;
}
Expand All @@ -78,6 +91,7 @@ public double getScavengeRatio() {
/**
* Each time we shrink a pool, we only scavenge some of the objects to avoid an empty pool
* @param scavengeRatio must be a double between (0, 1]
* @return the pool config
*/
public PoolConfig setScavengeRatio(double scavengeRatio) {
if (scavengeRatio <= 0 || scavengeRatio > 1) {
Expand Down
32 changes: 13 additions & 19 deletions src/test/java/TestObjectPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
public class TestObjectPool {


public ObjectPool init(double scavengeRatio) {
public ObjectPool<StringBuilder> init(double scavengeRatio) {
Logger.getLogger("").getHandlers()[0].setLevel(Level.ALL);
Logger.getLogger("").setLevel(Level.ALL);
PoolConfig config = new PoolConfig();
config.setPartitionSize(2).setMinSize(2).setMaxSize(20).setMaxIdleMilliseconds(5000).
setScavengeIntervalMilliseconds(5000).setScavengeRatio(scavengeRatio);
setMaxWaitMilliseconds(100).setScavengeIntervalMilliseconds(5000).setScavengeRatio(scavengeRatio);

ObjectFactory<StringBuilder> factory = new ObjectFactory<StringBuilder>() {
@Override
Expand All @@ -36,12 +36,12 @@ public boolean validate(StringBuilder o) {
return true;
}
};
return new ObjectPool(config, factory);
return new ObjectPool<>(config, factory);
}

@Test
public void testSimple() throws InterruptedException {
ObjectPool pool = init(1.0);
public void testSimple() {
ObjectPool<StringBuilder> pool = init(1.0);
for (int i = 0; i < 100; i++) {
try (Poolable<StringBuilder> obj = pool.borrowObject()) {
obj.getObject().append("x");
Expand All @@ -53,7 +53,7 @@ public void testSimple() throws InterruptedException {

@Test
public void testShrink() throws InterruptedException {
final ObjectPool pool = init(1.0);
final ObjectPool<StringBuilder> pool = init(1.0);
List<Poolable<StringBuilder>> borrowed = new ArrayList<>();
for (int i = 0; i < 10; i++) {
System.out.println("test borrow");
Expand All @@ -75,19 +75,13 @@ public void testShrink() throws InterruptedException {
System.out.println("scavenged, pool size=" + pool.getSize());

// test return after shutdown
Thread testThread = new Thread() {
@Override
public void run() {
Poolable<StringBuilder> obj = pool.borrowObject();
try {
System.out.println("pool size:" + pool.getSize());
Thread.sleep(10000);
} catch (InterruptedException e) {
}
pool.returnObject(obj);
System.out.println("pool size:" + pool.getSize());
}
};
Thread testThread = new Thread(() -> {
Poolable<StringBuilder> obj = pool.borrowObject();
System.out.println("pool size:" + pool.getSize());
try { Thread.sleep(10000); } catch (InterruptedException ignored) { }
pool.returnObject(obj);
System.out.println("pool size:" + pool.getSize());
});
testThread.start();
testThread.join();
int removed = pool.shutdown(); // this will block 9 seconds
Expand Down

0 comments on commit 9f8a23c

Please sign in to comment.