Skip to content

Commit

Permalink
Merge pull request #109 from stelin/main
Browse files Browse the repository at this point in the history
Update cluster
  • Loading branch information
stelin authored Jun 12, 2023
2 parents bf74d4f + d704974 commit 3b0c36f
Show file tree
Hide file tree
Showing 69 changed files with 832 additions and 179 deletions.
2 changes: 1 addition & 1 deletion openjob-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.2</version>
<version>1.0.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>openjob-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.openjob.worker.task;
package io.openjob.common.task;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -17,6 +17,9 @@
*/
@Slf4j
public abstract class BaseConsumer<T> {

protected Long pollIdleTime = 1000L;
protected Long pollSleepTime = 500L;
protected final Long id;
protected final Integer consumerCoreThreadNum;
protected final Integer consumerMaxThreadNum;
Expand All @@ -26,7 +29,6 @@ public abstract class BaseConsumer<T> {
protected String pollThreadName;
protected Thread pollThread;
protected TaskQueue<T> queues;

protected ThreadPoolExecutor pullExecutor;
protected AtomicInteger activePollNum = new AtomicInteger(0);

Expand Down Expand Up @@ -57,6 +59,39 @@ public BaseConsumer(Long id,
this.queues = queues;
}

/**
* New BaseConsumer
*
* @param id id
* @param consumerCoreThreadNum consumerCoreThreadNum
* @param consumerMaxThreadNum consumerMaxThreadNum
* @param consumerThreadName consumerThreadName
* @param pollSize pollSize
* @param pollThreadName pollThreadName
* @param queues queues
* @param pollIdleTime pollIdleTime(ms)
* @param pollSleepTime pollSleepTime(ms)
*/
public BaseConsumer(Long id,
Integer consumerCoreThreadNum,
Integer consumerMaxThreadNum,
String consumerThreadName,
Integer pollSize,
String pollThreadName,
TaskQueue<T> queues,
Long pollIdleTime,
Long pollSleepTime) {
this.id = id;
this.consumerCoreThreadNum = consumerCoreThreadNum;
this.consumerMaxThreadNum = consumerMaxThreadNum;
this.consumerThreadName = consumerThreadName;
this.pollSize = pollSize;
this.pollThreadName = pollThreadName;
this.queues = queues;
this.pollIdleTime = pollIdleTime;
this.pollSleepTime = pollSleepTime;
}

/**
* Start
*/
Expand Down Expand Up @@ -93,10 +128,10 @@ public Thread newThread(@Nonnull Runnable r) {
List<T> tasks = this.pollTasks();
if (tasks.size() < this.pollSize) {
if (tasks.isEmpty()) {
Thread.sleep(1000L);
Thread.sleep(this.pollIdleTime);
continue;
}
Thread.sleep(500L);
Thread.sleep(this.pollSleepTime);
}
}
} catch (Throwable ex) {
Expand Down Expand Up @@ -136,15 +171,6 @@ public void stop() {
}
}

private synchronized List<T> pollTasks() {
List<T> tasks = queues.poll(this.pollSize);
if (!tasks.isEmpty()) {
this.activePollNum.incrementAndGet();
this.consume(id, tasks);
}
return tasks;
}

/**
* Whether is active.
*
Expand All @@ -153,4 +179,17 @@ private synchronized List<T> pollTasks() {
public synchronized boolean isActive() {
return queues.size() > 0 || activePollNum.get() > 0;
}

public AtomicInteger getActivePollNum() {
return activePollNum;
}

private synchronized List<T> pollTasks() {
List<T> tasks = queues.poll(this.pollSize);
if (!tasks.isEmpty()) {
this.activePollNum.incrementAndGet();
this.consume(id, tasks);
}
return tasks;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.openjob.worker.task;
package io.openjob.common.task;

import com.google.common.collect.Lists;

Expand Down
2 changes: 1 addition & 1 deletion openjob-server/openjob-server-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob-server</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.2</version>
<version>1.0.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>openjob-server-admin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package io.openjob.server.admin.interceptor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.openjob.common.response.Result;
import io.openjob.server.admin.constant.AdminConstant;
import io.openjob.server.admin.constant.AdminHttpStatusEnum;
import io.openjob.server.admin.service.AdminUserService;
Expand All @@ -12,7 +9,6 @@
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;

Expand Down
2 changes: 1 addition & 1 deletion openjob-server/openjob-server-cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>openjob-server</artifactId>
<groupId>io.openjob</groupId>
<version>1.0.2</version>
<version>1.0.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public void start() {

// Join server
Config config = actorSystem.settings().config();
Integer bindPort = config.getInt(AkkaConfigConstant.AKKA_BIND_PORT);
String bindHostname = config.getString(AkkaConfigConstant.AKKA_BIND_HOSTNAME);
this.joinService.join(bindHostname, bindPort);
Integer remotePort = config.getInt(AkkaConfigConstant.AKKA_CANONICAL_PORT);
String remoteHostname = config.getString(AkkaConfigConstant.AKKA_CANONICAL_HOSTNAME);
this.joinService.join(remoteHostname, remotePort);

// Register coordinated shutdown.
this.registerCoordinatedShutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ public Receive createReceive() {
*/
public void workerHeartbeat(WorkerHeartbeatRequest workerHeartbeatRequest) {
ServerHeartbeatResponse response = workerHeartbeatService.workerHeartbeat(workerHeartbeatRequest);
log.info("Worker({}) heartbeat success!", workerHeartbeatRequest.getAddress());

getSender().tell(Result.success(response), getSelf());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ public static class Worker {
/**
* Worker online period(s).
*/
private Integer onlinePeriod = 30;
private Integer onlinePeriod = 10;

/**
* Worker offline period(s).
*/
private Integer offlinePeriod = 60;
private Integer offlinePeriod = 20;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.openjob.server.cluster.executor;

import io.openjob.common.request.WorkerHeartbeatRequest;
import io.openjob.common.task.TaskQueue;
import io.openjob.server.cluster.service.WorkerHeartbeatService;
import io.openjob.server.cluster.task.WorkerHeartConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @author stelin [email protected]
* @since 1.0.3
*/
@Slf4j
@Component
public class WorkerHeartbeatExecutor {
private final TaskQueue<WorkerHeartbeatRequest> queue;

/**
* New
*/
public WorkerHeartbeatExecutor() {
this.queue = new TaskQueue<>(0L, 64);

//Consumer
WorkerHeartConsumer consumer = new WorkerHeartConsumer(
0L,
1,
8,
"Openjob-heartbeat-executor",
50,
"Openjob-heartbeat-consumer",
this.queue
);
consumer.start();
}

/**
* Submit request
*
* @param request request
*/
public void submit(WorkerHeartbeatRequest request) {
try {
this.queue.submit(request);
} catch (InterruptedException e) {
log.error("Worker heartbeat submit failed!", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.openjob.server.cluster.executor;

import io.openjob.common.request.WorkerJobInstanceTaskLogRequest;
import io.openjob.common.task.TaskQueue;
import io.openjob.server.cluster.task.WorkerTaskLogConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author stelin [email protected]
* @since 1.0.3
*/
@Slf4j
@Component
public class WorkerTaskLogExecutor {
private final TaskQueue<WorkerJobInstanceTaskLogRequest> queue;

/**
* New
*/
public WorkerTaskLogExecutor() {
this.queue = new TaskQueue<>(0L, 64);

// Consumer
WorkerTaskLogConsumer consumer = new WorkerTaskLogConsumer(
0L,
1,
16,
"Openjob-log-executor",
50,
"Openjob-log-consumer",
this.queue
);
consumer.start();
}

/**
* Submit request
*
* @param request request
*/
public void submit(WorkerJobInstanceTaskLogRequest request) {
try {
this.queue.submit(request);
} catch (InterruptedException e) {
log.error("Worker task log submit failed!", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
* @author stelin [email protected]
* @since 1.0.0
*/
@Component
@Slf4j
@Component
public class FailManager {
private final ServerDAO serverDAO;
private final JobSlotsDAO jobSlotsDAO;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void fail(Node stopNode) {
failDTO.setServerId(stopNode.getServerId());
failDTO.setAkkaAddress(stopNode.getAkkaAddress());

// Akka message for join.
// Akka message for fail.
this.sendClusterStopMessage(failDTO, stopNode);
}
} catch (InterruptedException interruptedException) {
Expand Down Expand Up @@ -135,7 +135,9 @@ public void shutdown(Node stopNode) {
*/
private void migrateSlots(Node stopNode) {
List<JobSlots> currentJobSlots = this.jobSlotsDAO.listJobSlotsByServerId(stopNode.getServerId());
List<Server> servers = this.serverDAO.listServers(ServerStatusEnum.OK.getStatus());
List<Server> servers = this.serverDAO.listServers(ServerStatusEnum.OK.getStatus())
.stream().filter(s -> !s.getId().equals(stopNode.getServerId()))
.collect(Collectors.toList());

// Exclude current server.
int serverCount = servers.size();
Expand All @@ -154,27 +156,23 @@ private void migrateSlots(Node stopNode) {
for (int i = 0; i < servers.size(); i++) {
Server s = servers.get(i);

// Ignore current server.
if (s.getId().equals(stopNode.getServerId())) {
break;
}

// Last server.
if (i + 1 == servers.size()) {
List<Long> lastSlotsId = currentJobSlots.subList(index, currentJobSlots.size() - index)
List<Long> lastSlotsId = currentJobSlots.subList(index, currentJobSlots.size())
.stream()
.map(JobSlots::getId)
.collect(Collectors.toList());
migrationSlots.put(s.getId(), lastSlotsId);
break;
}

index += slotsSize;

List<Long> slotIds = currentJobSlots.subList(index, slotsSize)
int segmentSize = (i + 1) * slotsSize;
List<Long> slotIds = currentJobSlots.subList(index, segmentSize)
.stream()
.map(JobSlots::getId)
.collect(Collectors.toList());

index = segmentSize;
migrationSlots.put(s.getId(), slotIds);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void doCheck(Map<Long, Node> nodesMap, Long serverId) {
this.checkOnline(node);
}
} catch (Exception e) {
log.info("Node{} ping failed!", node);
log.info(String.format("Node %s ping failed!", node.toString()), e);

// Node failed.
// Record node fail message.
Expand Down Expand Up @@ -156,8 +156,8 @@ public void checkOnline(Node reportNode) {
if (reportsCount > this.clusterProperties.getNodeSuccessTimes()) {
// Join node to cluster.
Config config = this.actorSystem.settings().config();
Integer bindPort = config.getInt(AkkaConfigConstant.AKKA_BIND_PORT);
String bindHostname = config.getString(AkkaConfigConstant.AKKA_BIND_HOSTNAME);
Integer bindPort = config.getInt(AkkaConfigConstant.AKKA_CANONICAL_PORT);
String bindHostname = config.getString(AkkaConfigConstant.AKKA_CANONICAL_HOSTNAME);
this.joinManager.join(bindHostname, bindPort);
}
}
Expand Down
Loading

0 comments on commit 3b0c36f

Please sign in to comment.