Skip to content

Commit

Permalink
Merge pull request #88 from stelin/main
Browse files Browse the repository at this point in the history
Fixed bug
  • Loading branch information
stelin authored May 19, 2023
2 parents 87822a2 + d774224 commit cda2c5f
Show file tree
Hide file tree
Showing 37 changed files with 581 additions and 135 deletions.
12 changes: 12 additions & 0 deletions openjob-common/src/main/java/io/openjob/common/util/IpUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
Expand Down Expand Up @@ -90,6 +91,17 @@ public static String getLocalAddress() {
}
}

/**
* Get ip by host
*
* @param host host
* @return String
* @throws UnknownHostException UnknownHostException
*/
public static String getIpByHost(String host) throws UnknownHostException {
return InetAddress.getByName(host).getHostAddress();
}

/**
* Normalize address.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package io.openjob.common.util;

import org.checkerframework.checker.units.qual.A;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* @author stelin [email protected]
* @since 1.0.0
Expand All @@ -16,4 +20,22 @@ public void testGetLocalIp() {
String formatAddress = IpUtil.getFormatAddress();
Assertions.assertNotNull(formatAddress);
}

@Test
public void testGetIpByHost() throws UnknownHostException {
String ip = IpUtil.getIpByHost("localhost");
Assertions.assertEquals(ip, "127.0.0.1");

String ip2 = IpUtil.getIpByHost("127.0.0.1");
Assertions.assertEquals(ip2, "127.0.0.1");

String ip3 = IpUtil.getIpByHost("github.com");
Assertions.assertNotNull(ip3);

String ip4 = IpUtil.getIpByHost("20.205.243.166");
Assertions.assertEquals(ip4, "20.205.243.166");

String ip5 = IpUtil.getIpByHost("172.20.1.166");
Assertions.assertEquals(ip5, "172.20.1.166");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,37 @@
import lombok.Getter;

/**
* user: 100+
* namespace: 200+
* application: 300+
* job: 400+
* delay: 500+
* User: 100+
* Namespace: 200+
* Application: 300+
* Job: 400+
* Delay: 500+
*
* @author stelin [email protected]
* @since 1.0.0
*/
@Getter
@AllArgsConstructor
public enum CodeEnum implements CodeExceptionAssert {
// Code list
USER_EXIST(100, "User is exist!"),

/**
* App name not exist
*/
NAME_EXIST(100, "App name must be globally unique!"),
// Namespace
NAMESPACE_DELETE_INVALID(200, "Namespace can not be delete!"),

// Application
APP_NAME_EXIST(300, "App name must be globally unique!"),
APP_DELETE_INVALID(301, "Application can not be deleted!"),

// Job
TIME_EXPRESSION_INVALID(400, "Time expression is invalid"),
JOB_DELETE_INVALID(401, "Job can not be deleted!"),

// Delay
DELAY_TOPIC_EXIST(500, "Topic is exist!"),
DELAY_DELETE_INVALID(501, "Delay can not be deleted!"),
;

TIME_EXPRESSION_INVALID(400, "Time expression is invalid");

/**
* Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.repository.dao.AppDAO;
import io.openjob.server.repository.dao.DelayDAO;
import io.openjob.server.repository.dao.JobDAO;
import io.openjob.server.repository.dao.NamespaceDAO;
import io.openjob.server.repository.entity.App;
import io.openjob.server.repository.entity.Delay;
import io.openjob.server.repository.entity.Job;
import io.openjob.server.repository.entity.Namespace;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -37,17 +41,22 @@ public class AppServiceImpl implements AppService {

private final AppDAO appDAO;
private final NamespaceDAO namespaceDAO;
private final JobDAO jobDAO;
private final DelayDAO delayDAO;


@Autowired
public AppServiceImpl(AppDAO appDAO, NamespaceDAO namespaceDAO) {
public AppServiceImpl(AppDAO appDAO, NamespaceDAO namespaceDAO, JobDAO jobDAO, DelayDAO delayDAO) {
this.appDAO = appDAO;
this.namespaceDAO = namespaceDAO;
this.jobDAO = jobDAO;
this.delayDAO = delayDAO;
}

@Override
public AddAppVO add(AddAppRequest addRequest) {
App app = this.appDAO.getAppByName(addRequest.getName());
CodeEnum.NAME_EXIST.assertIsTrue(Objects.isNull(app));
CodeEnum.APP_NAME_EXIST.assertIsTrue(Objects.isNull(app));

Long id = this.appDAO.save(BeanMapperUtil.map(addRequest, App.class));

Expand All @@ -61,7 +70,7 @@ public UpdateAppVO update(UpdateAppRequest updateRequest) {
// App name is exist and not self!
App nameApp = this.appDAO.getAppByName(updateRequest.getName());
if (Objects.nonNull(nameApp) && !nameApp.getId().equals(updateRequest.getId())) {
CodeEnum.NAME_EXIST.throwException();
CodeEnum.APP_NAME_EXIST.throwException();
}

App app = BeanMapperUtil.map(BeanMapperUtil.map(updateRequest, App.class), App.class);
Expand All @@ -71,9 +80,16 @@ public UpdateAppVO update(UpdateAppRequest updateRequest) {

@Override
public DeleteAppVO delete(DeleteAppRequest deleteAppRequest) {
App app = BeanMapperUtil.map(deleteAppRequest, App.class);
app.setDeleted(CommonConstant.YES);
this.appDAO.update(app);
App byId = this.appDAO.getById(deleteAppRequest.getId());

// Job/delay/workflow
Job firstJob = this.jobDAO.getFirstByNamespaceAndAppid(byId.getNamespaceId(), byId.getId());
Delay firstDelay = this.delayDAO.getFirstByNamespaceAndAppid(byId.getNamespaceId(), byId.getId());
if (Objects.nonNull(firstJob) || Objects.nonNull(firstDelay)) {
CodeEnum.APP_DELETE_INVALID.throwException();
}

this.appDAO.deleteById(deleteAppRequest.getId());
return new DeleteAppVO();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public StopDelayInstanceVO stop(StopDelayInstanceRequest request) {
DelayInstanceStopResponseDTO stop = this.delayInstanceScheduler.stop(delayInstanceStopRequestDTO);

// Update status
this.delayInstanceDAO.updateStatus(request.getTaskId(), TaskStatusEnum.STOP.getStatus());

StopDelayInstanceVO stopDelayInstanceVO = new StopDelayInstanceVO();
stopDelayInstanceVO.setResult(stop.getResult());
return stopDelayInstanceVO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.openjob.common.constant.CommonConstant;
import io.openjob.common.constant.TaskStatusEnum;
import io.openjob.common.util.DelayUtil;
import io.openjob.server.admin.constant.CodeEnum;
import io.openjob.server.admin.request.delay.AddDelayRequest;
import io.openjob.server.admin.request.delay.DeleteDelayRequest;
import io.openjob.server.admin.request.delay.ListDelayRequest;
Expand Down Expand Up @@ -65,6 +66,9 @@ public DelayServiceImpl(DelayDAO delayDAO, AppDAO appDAO, DelayInstanceDAO delay
@Override
@Transactional(rollbackFor = Exception.class)
public AddDelayVO add(AddDelayRequest addRequest) {
Delay byTopic = this.delayDAO.findByTopic(addRequest.getTopic());
CodeEnum.DELAY_TOPIC_EXIST.assertIsTrue(Objects.isNull(byTopic));

// Delay
Delay delay = BeanMapperUtil.map(addRequest, Delay.class);
delay.setPid(0L);
Expand Down Expand Up @@ -139,8 +143,13 @@ public PageVO<ListDelayVO> list(ListDelayRequest listDelayRequest) {
@Override
@Transactional(rollbackFor = Exception.class)
public DeleteDelayVO delete(DeleteDelayRequest deleteDelayRequest) {
this.delayDAO.updateStatusOrDeleted(deleteDelayRequest.getId(), null, CommonConstant.YES);
this.delayDAO.updateStatusOrDeleted(deleteDelayRequest.getCid(), null, CommonConstant.YES);
if (Objects.nonNull(this.delayInstanceDAO.getFirstByDelayId(deleteDelayRequest.getId()))) {
CodeEnum.DELAY_DELETE_INVALID.throwException();
}

// Delete
this.delayDAO.deleteById(deleteDelayRequest.getId());
this.delayDAO.deleteById(deleteDelayRequest.getCid());

// Refresh delay version
this.delayScheduler.refreshDelayVersion();
Expand All @@ -150,6 +159,11 @@ public DeleteDelayVO delete(DeleteDelayRequest deleteDelayRequest) {
@Override
@Transactional(rollbackFor = Exception.class)
public UpdateDelayVO update(UpdateDelayRequest updateDelayRequest) {
Delay byTopic = this.delayDAO.findByTopic(updateDelayRequest.getTopic());
if (Objects.nonNull(byTopic) && !byTopic.getId().equals(updateDelayRequest.getId())) {
CodeEnum.DELAY_TOPIC_EXIST.throwException();
}

// Delay
Delay delay = BeanMapperUtil.map(updateDelayRequest, Delay.class);
this.delayDAO.update(delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import io.openjob.server.repository.constant.JobStatusEnum;
import io.openjob.server.repository.dao.AppDAO;
import io.openjob.server.repository.dao.JobDAO;
import io.openjob.server.repository.dao.JobInstanceDAO;
import io.openjob.server.repository.dto.JobPageDTO;
import io.openjob.server.repository.entity.App;
import io.openjob.server.repository.entity.Job;
import io.openjob.server.scheduler.dto.JobExecuteRequestDTO;
import io.openjob.server.scheduler.service.JobSchedulingService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.apache.bcel.classfile.Code;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
Expand All @@ -56,12 +58,14 @@ public class JobServiceImpl implements JobService {

private final JobDAO jobDAO;
private final AppDAO appDAO;
private final JobInstanceDAO jobInstanceDAO;
private final JobSchedulingService jobSchedulingService;

@Autowired
public JobServiceImpl(JobDAO jobDAO, AppDAO appDAO, JobSchedulingService jobSchedulingService) {
public JobServiceImpl(JobDAO jobDAO, AppDAO appDAO, JobInstanceDAO jobInstanceDAO, JobSchedulingService jobSchedulingService) {
this.jobDAO = jobDAO;
this.appDAO = appDAO;
this.jobInstanceDAO = jobInstanceDAO;
this.jobSchedulingService = jobSchedulingService;
}

Expand Down Expand Up @@ -107,6 +111,10 @@ public UpdateJobVO update(UpdateJobRequest updateJobRequest) {

@Override
public DeleteJobVO delete(DeleteJobRequest deleteJobRequest) {
if (Objects.nonNull(this.jobInstanceDAO.getFirstByJobId(deleteJobRequest.getId()))) {
CodeEnum.JOB_DELETE_INVALID.throwException();
}

this.jobDAO.updateByStatusOrDeleted(deleteJobRequest.getId(), null, CommonConstant.YES, null);
return new DeleteJobVO();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.openjob.server.admin.service.impl;

import io.openjob.common.constant.CommonConstant;
import io.openjob.server.admin.constant.CodeEnum;
import io.openjob.server.admin.request.namespace.AddNamespaceRequest;
import io.openjob.server.admin.request.namespace.DeleteNamespaceRequest;
import io.openjob.server.admin.request.namespace.ListNamespaceRequest;
Expand All @@ -14,11 +15,13 @@
import io.openjob.server.common.util.BeanMapperUtil;
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.repository.dao.AppDAO;
import io.openjob.server.repository.dao.NamespaceDAO;
import io.openjob.server.repository.entity.Namespace;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Objects;
import java.util.UUID;

/**
Expand All @@ -28,10 +31,12 @@
@Service
public class NamespaceServiceImpl implements NamespaceService {
private final NamespaceDAO namespaceDAO;
private final AppDAO appDAO;

@Autowired
public NamespaceServiceImpl(NamespaceDAO namespaceDAO) {
public NamespaceServiceImpl(NamespaceDAO namespaceDAO, AppDAO appDAO) {
this.namespaceDAO = namespaceDAO;
this.appDAO = appDAO;
}

@Override
Expand All @@ -53,6 +58,10 @@ public UpdateNamespaceVO update(UpdateNamespaceRequest updateRequest) {

@Override
public DeleteNamespaceVO delete(DeleteNamespaceRequest deleteNamespaceRequest) {
if (Objects.nonNull(this.appDAO.getFirstByNamespaceId(deleteNamespaceRequest.getId()))) {
CodeEnum.NAMESPACE_DELETE_INVALID.throwException();
}

Namespace namespace = BeanMapperUtil.map(deleteNamespaceRequest, Namespace.class);
namespace.setDeleted(CommonConstant.YES);
this.namespaceDAO.update(namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public Boolean doJoin(String hostname, Integer port) {

// Refresh current slots.
this.refreshManager.refreshCurrentSlots();

// Refresh app workers;
this.refreshManager.refreshAppWorkers();
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void workerCheck() {
workerStartRequest.setAppName(w.getAppName());
workerStartRequest.setWorkerKey(w.getWorkerKey());

log.info("Scheduling worker start begin!");
log.info("Scheduling worker start begin! address={}", w.getAddress());
this.workerStart(workerStartRequest);
}
});
Expand All @@ -193,7 +193,7 @@ public void workerCheck() {
workerStopRequest.setAddress(w.getAddress());
workerStopRequest.setAppName(w.getAppName());

log.info("Scheduling worker stop begin!");
log.info("Scheduling worker stop begin! address={}", w.getAddress());
this.workerStop(workerStopRequest);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static void refreshAppWorkers(List<Worker> workers) {
Map<Long, List<WorkerDTO>> appWorkers = workers.stream()
.map(w -> {
WorkerDTO workerDTO = new WorkerDTO();
workerDTO.setNamespaceId(w.getNamespaceId());
workerDTO.setAppId(w.getAppId());
workerDTO.setWorkerKey(w.getWorkerKey());
workerDTO.setAddress(w.getAddress());
Expand All @@ -67,12 +68,13 @@ public static void refreshAppWorkers(List<Worker> workers) {
.collect(Collectors.groupingBy(WorkerDTO::getAppId));

ClusterContext.refreshAppWorkers(appWorkers);
log.info("Refresh app workers {}", appWorkers);
}

/**
* Online workers.
*
* @param appId appId
* @param appId appId
* @return Set
*/
public static Set<String> getOnlineWorkers(Long appId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@
* @since 1.0.0
*/
public class ClusterUtilTest {

@Test
public void testGetKnowServersByOnlyOne() {
Map<Long, Node> nodesMap = new HashMap<>(16);
Node currentNode = null;
Node node = new Node();
node.setStatus(1);
node.setServerId(1L);
node.setIp("127.0.0.1");
node.setAkkaAddress(String.format("127.0.0.1:%d", 1L));
nodesMap.put(1L, node);

currentNode = node;
List<Long> knowServers = ClusterUtil.getKnowServers(nodesMap, currentNode, 5);
List<Long> expect = new ArrayList<>();
Assertions.assertEquals(knowServers, expect);
}

@Test
public void testGetKnowServersByTwo() {
Map<Long, Node> nodesMap = new HashMap<>(16);
Expand Down
Loading

0 comments on commit cda2c5f

Please sign in to comment.