Skip to content

Commit

Permalink
Apply refactorings and bug fixes
Browse files Browse the repository at this point in the history
Mostly include more work on the correctness
of `await`.

Rel #14 #15
  • Loading branch information
nobeh committed Jul 12, 2015
1 parent 092058a commit 23fdcfc
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 88 deletions.
1 change: 1 addition & 0 deletions abs-api/src/main/java/abs/api/Actor.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ default <V> Response<V> await(Object to, Object message) {
final Reference toRef = reference(to);
final Envelope envelope = new AwaitEnvelope(from, toRef, message);
context().execute(() -> context().router().route(envelope));
((ContextResponse<?>) envelope.response()).await();
return envelope.response();
}

Expand Down
4 changes: 2 additions & 2 deletions abs-api/src/main/java/abs/api/AwaitEnvelope.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public AwaitEnvelope(Reference sender, Reference receiver, Object message) {
}

@Override
protected Fut createResponse() {
return new Fut(true);
protected ContextResponse createResponse() {
return new ContextResponse(true);
}

}
5 changes: 2 additions & 3 deletions abs-api/src/main/java/abs/api/Context.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package abs.api;

import java.util.concurrent.Executor;
import java.util.concurrent.Future;

/**
* A context provides a set of interfaces that allows regulating
Expand Down Expand Up @@ -135,7 +134,7 @@ default <T> T object(Reference reference) {
* message
* @return the result of the message as a future
*/
default <V> Future<V> send(Object to, Object message) {
default <V> Response<V> send(Object to, Object message) {
return Actor.NOBODY.send(to, message);
}

Expand All @@ -156,7 +155,7 @@ default <V> Future<V> send(Object to, Object message) {
* @return the future value to capture the result of the
* message
*/
default <V> Future<V> await(Object to, Object message) {
default <V> Response<V> await(Object to, Object message) {
return Actor.NOBODY.await(to, message);
}

Expand Down
9 changes: 6 additions & 3 deletions abs-api/src/main/java/abs/api/ContextInbox.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package abs.api;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -47,7 +48,7 @@ public void run() {
return;
}
while (running.get()) {
execute(inboxes, inboxes.size() > 1000);
execute(inboxes, inboxes.size() > 10000);
}
}

Expand All @@ -73,9 +74,11 @@ public ContextInbox(ExecutorService executor) {

protected void execute(Collection<ObjectInbox> inboxes, final boolean parallel) {
if (parallel) {
inboxes.parallelStream().forEach(oi -> executor.submit(oi));
inboxes.parallelStream().forEach((Runnable oi) -> executor.submit(oi));
} else {
inboxes.stream().forEach(oi -> executor.submit(oi));
inboxes.stream().filter(oi -> !oi.isRunning() && !oi.isBusy()).forEach(oi -> {
CompletableFuture.runAsync(oi, executor);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package abs.api;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* An internal implementation of {@link Response} extending over
Expand All @@ -9,14 +10,14 @@
* @author Behrooz Nobakht
* @since 1.0
*/
class Fut<V> extends CompletableFuture<V> implements Response<V> {
class ContextResponse<V> extends CompletableFuture<V>implements Response<V> {

private final boolean await;

/**
* Ctor.
*/
public Fut() {
public ContextResponse() {
this(false);
}

Expand All @@ -26,48 +27,52 @@ public Fut() {
* @param await if this is a response for an
* {@link AwaitEnvelope}.
*/
public Fut(boolean await) {
public ContextResponse(boolean await) {
this.await = await;
}

@Override
public boolean isDone() {
await();
return super.isDone();
}

@Override
public boolean isCancelled() {
await();
return super.isCancelled();
}

@Override
public boolean isCompletedExceptionally() {
await();
return super.isCompletedExceptionally();
}

@Override
public boolean isCompleted() {
await();
if (!isDone()) {
return false;
}
return isDone() && !isCompletedExceptionally();
}

@Override
public V get() throws InterruptedException, ExecutionException {
try {
get();
return true;
} catch (Throwable e) {
return false;
await();
V v = super.get();
complete(v);
return v;
} catch (InterruptedException e) {
completeExceptionally(e);
throw e;
} catch (ExecutionException e) {
completeExceptionally(e);
throw e;
}
}

@Override
public V getValue() {
try {
return get();
V v = get();
return v;
} catch (Throwable e) {
completeExceptionally(e);
return null;
}
}
Expand All @@ -78,23 +83,23 @@ public <E extends Throwable> E getException() {
return null;
}
try {
get();
await();
throw new IllegalStateException("Should have completed exceptionally: " + this);
} catch (Throwable e) {
return e.getCause() == null ? (E) e : (E) e.getCause();
}
}

/**
* Await on this response if necessary
* until it's done or it fails.
* Await on this response if necessary until it's done or it
* fails.
*/
protected synchronized void await() {
protected synchronized final void await() {
if (!await) {
return;
}
try {
V object = get();
V object = super.get();
complete(object);
} catch (Throwable e) {
completeExceptionally(e);
Expand Down
7 changes: 6 additions & 1 deletion abs-api/src/main/java/abs/api/EnveloperRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* {@link Runnable} from {@link Envelope#message()}. Execution
* of the message is contained and no exception is propagated.
*/
class EnveloperRunner implements Runnable {
class EnveloperRunner implements Runnable, Comparable<EnveloperRunner> {

private final Envelope envelope;
private final Context context;
Expand Down Expand Up @@ -46,6 +46,11 @@ public final void run() {
executeMessage(msg, response);
}

@Override
public int compareTo(EnveloperRunner o) {
return Long.compare(envelope.sequence(), o.envelope.sequence());
}

protected void executeMessage(final Object msg, final Response<Object> response) {
if (msg instanceof Runnable) {
executeRunnableEnvelope(msg, response);
Expand Down
8 changes: 5 additions & 3 deletions abs-api/src/main/java/abs/api/LocalContext.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package abs.api;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;

Expand All @@ -28,6 +28,7 @@ public class LocalContext implements Context {
private Notary notary;
private ExecutorService executor;
private ReferenceFactory referenceFactory;
private final ExecutorService routerExecutor = Executors.newSingleThreadExecutor();

/**
* <p>
Expand Down Expand Up @@ -132,8 +133,8 @@ public void execute(Runnable command) {
// Every message to an object should be queued in the
// order that it is received. Here, #get() ensures such
// order of queueing.
executor.submit(command).get();
} catch (InterruptedException | ExecutionException e) {
routerExecutor.submit(command);
} catch (Exception e) {
// Ignore: What can we do??!
}
}
Expand All @@ -142,6 +143,7 @@ public void execute(Runnable command) {
@Override
public void stop() throws Exception {
try {
routerExecutor.shutdownNow();
List<Runnable> tasks = executor.shutdownNow();
for (Runnable task : tasks) {
if (task instanceof EnveloperRunner) {
Expand Down
29 changes: 23 additions & 6 deletions abs-api/src/main/java/abs/api/ObjectInbox.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package abs.api;

import java.time.Instant;
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
Expand All @@ -11,6 +12,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* A dedicated {@link Inbox} for a receiver of an
Expand Down Expand Up @@ -117,12 +119,17 @@ public <V> Future<V> open(Envelope envelope, Object target) {
}

public void run() {
if (!sweeping.compareAndSet(false, true)) {
if (isBusy() || !sweeping.compareAndSet(false, true)) {
return;
}
Envelope envelope = get();
if (envelope != null) {
open(envelope, receiver);
for (Envelope envelope = get(); envelope != null; envelope = get()) {
super.onOpen(envelope, this, receiver);
EnveloperRunner runner = createEnvelopeRunner(envelope);
runner.run();
// System.out.println("RUN --- " + Thread.currentThread().getName() + " " + receiver + " "
// + envelope.sequence() + " "
// + unprocessed.stream().map(e -> Long.valueOf(e.sequence())).collect(Collectors.toList())
// + " => " + envelope.response().getValue() + " " + envelope.response());
}
sweeping.getAndSet(false);
}
Expand Down Expand Up @@ -175,6 +182,10 @@ protected boolean isAwaiting() {
return !awaiting.isEmpty();
}

protected boolean isRunning() {
return sweeping.get();
}

protected Envelope lastAwaitingEnvelope() {
return this.awaiting.peek();
}
Expand Down Expand Up @@ -210,10 +221,16 @@ protected ObjectInbox senderInbox(Envelope envelope, Context context) {
}

protected Envelope nextEnvelope(BlockingQueue<Envelope> q) {
if (q.isEmpty()) {
if (isBusy() || q.isEmpty()) {
return null;
}
return q.peek();
Envelope env = q.peek();
return env;
}

private void log(Object o) {
System.err.println(String.format("%s %s %s %s", Instant.now().toString(),
Thread.currentThread().getName(), receiver, o.toString()));
}

}
2 changes: 1 addition & 1 deletion abs-api/src/main/java/abs/api/SimpleEnvelope.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ public String toString() {
}

protected <V> Response<V> createResponse() {
return new Fut<V>();
return new ContextResponse<V>();
}
}
Loading

0 comments on commit 23fdcfc

Please sign in to comment.