From 293a61ef071da403d608bc956f67fe5469936ae1 Mon Sep 17 00:00:00 2001 From: mike_xwm Date: Mon, 9 Dec 2024 15:12:32 +0800 Subject: [PATCH] [ISSUE #5137] update connector runtime v2 module (#5138) * [ISSUE #5137] update connector runtime v2 module * fix checkStyle error --- .../remote/request/ReportMonitorRequest.java | 38 +++ .../api/monitor/AbstractConnectorMonitor.java | 80 ++++++ .../openconnect/api/monitor/Monitor.java | 30 +++ .../api/monitor/MonitorRegistry.java | 34 +++ .../runtime/boot/RuntimeInstanceStarter.java | 1 - .../runtime/connector/ConnectorRuntime.java | 230 +++++++----------- .../runtime/service/health/HealthService.java | 112 +++++++++ .../service/monitor/MonitorService.java | 144 +++++++++++ .../runtime/service/monitor/SinkMonitor.java | 52 ++++ .../service/monitor/SourceMonitor.java | 47 ++++ .../runtime/service/status/StatusService.java | 94 +++++++ .../runtime/service/verify/VerifyService.java | 138 +++++++++++ .../eventmesh/runtime/util/RuntimeUtils.java | 13 + 13 files changed, 872 insertions(+), 141 deletions(-) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java create mode 100644 eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java create mode 100644 eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java create mode 100644 eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java create mode 100644 eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java create mode 100644 eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java create mode 100644 eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java create mode 100644 eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java create mode 100644 eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java create mode 100644 eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java new file mode 100644 index 0000000000..12278df27f --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportMonitorRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@EqualsAndHashCode(callSuper = true) +@ToString +public class ReportMonitorRequest extends BaseRemoteRequest { + private String taskID; + private String jobID; + private String address; + private String connectorStage; + private String transportType; + private long totalReqNum; + private long totalTimeCost; + private long maxTimeCost; + private long avgTimeCost; + private double tps; +} diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java new file mode 100644 index 0000000000..b9205804a4 --- /dev/null +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/AbstractConnectorMonitor.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.openconnect.api.monitor; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +public abstract class AbstractConnectorMonitor implements Monitor { + + private final String taskId; + private final String jobId; + private final String ip; + private final LongAdder totalRecordNum; + private final LongAdder totalTimeCost; + protected final AtomicLong startTime; + private final AtomicLong maxTimeCost; + private long averageTime = 0; + private double tps = 0; + + public AbstractConnectorMonitor(String taskId, String jobId, String ip) { + this.taskId = taskId; + this.jobId = jobId; + this.ip = ip; + this.totalRecordNum = new LongAdder(); + this.totalTimeCost = new LongAdder(); + this.startTime = new AtomicLong(System.currentTimeMillis()); + this.maxTimeCost = new AtomicLong(); + } + + @Override + public synchronized void recordProcess(long timeCost) { + totalRecordNum.increment(); + totalTimeCost.add(timeCost); + maxTimeCost.updateAndGet(max -> Math.max(max, timeCost)); + } + + @Override + public synchronized void recordProcess(int recordCount, long timeCost) { + totalRecordNum.add(recordCount); + totalTimeCost.add(timeCost); + maxTimeCost.updateAndGet(max -> Math.max(max, timeCost)); + } + + @Override + public synchronized void printMetrics() { + long totalRecords = totalRecordNum.sum(); + long totalCost = totalTimeCost.sum(); + averageTime = totalRecords > 0 ? totalCost / totalRecords : 0; + long elapsedTime = (System.currentTimeMillis() - startTime.get()) / 1000; // in seconds + tps = elapsedTime > 0 ? (double) totalRecords / elapsedTime : 0; + + log.info("========== Metrics =========="); + log.info("TaskId: {}|JobId: {}|ip: {}", taskId, jobId, ip); + log.info("Total records: {}", totalRecordNum); + log.info("Total time (ms): {}", totalTimeCost); + log.info("Max time per record (ms): {}", maxTimeCost); + log.info("Average time per record (ms): {}", averageTime); + log.info("TPS: {}", tps); + } +} diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java new file mode 100644 index 0000000000..4d4d9efb0c --- /dev/null +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/Monitor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.openconnect.api.monitor; + +/** + * Monitor Interface. + * All monitors should implement this interface. + */ +public interface Monitor { + void recordProcess(long timeCost); + + void recordProcess(int recordCount, long timeCost); + + void printMetrics(); +} \ No newline at end of file diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java new file mode 100644 index 0000000000..904efc5d3f --- /dev/null +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/monitor/MonitorRegistry.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.openconnect.api.monitor; + +import java.util.ArrayList; +import java.util.List; + +import lombok.Getter; + +public class MonitorRegistry { + + @Getter + private static final List monitors = new ArrayList<>(); + + public static void registerMonitor(Monitor monitor) { + monitors.add(monitor); + } + +} diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java index 42745c8dd7..0881521879 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java @@ -40,7 +40,6 @@ public static void main(String[] args) { long start = System.currentTimeMillis(); runtimeInstance.shutdown(); long end = System.currentTimeMillis(); - log.info("runtime shutdown cost {}ms", end - start); } catch (Exception e) { log.error("exception when shutdown {}", e.getMessage(), e); diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 3d3c864b58..92e78256ec 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -33,9 +33,6 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; import org.apache.eventmesh.common.remote.JobState; import org.apache.eventmesh.common.remote.request.FetchJobRequest; -import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; -import org.apache.eventmesh.common.remote.request.ReportJobRequest; -import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; import org.apache.eventmesh.common.remote.response.FetchJobResponse; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; @@ -57,33 +54,34 @@ import org.apache.eventmesh.openconnect.util.ConfigUtil; import org.apache.eventmesh.runtime.Runtime; import org.apache.eventmesh.runtime.RuntimeInstanceConfig; +import org.apache.eventmesh.runtime.service.health.HealthService; +import org.apache.eventmesh.runtime.service.monitor.MonitorService; +import org.apache.eventmesh.runtime.service.monitor.SinkMonitor; +import org.apache.eventmesh.runtime.service.monitor.SourceMonitor; +import org.apache.eventmesh.runtime.service.status.StatusService; +import org.apache.eventmesh.runtime.service.verify.VerifyService; +import org.apache.eventmesh.runtime.util.RuntimeUtils; import org.apache.eventmesh.spi.EventMeshExtensionFactory; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.stub.StreamObserver; import com.google.protobuf.Any; import com.google.protobuf.UnsafeByteOperations; @@ -103,10 +101,6 @@ public class ConnectorRuntime implements Runtime { private AdminServiceBlockingStub adminServiceBlockingStub; - StreamObserver responseObserver; - - StreamObserver requestObserver; - private Source sourceConnector; private Sink sinkConnector; @@ -129,9 +123,6 @@ public class ConnectorRuntime implements Runtime { private final ExecutorService sinkService = ThreadPoolFactory.createSingleExecutor("eventMesh-sinkService"); - private final ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(); - - private final ExecutorService reportVerifyExecutor = Executors.newSingleThreadExecutor(); private final BlockingQueue queue; @@ -143,6 +134,18 @@ public class ConnectorRuntime implements Runtime { private String adminServerAddr; + private HealthService healthService; + + private MonitorService monitorService; + + private SourceMonitor sourceMonitor; + + private SinkMonitor sinkMonitor; + + private VerifyService verifyService; + + private StatusService statusService; + public ConnectorRuntime(RuntimeInstanceConfig runtimeInstanceConfig) { this.runtimeInstanceConfig = runtimeInstanceConfig; @@ -156,46 +159,31 @@ public void init() throws Exception { initStorageService(); + initStatusService(); + initConnectorService(); + + initMonitorService(); + + initHealthService(); + + initVerfiyService(); + } private void initAdminService() { - adminServerAddr = getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr()); + adminServerAddr = RuntimeUtils.getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr()); // create gRPC channel - channel = ManagedChannelBuilder.forTarget(adminServerAddr).usePlaintext().build(); + channel = ManagedChannelBuilder.forTarget(adminServerAddr) + .usePlaintext() + .enableRetry() + .maxRetryAttempts(3) + .build(); adminServiceStub = AdminServiceGrpc.newStub(channel).withWaitForReady(); adminServiceBlockingStub = AdminServiceGrpc.newBlockingStub(channel).withWaitForReady(); - responseObserver = new StreamObserver() { - @Override - public void onNext(Payload response) { - log.info("runtime receive message: {} ", response); - } - - @Override - public void onError(Throwable t) { - log.error("runtime receive error message: {}", t.getMessage()); - } - - @Override - public void onCompleted() { - log.info("runtime finished receive message and completed"); - } - }; - - requestObserver = adminServiceStub.invokeBiStream(responseObserver); - } - - private String getRandomAdminServerAddr(String adminServerAddrList) { - String[] addresses = adminServerAddrList.split(";"); - if (addresses.length == 0) { - throw new IllegalArgumentException("Admin server address list is empty"); - } - Random random = new Random(); - int randomIndex = random.nextInt(addresses.length); - return addresses[randomIndex]; } private void initStorageService() { @@ -206,11 +194,16 @@ private void initStorageService() { } + private void initStatusService() { + statusService = new StatusService(adminServiceStub, adminServiceBlockingStub); + } + private void initConnectorService() throws Exception { connectorRuntimeConfig = ConfigService.getInstance().buildConfigInstance(ConnectorRuntimeConfig.class); FetchJobResponse jobResponse = fetchJobConfig(); + log.info("fetch job config from admin server: {}", JsonUtils.toJSONString(jobResponse)); if (jobResponse == null) { isFailed = true; @@ -271,7 +264,7 @@ private void initConnectorService() throws Exception { sinkConnectorContext.setJobType(jobResponse.getType()); sinkConnector.init(sinkConnectorContext); - reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.INIT); + statusService.reportJobStatus(connectorRuntimeConfig.getJobID(), JobState.INIT); } @@ -292,27 +285,31 @@ private FetchJobResponse fetchJobConfig() { return null; } - @Override - public void start() throws Exception { + private void initMonitorService() { + monitorService = new MonitorService(adminServiceStub, adminServiceBlockingStub); + sourceMonitor = new SourceMonitor(connectorRuntimeConfig.getTaskID(), connectorRuntimeConfig.getJobID(), IPUtils.getLocalAddress()); + monitorService.registerMonitor(sourceMonitor); + sinkMonitor = new SinkMonitor(connectorRuntimeConfig.getTaskID(), connectorRuntimeConfig.getJobID(), IPUtils.getLocalAddress()); + monitorService.registerMonitor(sinkMonitor); + } - heartBeatExecutor.scheduleAtFixedRate(() -> { + private void initHealthService() { + healthService = new HealthService(adminServiceStub, adminServiceBlockingStub, connectorRuntimeConfig); + } - ReportHeartBeatRequest heartBeat = new ReportHeartBeatRequest(); - heartBeat.setAddress(IPUtils.getLocalAddress()); - heartBeat.setReportedTimeStamp(String.valueOf(System.currentTimeMillis())); - heartBeat.setJobID(connectorRuntimeConfig.getJobID()); + private void initVerfiyService() { + verifyService = new VerifyService(adminServiceStub, adminServiceBlockingStub, connectorRuntimeConfig); + } - Metadata metadata = Metadata.newBuilder().setType(ReportHeartBeatRequest.class.getSimpleName()).build(); + @Override + public void start() throws Exception { + // start offsetMgmtService + offsetManagementService.start(); - Payload request = Payload.newBuilder().setMetadata(metadata) - .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(heartBeat)))).build()) - .build(); + monitorService.start(); - requestObserver.onNext(request); - }, 5, 5, TimeUnit.SECONDS); + healthService.start(); - // start offsetMgmtService - offsetManagementService.start(); isRunning = true; // start sinkService sinkService.execute(() -> { @@ -320,32 +317,34 @@ public void start() throws Exception { startSinkConnector(); } catch (Exception e) { isFailed = true; - log.error("sink connector [{}] start fail", sinkConnector.name(), e); + log.error("sink connector start fail", e.getStackTrace()); try { this.stop(); } catch (Exception ex) { log.error("Failed to stop after exception", ex); } - throw new RuntimeException(e); + } finally { + System.exit(-1); } }); - // start + // start sourceService sourceService.execute(() -> { try { startSourceConnector(); } catch (Exception e) { isFailed = true; - log.error("source connector [{}] start fail", sourceConnector.name(), e); + log.error("source connector start fail", e); try { this.stop(); } catch (Exception ex) { log.error("Failed to stop after exception", ex); } - throw new RuntimeException(e); + } finally { + System.exit(-1); } }); - reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.RUNNING); + statusService.reportJobStatus(connectorRuntimeConfig.getJobID(), JobState.RUNNING); } @Override @@ -353,26 +352,30 @@ public void stop() throws Exception { log.info("ConnectorRuntime start stop"); isRunning = false; if (isFailed) { - reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.FAIL); + statusService.reportJobStatus(connectorRuntimeConfig.getJobID(), JobState.FAIL); } else { - reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.COMPLETE); + statusService.reportJobStatus(connectorRuntimeConfig.getJobID(), JobState.COMPLETE); } sourceConnector.stop(); sinkConnector.stop(); + monitorService.stop(); + healthService.stop(); sourceService.shutdown(); sinkService.shutdown(); - heartBeatExecutor.shutdown(); - reportVerifyExecutor.shutdown(); - requestObserver.onCompleted(); + verifyService.stop(); + statusService.stop(); if (channel != null && !channel.isShutdown()) { - channel.shutdown(); + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } + log.info("ConnectorRuntime stopped"); } private void startSourceConnector() throws Exception { sourceConnector.start(); while (isRunning) { + long sourceStartTime = System.currentTimeMillis(); List connectorRecordList = sourceConnector.poll(); + long sinkStartTime = System.currentTimeMillis(); // TODO: use producer pub record to storage replace below if (connectorRecordList != null && !connectorRecordList.isEmpty()) { for (ConnectRecord record : connectorRecordList) { @@ -381,19 +384,14 @@ private void startSourceConnector() throws Exception { record.addExtension("recordUniqueId", record.getRecordId()); } - queue.put(record); - - // if enabled incremental data reporting consistency check - if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) { - reportVerifyRequest(record, connectorRuntimeConfig, ConnectorStage.SOURCE); - } - // set a callback for this record // if used the memory storage callback will be triggered after sink put success record.setCallback(new SendMessageCallback() { @Override public void onSuccess(SendResult result) { log.debug("send record to sink callback success, record: {}", record); + long sinkEndTime = System.currentTimeMillis(); + sinkMonitor.recordProcess(sinkEndTime - sinkStartTime); // commit record sourceConnector.commit(record); if (record.getPosition() != null) { @@ -424,6 +422,16 @@ public void onException(SendExceptionContext sendExceptionContext) { } } }); + + queue.put(record); + long sourceEndTime = System.currentTimeMillis(); + sourceMonitor.recordProcess(sourceEndTime - sourceStartTime); + + // if enabled incremental data reporting consistency check + if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) { + verifyService.reportVerifyRequest(record, ConnectorStage.SOURCE); + } + } } } @@ -438,64 +446,6 @@ private SendResult convertToSendResult(ConnectRecord record) { return result; } - private void reportVerifyRequest(ConnectRecord record, ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) { - reportVerifyExecutor.submit(() -> { - try { - // use record data + recordUniqueId for md5 - String md5Str = md5(record.getData().toString() + record.getExtension("recordUniqueId")); - ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest(); - reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID()); - reportVerifyRequest.setJobID(connectorRuntimeConfig.getJobID()); - reportVerifyRequest.setRecordID(record.getRecordId()); - reportVerifyRequest.setRecordSig(md5Str); - reportVerifyRequest.setConnectorName( - IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion()); - reportVerifyRequest.setConnectorStage(connectorStage.name()); - reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition())); - - Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build(); - - Payload request = Payload.newBuilder().setMetadata(metadata) - .setBody( - Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest)))) - .build()) - .build(); - - requestObserver.onNext(request); - } catch (Exception e) { - log.error("Failed to report verify request", e); - } - }); - } - - private void reportJobRequest(String jobId, JobState jobState) throws InterruptedException { - ReportJobRequest reportJobRequest = new ReportJobRequest(); - reportJobRequest.setJobID(jobId); - reportJobRequest.setState(jobState); - Metadata metadata = Metadata.newBuilder() - .setType(ReportJobRequest.class.getSimpleName()) - .build(); - Payload payload = Payload.newBuilder() - .setMetadata(metadata) - .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest)))) - .build()) - .build(); - requestObserver.onNext(payload); - } - - private String md5(String input) { - try { - MessageDigest md = MessageDigest.getInstance("MD5"); - byte[] messageDigest = md.digest(input.getBytes()); - StringBuilder sb = new StringBuilder(); - for (byte b : messageDigest) { - sb.append(String.format("%02x", b)); - } - return sb.toString(); - } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - } public Optional prepareToUpdateRecordOffset(ConnectRecord record) { return Optional.of(this.offsetManagement.submitRecord(record.getPosition())); @@ -589,7 +539,7 @@ private void startSinkConnector() throws Exception { sinkConnector.put(connectRecordList); // if enabled incremental data reporting consistency check if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) { - reportVerifyRequest(connectRecord, connectorRuntimeConfig, ConnectorStage.SINK); + verifyService.reportVerifyRequest(connectRecord, ConnectorStage.SINK); } } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java new file mode 100644 index 0000000000..54f924874b --- /dev/null +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/health/HealthService.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.service.health; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; +import org.apache.eventmesh.common.utils.IPUtils; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.runtime.connector.ConnectorRuntimeConfig; + +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.grpc.stub.StreamObserver; + +import com.google.protobuf.Any; +import com.google.protobuf.UnsafeByteOperations; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HealthService { + + private final ScheduledExecutorService scheduler; + + private StreamObserver requestObserver; + + private StreamObserver responseObserver; + + private AdminServiceGrpc.AdminServiceStub adminServiceStub; + + private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub; + + private ConnectorRuntimeConfig connectorRuntimeConfig; + + + public HealthService(AdminServiceGrpc.AdminServiceStub adminServiceStub, AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub, + ConnectorRuntimeConfig connectorRuntimeConfig) { + this.adminServiceStub = adminServiceStub; + this.adminServiceBlockingStub = adminServiceBlockingStub; + this.connectorRuntimeConfig = connectorRuntimeConfig; + + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + + responseObserver = new StreamObserver() { + @Override + public void onNext(Payload response) { + log.debug("health service receive message: {}|{} ", response.getMetadata(), response.getBody()); + } + + @Override + public void onError(Throwable t) { + log.error("health service receive error message: {}", t.getMessage()); + } + + @Override + public void onCompleted() { + log.info("health service finished receive message and completed"); + } + }; + requestObserver = this.adminServiceStub.invokeBiStream(responseObserver); + } + + public void start() { + this.healthReport(); + } + + public void healthReport() { + scheduler.scheduleAtFixedRate(() -> { + ReportHeartBeatRequest heartBeat = new ReportHeartBeatRequest(); + heartBeat.setAddress(IPUtils.getLocalAddress()); + heartBeat.setReportedTimeStamp(String.valueOf(System.currentTimeMillis())); + heartBeat.setJobID(connectorRuntimeConfig.getJobID()); + + Metadata metadata = Metadata.newBuilder().setType(ReportHeartBeatRequest.class.getSimpleName()).build(); + + Payload request = Payload.newBuilder().setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(heartBeat)))).build()) + .build(); + + requestObserver.onNext(request); + }, 5, 5, TimeUnit.SECONDS); + } + + + public void stop() { + scheduler.shutdown(); + if (requestObserver != null) { + requestObserver.onCompleted(); + } + } + +} diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java new file mode 100644 index 0000000000..f5af7596c3 --- /dev/null +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/MonitorService.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.service.monitor; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.remote.request.ReportMonitorRequest; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.openconnect.api.monitor.Monitor; +import org.apache.eventmesh.openconnect.api.monitor.MonitorRegistry; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.grpc.stub.StreamObserver; + +import com.google.protobuf.Any; +import com.google.protobuf.UnsafeByteOperations; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MonitorService { + + private final ScheduledExecutorService scheduler; + + private StreamObserver requestObserver; + + private StreamObserver responseObserver; + + private AdminServiceGrpc.AdminServiceStub adminServiceStub; + + private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub; + + + public MonitorService(AdminServiceGrpc.AdminServiceStub adminServiceStub, AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub) { + this.adminServiceStub = adminServiceStub; + this.adminServiceBlockingStub = adminServiceBlockingStub; + + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + + responseObserver = new StreamObserver() { + @Override + public void onNext(Payload response) { + log.debug("monitor service receive message: {}|{} ", response.getMetadata(), response.getBody()); + } + + @Override + public void onError(Throwable t) { + log.error("monitor service receive error message: {}", t.getMessage()); + } + + @Override + public void onCompleted() { + log.info("monitor service finished receive message and completed"); + } + }; + requestObserver = this.adminServiceStub.invokeBiStream(responseObserver); + } + + public void registerMonitor(Monitor monitor) { + MonitorRegistry.registerMonitor(monitor); + } + + public void start() { + this.startReporting(); + } + + public void startReporting() { + scheduler.scheduleAtFixedRate(() -> { + List monitors = MonitorRegistry.getMonitors(); + for (Monitor monitor : monitors) { + monitor.printMetrics(); + reportToAdminService(monitor); + } + }, 5, 30, TimeUnit.SECONDS); + } + + private void reportToAdminService(Monitor monitor) { + ReportMonitorRequest request = new ReportMonitorRequest(); + if (monitor instanceof SourceMonitor) { + SourceMonitor sourceMonitor = (SourceMonitor) monitor; + request.setTaskID(sourceMonitor.getTaskId()); + request.setJobID(sourceMonitor.getJobId()); + request.setAddress(sourceMonitor.getIp()); + request.setConnectorStage(sourceMonitor.getConnectorStage()); + request.setTotalReqNum(sourceMonitor.getTotalRecordNum().longValue()); + request.setTotalTimeCost(sourceMonitor.getTotalTimeCost().longValue()); + request.setMaxTimeCost(sourceMonitor.getMaxTimeCost().longValue()); + request.setAvgTimeCost(sourceMonitor.getAverageTime()); + request.setTps(sourceMonitor.getTps()); + } else if (monitor instanceof SinkMonitor) { + SinkMonitor sinkMonitor = (SinkMonitor) monitor; + request.setTaskID(sinkMonitor.getTaskId()); + request.setJobID(sinkMonitor.getJobId()); + request.setAddress(sinkMonitor.getIp()); + request.setConnectorStage(sinkMonitor.getConnectorStage()); + request.setTotalReqNum(sinkMonitor.getTotalRecordNum().longValue()); + request.setTotalTimeCost(sinkMonitor.getTotalTimeCost().longValue()); + request.setMaxTimeCost(sinkMonitor.getMaxTimeCost().longValue()); + request.setAvgTimeCost(sinkMonitor.getAverageTime()); + request.setTps(sinkMonitor.getTps()); + } else { + throw new IllegalArgumentException("Unsupported monitor: " + monitor); + } + + Metadata metadata = Metadata.newBuilder() + .setType(ReportMonitorRequest.class.getSimpleName()) + .build(); + Payload payload = Payload.newBuilder() + .setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(request)))) + .build()) + .build(); + requestObserver.onNext(payload); + } + + public void stop() { + scheduler.shutdown(); + if (requestObserver != null) { + requestObserver.onCompleted(); + } + } + +} diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java new file mode 100644 index 0000000000..b27b44da7c --- /dev/null +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SinkMonitor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.service.monitor; + +import org.apache.eventmesh.common.enums.ConnectorStage; +import org.apache.eventmesh.openconnect.api.monitor.AbstractConnectorMonitor; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Setter +public class SinkMonitor extends AbstractConnectorMonitor { + + private String connectorStage = ConnectorStage.SINK.name(); + + public SinkMonitor(String taskId, String jobId, String ip) { + super(taskId, jobId, ip); + } + + @Override + public void recordProcess(long timeCost) { + super.recordProcess(timeCost); + } + + @Override + public void recordProcess(int recordCount, long timeCost) { + super.recordProcess(recordCount, timeCost); + } + + @Override + public void printMetrics() { + super.printMetrics(); + } +} diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java new file mode 100644 index 0000000000..3895c8df14 --- /dev/null +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/monitor/SourceMonitor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.service.monitor; + +import org.apache.eventmesh.common.enums.ConnectorStage; +import org.apache.eventmesh.openconnect.api.monitor.AbstractConnectorMonitor; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Setter +public class SourceMonitor extends AbstractConnectorMonitor { + + private String connectorStage = ConnectorStage.SOURCE.name(); + + public SourceMonitor(String taskId, String jobId, String ip) { + super(taskId, jobId, ip); + } + + @Override + public void recordProcess(int recordCount, long timeCost) { + super.recordProcess(recordCount, timeCost); + } + + @Override + public void printMetrics() { + super.printMetrics(); + } +} diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java new file mode 100644 index 0000000000..e40686f575 --- /dev/null +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/status/StatusService.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.service.status; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.request.ReportJobRequest; +import org.apache.eventmesh.common.utils.IPUtils; +import org.apache.eventmesh.common.utils.JsonUtils; + +import java.util.Objects; + +import io.grpc.stub.StreamObserver; + +import com.google.protobuf.Any; +import com.google.protobuf.UnsafeByteOperations; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class StatusService { + + private StreamObserver requestObserver; + + private StreamObserver responseObserver; + + private AdminServiceGrpc.AdminServiceStub adminServiceStub; + + private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub; + + + public StatusService(AdminServiceGrpc.AdminServiceStub adminServiceStub, AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub) { + this.adminServiceStub = adminServiceStub; + this.adminServiceBlockingStub = adminServiceBlockingStub; + + responseObserver = new StreamObserver() { + @Override + public void onNext(Payload response) { + log.debug("health service receive message: {}|{} ", response.getMetadata(), response.getBody()); + } + + @Override + public void onError(Throwable t) { + log.error("health service receive error message: {}", t.getMessage()); + } + + @Override + public void onCompleted() { + log.info("health service finished receive message and completed"); + } + }; + requestObserver = this.adminServiceStub.invokeBiStream(responseObserver); + } + + public void reportJobStatus(String jobId, JobState jobState) { + ReportJobRequest reportJobRequest = new ReportJobRequest(); + reportJobRequest.setJobID(jobId); + reportJobRequest.setState(jobState); + reportJobRequest.setAddress(IPUtils.getLocalAddress()); + Metadata metadata = Metadata.newBuilder() + .setType(ReportJobRequest.class.getSimpleName()) + .build(); + Payload payload = Payload.newBuilder() + .setMetadata(metadata) + .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest)))) + .build()) + .build(); + log.info("report job state request: {}", JsonUtils.toJSONString(reportJobRequest)); + requestObserver.onNext(payload); + } + + public void stop() { + if (requestObserver != null) { + requestObserver.onCompleted(); + } + } +} diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java new file mode 100644 index 0000000000..8bcb72199c --- /dev/null +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/service/verify/VerifyService.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.service.verify; + +import org.apache.eventmesh.common.enums.ConnectorStage; +import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; +import org.apache.eventmesh.common.utils.IPUtils; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.runtime.connector.ConnectorRuntimeConfig; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import io.grpc.stub.StreamObserver; + +import com.google.protobuf.Any; +import com.google.protobuf.UnsafeByteOperations; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class VerifyService { + + private final ExecutorService reportVerifyExecutor; + + private StreamObserver requestObserver; + + private StreamObserver responseObserver; + + private AdminServiceGrpc.AdminServiceStub adminServiceStub; + + private AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub; + + private ConnectorRuntimeConfig connectorRuntimeConfig; + + + public VerifyService(AdminServiceGrpc.AdminServiceStub adminServiceStub, AdminServiceGrpc.AdminServiceBlockingStub adminServiceBlockingStub, + ConnectorRuntimeConfig connectorRuntimeConfig) { + this.adminServiceStub = adminServiceStub; + this.adminServiceBlockingStub = adminServiceBlockingStub; + this.connectorRuntimeConfig = connectorRuntimeConfig; + + this.reportVerifyExecutor = Executors.newSingleThreadExecutor(); + + responseObserver = new StreamObserver() { + @Override + public void onNext(Payload response) { + log.debug("verify service receive message: {}|{} ", response.getMetadata(), response.getBody()); + } + + @Override + public void onError(Throwable t) { + log.error("verify service receive error message: {}", t.getMessage()); + } + + @Override + public void onCompleted() { + log.info("verify service finished receive message and completed"); + } + }; + requestObserver = this.adminServiceStub.invokeBiStream(responseObserver); + } + + public void reportVerifyRequest(ConnectRecord record, ConnectorStage connectorStage) { + reportVerifyExecutor.submit(() -> { + try { + byte[] data = (byte[]) record.getData(); + // use record data + recordUniqueId for md5 + String md5Str = md5(Arrays.toString(data) + record.getExtension("recordUniqueId")); + ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest(); + reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID()); + reportVerifyRequest.setJobID(connectorRuntimeConfig.getJobID()); + reportVerifyRequest.setRecordID(record.getExtension("recordUniqueId")); + reportVerifyRequest.setRecordSig(md5Str); + reportVerifyRequest.setConnectorName( + IPUtils.getLocalAddress() + "_" + connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion()); + reportVerifyRequest.setConnectorStage(connectorStage.name()); + reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition())); + + Metadata metadata = Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build(); + + Payload request = Payload.newBuilder().setMetadata(metadata) + .setBody( + Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest)))) + .build()) + .build(); + requestObserver.onNext(request); + } catch (Exception e) { + log.error("Failed to report verify request", e); + } + }); + } + + private String md5(String input) { + try { + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] messageDigest = md.digest(input.getBytes()); + StringBuilder sb = new StringBuilder(); + for (byte b : messageDigest) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + + public void stop() { + reportVerifyExecutor.shutdown(); + if (requestObserver != null) { + requestObserver.onCompleted(); + } + } + +} diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java index e389357d93..844a9638a3 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/util/RuntimeUtils.java @@ -17,5 +17,18 @@ package org.apache.eventmesh.runtime.util; +import java.util.Random; + public class RuntimeUtils { + + public static String getRandomAdminServerAddr(String adminServerAddrList) { + String[] addresses = adminServerAddrList.split(";"); + if (addresses.length == 0) { + throw new IllegalArgumentException("Admin server address list is empty"); + } + Random random = new Random(); + int randomIndex = random.nextInt(addresses.length); + return addresses[randomIndex]; + } + }