diff --git a/eventmesh-connectors/eventmesh-connector-redis/build.gradle b/eventmesh-connectors/eventmesh-connector-redis/build.gradle index 42d3d4ced2..425a10570a 100644 --- a/eventmesh-connectors/eventmesh-connector-redis/build.gradle +++ b/eventmesh-connectors/eventmesh-connector-redis/build.gradle @@ -24,4 +24,7 @@ dependencies { compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' + + testImplementation 'ai.grakn:redis-mock:0.1.6' + testImplementation project(":eventmesh-common") } \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/AbstractRedisServer.java b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/AbstractRedisServer.java new file mode 100644 index 0000000000..46272496f4 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/AbstractRedisServer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.redis; + +import ai.grakn.redismock.RedisServer; + +public abstract class AbstractRedisServer { + + protected RedisServer redisServer; + + public void setupRedisServer(int port) throws Exception { + redisServer = RedisServer.newRedisServer(port); + redisServer.start(); + } + + public void shutdownRedisServer() { + if (redisServer != null) { + redisServer.stop(); + } + } + + public static int getPortFromUrl(String url) { + return Integer.parseInt(url.substring(url.lastIndexOf(":") + 1)); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnectorTest.java new file mode 100644 index 0000000000..13ec4f7379 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-redis/src/test/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnectorTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.redis.sink.connector; + +import org.apache.eventmesh.connector.redis.AbstractRedisServer; +import org.apache.eventmesh.connector.redis.cloudevent.CloudEventCodec; +import org.apache.eventmesh.connector.redis.sink.config.RedisSinkConfig; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition; +import org.apache.eventmesh.openconnect.util.ConfigUtil; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.redisson.Redisson; +import org.redisson.api.RTopic; +import org.redisson.config.Config; + +import io.cloudevents.CloudEvent; + +public class RedisSinkConnectorTest extends AbstractRedisServer { + + private RedisSinkConnector connector; + + private Redisson redisson; + + private RedisSinkConfig sinkConfig; + + @BeforeEach + public void setUp() throws Exception { + connector = new RedisSinkConnector(); + sinkConfig = (RedisSinkConfig) ConfigUtil.parse(connector.configClass()); + setupRedisServer(getPortFromUrl(sinkConfig.getConnectorConfig().getServer())); + connector.init(sinkConfig); + connector.start(); + Config config = new Config(); + config.setCodec(CloudEventCodec.getInstance()); + config.useSingleServer() + .setAddress(sinkConfig.getConnectorConfig().getServer()); + redisson = (Redisson) Redisson.create(config); + } + + @Test + public void testPutConnectRecords() throws InterruptedException { + RTopic topic = redisson.getTopic(sinkConfig.connectorConfig.getTopic()); + + final String expectedMessage = "\"testRedisMessage\""; + final int expectedCount = 5; + final CountDownLatch downLatch = new CountDownLatch(expectedCount); + topic.addListener(CloudEvent.class, (channel, msg) -> { + downLatch.countDown(); + Assertions.assertNotNull(msg.getData()); + Assertions.assertEquals(expectedMessage, new String(msg.getData().toBytes())); + }); + + List records = new ArrayList<>(); + for (int i = 0; i < expectedCount; i++) { + RecordPartition partition = new RecordPartition(); + RecordOffset offset = new RecordOffset(); + ConnectRecord connectRecord = new ConnectRecord(partition, offset, System.currentTimeMillis(), + expectedMessage.getBytes(StandardCharsets.UTF_8)); + connectRecord.addExtension("id", String.valueOf(UUID.randomUUID())); + connectRecord.addExtension("source", "testSource"); + connectRecord.addExtension("type", "testType"); + records.add(connectRecord); + } + connector.put(records); + Assertions.assertTrue(downLatch.await(10, TimeUnit.SECONDS)); + } + + @AfterEach + public void tearDown() throws Exception { + connector.stop(); + redisson.shutdown(); + shutdownRedisServer(); + } +} diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-redis/src/test/resources/sink-config.yml new file mode 100644 index 0000000000..596006a2a1 --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-redis/src/test/resources/sink-config.yml @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: redisSink + appId: 5031 + userName: redisSinkUser + passWord: redisPassWord +connectorConfig: + connectorName: redisSink + server: redis://127.0.0.1:6379 + topic: SinkTopic +