Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5101] Define and standardize some common configurations for all Sources #5102

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,21 @@ public class Constants {
public static final int DEFAULT_ATTEMPT = 3;

public static final int DEFAULT_PORT = 8080;

// ======================== Source Constants ========================
/**
* Default capacity
*/
public static final int DEFAULT_CAPACITY = 1024;

/**
* Default poll batch size
*/
public static final int DEFAULT_POLL_BATCH_SIZE = 10;

/**
* Default poll timeout (unit: ms)
*/
public static final long DEFAULT_POLL_TIMEOUT = 5000L;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.config.connector;

import lombok.Data;

/**
* Source Poll Config
*/
@Data
public class PollConfig {

/**
* Capacity of the poll queue
*/
private int capacity = Constants.DEFAULT_CAPACITY;

/**
* Max batch size of the poll
*/
private int maxBatchSize = Constants.DEFAULT_POLL_BATCH_SIZE;

/**
* Max wait time of the poll
*/
private long maxWaitTime = Constants.DEFAULT_POLL_TIMEOUT;

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ public abstract class SourceConfig extends Config {

private OffsetStorageConfig offsetStorageConfig;

// Polling configuration, e.g. capacity, batch size, wait time, etc.
private PollConfig pollConfig = new PollConfig();

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,7 @@ public class SourceConnectorConfig {
*/
private int maxFormAttributeSize = 1024 * 1024;

// max size of the queue, default 1000
private int maxStorageSize = 1000;

// batch size, default 10
private int batchSize = 10;

// protocol, default CloudEvent
// protocol, default Common
private String protocol = "Common";

// extra config, e.g. GitHub secret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,4 @@ public class SourceConnectorConfig {
private String enableAutoCommit = "false";
private String sessionTimeoutMS = "10000";
private String maxPollRecords = "1000";
private int pollTimeOut = 100;
}
cnzakii marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@

@Slf4j
public class CanalSourceCheckConnector extends AbstractComponent implements Source, ConnectorCreateService<Source> {

private CanalSourceFullConfig config;
private CanalFullPositionMgr positionMgr;
private RdbTableMgr tableMgr;
private ThreadPoolExecutor executor;
private final BlockingQueue<List<ConnectRecord>> queue = new LinkedBlockingQueue<>();
private BlockingQueue<List<ConnectRecord>> queue;
private final AtomicBoolean flag = new AtomicBoolean(true);
private long maxPollWaitTime;

@Override
protected void run() throws Exception {
Expand Down Expand Up @@ -140,6 +142,8 @@ private void init() {
DatabaseConnection.initSourceConnection();
this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource);
this.positionMgr = new CanalFullPositionMgr(config, tableMgr);
this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime();
this.queue = new LinkedBlockingQueue<>(config.getPollConfig().getCapacity());
}

@Override
Expand Down Expand Up @@ -168,7 +172,7 @@ public void onException(ConnectRecord record) {
public List<ConnectRecord> poll() {
while (flag.get()) {
try {
List<ConnectRecord> records = queue.poll(5, TimeUnit.SECONDS);
List<ConnectRecord> records = queue.poll(maxPollWaitTime, TimeUnit.MILLISECONDS);
if (records == null || records.isEmpty()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ public class CanalSourceFullConnector extends AbstractComponent implements Sourc
private CanalFullPositionMgr positionMgr;
private RdbTableMgr tableMgr;
private ThreadPoolExecutor executor;
private final BlockingQueue<List<ConnectRecord>> queue = new LinkedBlockingQueue<>();
private BlockingQueue<List<ConnectRecord>> queue;
private final AtomicBoolean flag = new AtomicBoolean(true);
private long maxPollWaitTime;

@Override
protected void run() throws Exception {
Expand Down Expand Up @@ -137,6 +138,8 @@ private void init() {
DatabaseConnection.initSourceConnection();
this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource);
this.positionMgr = new CanalFullPositionMgr(config, tableMgr);
this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime();
this.queue = new LinkedBlockingQueue<>(config.getPollConfig().getCapacity());
}

@Override
Expand Down Expand Up @@ -166,7 +169,7 @@ public void onException(ConnectRecord record) {
public List<ConnectRecord> poll() {
while (flag.get()) {
try {
List<ConnectRecord> records = queue.poll(5, TimeUnit.SECONDS);
List<ConnectRecord> records = queue.poll(maxPollWaitTime, TimeUnit.MILLISECONDS);
if (records == null || records.isEmpty()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
@Slf4j
public class ChatGPTSourceConnector implements Source {

private static final int DEFAULT_BATCH_SIZE = 10;

private ChatGPTSourceConfig sourceConfig;
private BlockingQueue<CloudEvent> queue;
private HttpServer server;
Expand All @@ -79,6 +77,9 @@ public class ChatGPTSourceConnector implements Source {
private static final String APPLICATION_JSON = "application/json";
private static final String TEXT_PLAIN = "text/plain";

private int maxBatchSize;
private long maxPollWaitTime;


@Override
public Class<? extends Config> configClass() {
Expand Down Expand Up @@ -129,7 +130,9 @@ private void doInit() {
if (StringUtils.isNotEmpty(parsePromptTemplateStr)) {
this.parseHandler = new ParseHandler(openaiManager, parsePromptTemplateStr);
}
this.queue = new LinkedBlockingQueue<>(1024);
this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize();
this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime();
this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());
final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx -> {
Expand Down Expand Up @@ -239,14 +242,21 @@ public void stop() {

@Override
public List<ConnectRecord> poll() {
List<ConnectRecord> connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE);
for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
long startTime = System.currentTimeMillis();
long remainingTime = maxPollWaitTime;

List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
for (int i = 0; i < maxBatchSize; i++) {
try {
CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
CloudEvent event = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
if (event == null) {
break;
}
connectRecords.add(CloudEventUtil.convertEventToRecord(event));

// calculate elapsed time and update remaining time for next poll
long elapsedTime = System.currentTimeMillis() - startTime;
remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0;
} catch (InterruptedException e) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public synchronized List<E> fetchRange(int start, int end, boolean removed) {
count++;
}
return items;

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
Expand All @@ -30,8 +29,9 @@
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import io.netty.handler.codec.http.HttpResponseStatus;
Expand All @@ -50,9 +50,11 @@ public class HttpSourceConnector implements Source, ConnectorCreateService<Sourc

private HttpSourceConfig sourceConfig;

private SynchronizedCircularFifoQueue<Object> queue;
private BlockingQueue<Object> queue;

private int batchSize;
private int maxBatchSize;

private long maxPollWaitTime;

private Route route;

Expand Down Expand Up @@ -92,11 +94,11 @@ public void init(ConnectorContext connectorContext) {

private void doInit() {
// init queue
int maxQueueSize = this.sourceConfig.getConnectorConfig().getMaxStorageSize();
this.queue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity());

// init batch size
this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
// init poll batch size and timeout
this.maxBatchSize = this.sourceConfig.getPollConfig().getMaxBatchSize();
this.maxPollWaitTime = this.sourceConfig.getPollConfig().getMaxWaitTime();

// init protocol
String protocolName = this.sourceConfig.getConnectorConfig().getProtocol();
Expand Down Expand Up @@ -183,20 +185,29 @@ public void stop() {

@Override
public List<ConnectRecord> poll() {
// if queue is empty, return empty list
if (queue.isEmpty()) {
return Collections.emptyList();
}
// record current time
long startTime = System.currentTimeMillis();
long remainingTime = maxPollWaitTime;

// poll from queue
List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
Object obj = queue.poll();
if (obj == null) {
List<ConnectRecord> connectRecords = new ArrayList<>(maxBatchSize);
for (int i = 0; i < maxBatchSize; i++) {
try {
Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS);
if (obj == null) {
break;
}
// convert to ConnectRecord
ConnectRecord connectRecord = protocol.convertToConnectRecord(obj);
connectRecords.add(connectRecord);

// calculate elapsed time and update remaining time for next poll
long elapsedTime = System.currentTimeMillis() - startTime;
remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0;
} catch (Exception e) {
log.error("Failed to poll from queue.", e);
break;
}
// convert to ConnectRecord
ConnectRecord connectRecord = protocol.convertToConnectRecord(obj);
connectRecords.add(connectRecord);
}
return connectRecords;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package org.apache.eventmesh.connector.http.source.protocol;

import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.util.concurrent.BlockingQueue;

import io.vertx.ext.web.Route;


Expand All @@ -45,7 +46,7 @@ public interface Protocol {
* @param route route
* @param queue queue info
*/
void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue);
void setHandler(Route route, BlockingQueue<Object> queue);


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.eventmesh.connector.http.source.protocol.impl;

import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.CloudEventUtil;

import java.util.concurrent.BlockingQueue;

import io.cloudevents.CloudEvent;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -60,7 +61,7 @@ public void initialize(SourceConnectorConfig sourceConnectorConfig) {
* @param queue queue info
*/
@Override
public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
public void setHandler(Route route, BlockingQueue<Object> queue) {
route.method(HttpMethod.POST)
.handler(ctx -> VertxMessageFactory.createReader(ctx.request())
.map(reader -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.util.Base64;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

import io.netty.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -66,7 +66,7 @@ public void initialize(SourceConnectorConfig sourceConnectorConfig) {
* @param queue queue info
*/
@Override
public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
public void setHandler(Route route, BlockingQueue<Object> queue) {
route.method(HttpMethod.POST)
.handler(BodyHandler.create())
.handler(ctx -> {
Expand Down
Loading
Loading