From 6409a7ea177aa0b718fb075eeec3855ad5be8c10 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Tue, 3 Dec 2024 16:42:39 +0700 Subject: [PATCH] feat: add oss dlq --- build.gradle | 3 +- docs/docs/advance/dlq.md | 84 ++++++++++ .../config/ObjectStorageServiceConfig.java | 49 ++++++ .../blobstorage/BlobStorageFactory.java | 10 +- .../common/blobstorage/BlobStorageType.java | 3 +- .../blobstorage/oss/ObjectStorageService.java | 108 +++++++++++++ .../firehose/sink/dlq/DlqWriterFactory.java | 3 + .../common/oss/ObjectStorageServiceTest.java | 147 ++++++++++++++++++ 8 files changed, 404 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/gotocompany/firehose/config/ObjectStorageServiceConfig.java create mode 100644 src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java create mode 100644 src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java diff --git a/build.gradle b/build.gradle index bdd91140b..4a37d45af 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.10.7' +version '0.10.8' def projName = "firehose" @@ -101,6 +101,7 @@ dependencies { implementation 'com.google.cloud:google-cloud-storage:2.20.1' implementation 'org.apache.logging.log4j:log4j-core:2.20.0' implementation group: 'com.gotocompany', name: 'depot', version: '0.9.2' + implementation group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1' implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j' implementation 'dev.cel:cel:0.5.2' diff --git a/docs/docs/advance/dlq.md b/docs/docs/advance/dlq.md index 91d9b2ff8..e517d09da 100644 --- a/docs/docs/advance/dlq.md +++ b/docs/docs/advance/dlq.md @@ -228,3 +228,87 @@ The amount of time to allow the client to complete the execution of an API call. * Example value: `40000` * Type: `optional` * Default value : `40000` + +## `DLQ_OSS_ENDPOINT` + +The endpoint of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/regions-and-endpoints?spm=a2c63.p38356.0.0.65ad7fdf6qkcoQ). + +* Example value: `oss-cn-hangzhou.aliyuncs.com` +* Type: `optional` +* Default value : `null + +## `DLQ_OSS_ACCESS_KEY_ID` + +The access key id of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1). + +* Example value: `youraccessid` +* Type: `optional` + +## `DLQ_OSS_ACCESS_KEY_SECRET` + +The access key secret of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1). + +* Example value: `youraccesskey` +* Type: `optional` + +## `DLQ_OSS_BUCKET_NAME` + +The name of the oss bucket. Must adhere to the naming rules of oss. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/bucket-naming-conventions?spm=a2c63.p38356.0.0.4cdb3962K5f3io). + +* Example value: `oss_bucket` +* Type: `optional` + +## `DLQ_OSS_DIRECTORY_PREFIX` + +The prefix of the directory in the oss bucket. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/object-naming-conventions). + +* Example value: `oss_prefix` +* Type: `optional` + +## `DLQ_OSS_SOCKET_TIMEOUT_MS` + +The socket timeout in milliseconds. + +* Example value: `10000` +* Type: `required` +* Default value : `50000` + +## `DLQ_OSS_CONNECTION_TIMEOUT_MS` + +The connection timeout in milliseconds. + +* Example value: `50000` +* Type: `required` +* Default value : `50000` + +## `DLQ_OSS_CONNECTION_REQUEST_TIMEOUT_MS` + +The connection request timeout in milliseconds. Negative value indicates no timeout. + +* Example value: `100` +* Type: `required` +* Default value : `-1` + +## `DLQ_OSS_REQUEST_TIMEOUT_MS` + +The request timeout in milliseconds. + +* Example value: `50000` +* Type: `required` +* Default value : `300000` + +## `DLQ_OSS_RETRY_ENABLED` + +The flag to enable retry mechanism. + +* Example value: `true` +* Type: `required` +* Default value : `true` + +## `DLQ_OSS_MAX_RETRY_ATTEMPTS` + +The maximum number of retry attempts. + +* Example value: `3` +* Type: `required` +* Default value : `3` \ No newline at end of file diff --git a/src/main/java/com/gotocompany/firehose/config/ObjectStorageServiceConfig.java b/src/main/java/com/gotocompany/firehose/config/ObjectStorageServiceConfig.java new file mode 100644 index 000000000..54abba0b8 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/config/ObjectStorageServiceConfig.java @@ -0,0 +1,49 @@ +package com.gotocompany.firehose.config; + +import org.aeonbits.owner.Config; + +public interface ObjectStorageServiceConfig extends Config { + + @Key("${OSS_TYPE}_OSS_ENDPOINT") + String getOssEndpoint(); + + @Key("${OSS_TYPE}_OSS_REGION") + String getOssRegion(); + + @Key("${OSS_TYPE}_OSS_ACCESS_ID") + String getOssAccessId(); + + @Key("${OSS_TYPE}_OSS_ACCESS_KEY") + String getOssAccessKey(); + + @Key("${OSS_TYPE}_OSS_BUCKET_NAME") + String getOssBucketName(); + + @Key("${OSS_TYPE}_OSS_DIRECTORY_PREFIX") + String getOssDirectoryPrefix(); + + @Key("${OSS_TYPE}_OSS_SOCKET_TIMEOUT_MS") + @DefaultValue("50000") + Integer getOssSocketTimeoutMs(); + + @Key("${OSS_TYPE}_OSS_CONNECTION_TIMEOUT_MS") + @DefaultValue("50000") + Integer getOssConnectionTimeoutMs(); + + @Key("${OSS_TYPE}_OSS_CONNECTION_REQUEST_TIMEOUT_MS") + @DefaultValue("-1") + Integer getOssConnectionRequestTimeoutMs(); + + @Key("${OSS_TYPE}_OSS_REQUEST_TIMEOUT_MS") + @DefaultValue("300000") + Integer getOssRequestTimeoutMs(); + + @Key("${OSS_TYPE}_OSS_RETRY_ENABLED") + @DefaultValue("true") + boolean isRetryEnabled(); + + @Key("${OSS_TYPE}_OSS_MAX_RETRY_ATTEMPTS") + @DefaultValue("3") + int getOssMaxRetryAttempts(); + +} diff --git a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactory.java b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactory.java index dab74c3b8..b722c23f1 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactory.java @@ -1,8 +1,10 @@ package com.gotocompany.firehose.sink.common.blobstorage; import com.gotocompany.firehose.config.GCSConfig; +import com.gotocompany.firehose.config.ObjectStorageServiceConfig; import com.gotocompany.firehose.config.S3Config; import com.gotocompany.firehose.sink.common.blobstorage.gcs.GoogleCloudStorage; +import com.gotocompany.firehose.sink.common.blobstorage.oss.ObjectStorageService; import com.gotocompany.firehose.sink.common.blobstorage.s3.S3; import org.aeonbits.owner.ConfigFactory; @@ -27,7 +29,13 @@ public static BlobStorage createObjectStorage(BlobStorageType storageType, Map prefix + "/" + objectName) + .orElse(objectName); + } + + private void checkBucket() { + BucketList bucketList = oss.listBuckets(new ListBucketsRequest(objectStorageServiceConfig.getOssBucketName(), + null, 1)); + if (bucketList.getBucketList().isEmpty()) { + log.error("Bucket does not exist:{}", objectStorageServiceConfig.getOssBucketName()); + log.error("Please create OSS bucket before running firehose: {}", objectStorageServiceConfig.getOssBucketName()); + throw new IllegalArgumentException("Bucket does not exist"); + } + } + +} diff --git a/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java b/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java index be7e315a0..1f1ae5772 100644 --- a/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java @@ -39,6 +39,9 @@ public static DlqWriter create(Map configuration, StatsDReporter case S3: configuration.put("S3_TYPE", "DLQ"); break; + case OSS: + configuration.put("OSS_TYPE", "DLQ"); + break; default: throw new IllegalArgumentException("DLQ Blob Storage type " + dlqConfig.getBlobStorageType() + "is not supported"); } diff --git a/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java b/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java new file mode 100644 index 000000000..4dbe52c77 --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java @@ -0,0 +1,147 @@ +package com.gotocompany.firehose.sink.common.oss; + +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.model.Bucket; +import com.aliyun.oss.model.BucketList; +import com.aliyun.oss.model.ListBucketsRequest; +import com.aliyun.oss.model.PutObjectRequest; +import com.gotocompany.firehose.config.ObjectStorageServiceConfig; +import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException; +import com.gotocompany.firehose.sink.common.blobstorage.oss.ObjectStorageService; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + +public class ObjectStorageServiceTest { + + @Test + public void shouldStoreObjectGivenFilePath() throws BlobStorageException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "filePath"); + + Mockito.verify(oss, Mockito.times(1)) + .putObject(argumentCaptor.capture()); + assertEquals("bucket_name", argumentCaptor.getValue().getBucketName()); + assertEquals("dir_prefix/objectName", argumentCaptor.getValue().getKey()); + assertEquals(new File("filePath"), argumentCaptor.getValue().getFile()); + } + + @Test + public void shouldStoreObjectGivenFileContent() throws BlobStorageException, IOException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "content".getBytes()); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + Mockito.verify(oss, Mockito.times(1)) + .putObject(argumentCaptor.capture()); + assertEquals("bucket_name", argumentCaptor.getValue().getBucketName()); + assertEquals("dir_prefix/objectName", argumentCaptor.getValue().getKey()); + InputStream inputStream = argumentCaptor.getValue().getInputStream(); + assertEquals("content", getContent(inputStream)); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionWhenGivenBucketIsNotExists() throws BlobStorageException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(new ArrayList<>()); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldWrapToBlobStorageExceptionWhenClientExceptionIsThrown() throws BlobStorageException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new ClientException("client_error")); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldWrapToBlobStorageExceptionWhenOSSExceptionIsThrown() throws BlobStorageException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new OSSException("server is down")); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "content".getBytes()); + } + + private static String getContent(InputStream inputStream) throws IOException { + ByteArrayOutputStream result = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + int length; + while ((length = inputStream.read(buffer)) != -1) { + result.write(buffer, 0, length); + } + return result.toString("UTF-8"); + } + +}