Skip to content

Commit

Permalink
feat: global countdown for election
Browse files Browse the repository at this point in the history
  • Loading branch information
elblasco committed Aug 6, 2024
1 parent 5a04de8 commit 332a103
Show file tree
Hide file tree
Showing 5 changed files with 439 additions and 351 deletions.
163 changes: 89 additions & 74 deletions src/main/java/it/unitn/disi/ds1/qtop/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* The Client actor is responsible for sending requests to the Nodes in the network. The requests are randomised
* between read and write operations. The Client actor also has the ability to make crash a random Node in the network.
*/
public class Client extends AbstractActor{
public class Client extends AbstractActor {
private static final Random RAND = new Random();
private static final Logger LOGGER = Logger.getInstance();
private static final int CRASH_TIMEOUT = 1000;
Expand All @@ -31,81 +31,89 @@ public class Client extends AbstractActor{
private int timeOutCounter;
private int requestNumber;

public Client(int clientId, List<ActorRef> group, int numberOfNodes) {
super();
this.clientId = clientId;
public Client(int clientId, List<ActorRef> group, int numberOfNodes) {
super();
this.clientId = clientId;
this.group = group;
this.numberOfNodes = numberOfNodes;
this.timeOutCounter = CRASH_TIMEOUT;
this.requestNumber = 0;
this.timeOutManager = new TimeOutManager(
0,
0,
0,
0,
this.timeOutCounter = CRASH_TIMEOUT;
this.requestNumber = 0;
this.timeOutManager = new TimeOutManager(
0,
0,
0,
0,
0,
CLIENT_REQUEST_TIMEOUT,
COUNTDOWN_REFRESH
);
COUNTDOWN_REFRESH
);
}

static public Props props(int clientId, List<ActorRef> group, int numberOfNodes) {
return Props.create(Client.class, () -> new Client(clientId, group, numberOfNodes));
}
static public Props props(int clientId, List<ActorRef> group, int numberOfNodes) {
return Props.create(
Client.class,
() -> new Client(
clientId,
group,
numberOfNodes
)
);
}

/**
* Mask for to the Client actor.
*
* @return the Receive object
*/
@Override
public Receive createReceive() {
return receiveBuilder().match(
StartMessage.class,
this::onStartMessage
).match(
@Override
public Receive createReceive() {
return receiveBuilder().match(
StartMessage.class,
this::onStartMessage
).match(
Utils.MakeRequest.class,
this::onMakeRequest
).match(
this::onMakeRequest
).match(
Utils.ReadValue.class,
this::onReadAck
).match(
this::onReadAck
).match(
Utils.WriteValue.class,
this::onWriteAck
).match(
Utils.CrashRequest.class,
this::onCrashRequest
).match(
Utils.TimeOut.class,
this::onTimeOut
).match(
Utils.CrashACK.class,
this::onCrashACK
).match(
Utils.CountDown.class,
this::onCountDown
).matchAny(tmp -> {
}).build();
}
this::onWriteAck
).match(
Utils.CrashRequest.class,
this::onCrashRequest
).match(
Utils.TimeOut.class,
this::onTimeOut
).match(
Utils.CrashACK.class,
this::onCrashACK
).match(
Utils.CountDown.class,
this::onCountDown
).matchAny(tmp -> {
}).build();
}

/**
* Initial set up for the Client. It sets up a scheduled message to send requests to the `Node`s.
*
* @param msg the init message
*/
public void onStartMessage(StartMessage msg) {
LOGGER.log(
LogLevel.INFO,
"[CLIENT-" + (this.clientId - this.numberOfNodes) + "] starting..."
);
this.getContext().getSystem().scheduler().scheduleWithFixedDelay(
Duration.ZERO,
Duration.ofMillis(1000),
this.getSelf(),
new Utils.MakeRequest(),
getContext().getSystem().dispatcher(),
this.getSelf()
);
}
* @param msg the init message
*/
public void onStartMessage(StartMessage msg) {
LOGGER.log(
LogLevel.INFO,
"[CLIENT-" + (this.clientId - this.numberOfNodes) + "] starting..."
);
this.getContext().getSystem().scheduler().scheduleWithFixedDelay(
Duration.ZERO,
Duration.ofMillis(1000),
this.getSelf(),
new Utils.MakeRequest(),
getContext().getSystem().dispatcher(),
this.getSelf()
);
}

/**
* General purpose handler for the Client's timeout messages.
Expand All @@ -126,7 +134,8 @@ private void onTimeOut(@NotNull Utils.TimeOut msg) {
new Utils.CrashRequest(this.cachedCrashType),
getSelf()
);
} else if (msg.reason() == Utils.TimeOutReason.CLIENT_REQUEST)
}
else if (msg.reason() == Utils.TimeOutReason.CLIENT_REQUEST)
{
LOGGER.log(
LogLevel.INFO,
Expand All @@ -144,7 +153,6 @@ private void onTimeOut(@NotNull Utils.TimeOut msg) {
private void onMakeRequest(Utils.MakeRequest msg) {
boolean type = RAND.nextBoolean(); // true for write, false for read
int index = RAND.nextInt(this.numberOfNodes);

this.timeOutManager.startCountDown(
Utils.TimeOutReason.CLIENT_REQUEST,
this.getContext().getSystem().scheduler().scheduleWithFixedDelay(
Expand All @@ -153,13 +161,15 @@ private void onMakeRequest(Utils.MakeRequest msg) {
this.getSelf(),
new Utils.CountDown(
Utils.TimeOutReason.CLIENT_REQUEST,
new EpochPair(0, requestNumber)
new EpochPair(
0,
requestNumber
)
),
getContext().getSystem().dispatcher(),
getSelf()
),
this.requestNumber

);
if (type)
{
Expand All @@ -175,19 +185,20 @@ private void onMakeRequest(Utils.MakeRequest msg) {
);
LOGGER.log(
LogLevel.INFO,
"[CLIENT-" + (this.clientId - this.numberOfNodes) + "] write requested to [NODE-" + index + "], " +
"new value proposed: "
+ proposedValue +
", local operation id: " + requestNumber
"[CLIENT-" + (this.clientId - this.numberOfNodes) + "] write requested to [NODE-" + index + "], " + "new value proposed: " + proposedValue + ", local operation id: " + requestNumber
);
}
else
{
LOGGER.log(
LogLevel.INFO,
"[CLIENT-"+ (this.clientId - this.numberOfNodes) +"] requesting read to " +
"[NODE-" + index + "]" + ", local operation id: " + requestNumber);
group.get(index).tell(new ReadRequest(requestNumber), this.getSelf());
"[CLIENT-" + (this.clientId - this.numberOfNodes) + "] requesting read to " + "[NODE-" + index +
"]" + ", local operation id: " + requestNumber
);
group.get(index).tell(
new ReadRequest(requestNumber),
this.getSelf()
);
}
this.requestNumber++;
}
Expand All @@ -199,7 +210,6 @@ private void onMakeRequest(Utils.MakeRequest msg) {
*/
private void onReadAck(@NotNull Utils.ReadValue msg) {
int value = msg.value();

this.timeOutManager.resetCountDown(
Utils.TimeOutReason.CLIENT_REQUEST,
msg.nRequest()
Expand Down Expand Up @@ -239,15 +249,20 @@ private void onCountDown(@NotNull Utils.CountDown msg) {
if (this.timeOutCounter <= 0)
{
getSelf().tell(
new Utils.TimeOut(Utils.TimeOutReason.CRASH_RESPONSE,null),
new Utils.TimeOut(
Utils.TimeOutReason.CRASH_RESPONSE,
null
),
getSelf()
);
}
else
{
this.timeOutCounter -= CRASH_TIMEOUT / 100;
}
} else if (msg.reason() == Utils.TimeOutReason.CLIENT_REQUEST) {
}
else if (msg.reason() == Utils.TimeOutReason.CLIENT_REQUEST)
{
this.timeOutManager.clientHandleCountDown(
msg.reason(),
msg.epoch().i(),
Expand Down Expand Up @@ -299,8 +314,8 @@ private void onCrashACK(Utils.CrashACK msg) {
/**
* Send a message to a destination actor with a random delay, within 0 and 29 milliseconds.
*
* @param dest the destination actor
* @param msg the message to send
* @param dest the destination actor
* @param msg the message to send
* @param sender the sender actor
*/
public void tell(ActorRef dest, final Object msg, final ActorRef sender) {
Expand Down
Loading

0 comments on commit 332a103

Please sign in to comment.