From 6409a7ea177aa0b718fb075eeec3855ad5be8c10 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Tue, 3 Dec 2024 16:42:39 +0700 Subject: [PATCH 1/6] feat: add oss dlq --- build.gradle | 3 +- docs/docs/advance/dlq.md | 84 ++++++++++ .../config/ObjectStorageServiceConfig.java | 49 ++++++ .../blobstorage/BlobStorageFactory.java | 10 +- .../common/blobstorage/BlobStorageType.java | 3 +- .../blobstorage/oss/ObjectStorageService.java | 108 +++++++++++++ .../firehose/sink/dlq/DlqWriterFactory.java | 3 + .../common/oss/ObjectStorageServiceTest.java | 147 ++++++++++++++++++ 8 files changed, 404 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/gotocompany/firehose/config/ObjectStorageServiceConfig.java create mode 100644 src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java create mode 100644 src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java diff --git a/build.gradle b/build.gradle index bdd91140b..4a37d45af 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.10.7' +version '0.10.8' def projName = "firehose" @@ -101,6 +101,7 @@ dependencies { implementation 'com.google.cloud:google-cloud-storage:2.20.1' implementation 'org.apache.logging.log4j:log4j-core:2.20.0' implementation group: 'com.gotocompany', name: 'depot', version: '0.9.2' + implementation group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1' implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j' implementation 'dev.cel:cel:0.5.2' diff --git a/docs/docs/advance/dlq.md b/docs/docs/advance/dlq.md index 91d9b2ff8..e517d09da 100644 --- a/docs/docs/advance/dlq.md +++ b/docs/docs/advance/dlq.md @@ -228,3 +228,87 @@ The amount of time to allow the client to complete the execution of an API call. * Example value: `40000` * Type: `optional` * Default value : `40000` + +## `DLQ_OSS_ENDPOINT` + +The endpoint of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/regions-and-endpoints?spm=a2c63.p38356.0.0.65ad7fdf6qkcoQ). + +* Example value: `oss-cn-hangzhou.aliyuncs.com` +* Type: `optional` +* Default value : `null + +## `DLQ_OSS_ACCESS_KEY_ID` + +The access key id of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1). + +* Example value: `youraccessid` +* Type: `optional` + +## `DLQ_OSS_ACCESS_KEY_SECRET` + +The access key secret of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1). + +* Example value: `youraccesskey` +* Type: `optional` + +## `DLQ_OSS_BUCKET_NAME` + +The name of the oss bucket. Must adhere to the naming rules of oss. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/bucket-naming-conventions?spm=a2c63.p38356.0.0.4cdb3962K5f3io). + +* Example value: `oss_bucket` +* Type: `optional` + +## `DLQ_OSS_DIRECTORY_PREFIX` + +The prefix of the directory in the oss bucket. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/object-naming-conventions). + +* Example value: `oss_prefix` +* Type: `optional` + +## `DLQ_OSS_SOCKET_TIMEOUT_MS` + +The socket timeout in milliseconds. + +* Example value: `10000` +* Type: `required` +* Default value : `50000` + +## `DLQ_OSS_CONNECTION_TIMEOUT_MS` + +The connection timeout in milliseconds. + +* Example value: `50000` +* Type: `required` +* Default value : `50000` + +## `DLQ_OSS_CONNECTION_REQUEST_TIMEOUT_MS` + +The connection request timeout in milliseconds. Negative value indicates no timeout. + +* Example value: `100` +* Type: `required` +* Default value : `-1` + +## `DLQ_OSS_REQUEST_TIMEOUT_MS` + +The request timeout in milliseconds. + +* Example value: `50000` +* Type: `required` +* Default value : `300000` + +## `DLQ_OSS_RETRY_ENABLED` + +The flag to enable retry mechanism. + +* Example value: `true` +* Type: `required` +* Default value : `true` + +## `DLQ_OSS_MAX_RETRY_ATTEMPTS` + +The maximum number of retry attempts. + +* Example value: `3` +* Type: `required` +* Default value : `3` \ No newline at end of file diff --git a/src/main/java/com/gotocompany/firehose/config/ObjectStorageServiceConfig.java b/src/main/java/com/gotocompany/firehose/config/ObjectStorageServiceConfig.java new file mode 100644 index 000000000..54abba0b8 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/config/ObjectStorageServiceConfig.java @@ -0,0 +1,49 @@ +package com.gotocompany.firehose.config; + +import org.aeonbits.owner.Config; + +public interface ObjectStorageServiceConfig extends Config { + + @Key("${OSS_TYPE}_OSS_ENDPOINT") + String getOssEndpoint(); + + @Key("${OSS_TYPE}_OSS_REGION") + String getOssRegion(); + + @Key("${OSS_TYPE}_OSS_ACCESS_ID") + String getOssAccessId(); + + @Key("${OSS_TYPE}_OSS_ACCESS_KEY") + String getOssAccessKey(); + + @Key("${OSS_TYPE}_OSS_BUCKET_NAME") + String getOssBucketName(); + + @Key("${OSS_TYPE}_OSS_DIRECTORY_PREFIX") + String getOssDirectoryPrefix(); + + @Key("${OSS_TYPE}_OSS_SOCKET_TIMEOUT_MS") + @DefaultValue("50000") + Integer getOssSocketTimeoutMs(); + + @Key("${OSS_TYPE}_OSS_CONNECTION_TIMEOUT_MS") + @DefaultValue("50000") + Integer getOssConnectionTimeoutMs(); + + @Key("${OSS_TYPE}_OSS_CONNECTION_REQUEST_TIMEOUT_MS") + @DefaultValue("-1") + Integer getOssConnectionRequestTimeoutMs(); + + @Key("${OSS_TYPE}_OSS_REQUEST_TIMEOUT_MS") + @DefaultValue("300000") + Integer getOssRequestTimeoutMs(); + + @Key("${OSS_TYPE}_OSS_RETRY_ENABLED") + @DefaultValue("true") + boolean isRetryEnabled(); + + @Key("${OSS_TYPE}_OSS_MAX_RETRY_ATTEMPTS") + @DefaultValue("3") + int getOssMaxRetryAttempts(); + +} diff --git a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactory.java b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactory.java index dab74c3b8..b722c23f1 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactory.java @@ -1,8 +1,10 @@ package com.gotocompany.firehose.sink.common.blobstorage; import com.gotocompany.firehose.config.GCSConfig; +import com.gotocompany.firehose.config.ObjectStorageServiceConfig; import com.gotocompany.firehose.config.S3Config; import com.gotocompany.firehose.sink.common.blobstorage.gcs.GoogleCloudStorage; +import com.gotocompany.firehose.sink.common.blobstorage.oss.ObjectStorageService; import com.gotocompany.firehose.sink.common.blobstorage.s3.S3; import org.aeonbits.owner.ConfigFactory; @@ -27,7 +29,13 @@ public static BlobStorage createObjectStorage(BlobStorageType storageType, Map prefix + "/" + objectName) + .orElse(objectName); + } + + private void checkBucket() { + BucketList bucketList = oss.listBuckets(new ListBucketsRequest(objectStorageServiceConfig.getOssBucketName(), + null, 1)); + if (bucketList.getBucketList().isEmpty()) { + log.error("Bucket does not exist:{}", objectStorageServiceConfig.getOssBucketName()); + log.error("Please create OSS bucket before running firehose: {}", objectStorageServiceConfig.getOssBucketName()); + throw new IllegalArgumentException("Bucket does not exist"); + } + } + +} diff --git a/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java b/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java index be7e315a0..1f1ae5772 100644 --- a/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java @@ -39,6 +39,9 @@ public static DlqWriter create(Map configuration, StatsDReporter case S3: configuration.put("S3_TYPE", "DLQ"); break; + case OSS: + configuration.put("OSS_TYPE", "DLQ"); + break; default: throw new IllegalArgumentException("DLQ Blob Storage type " + dlqConfig.getBlobStorageType() + "is not supported"); } diff --git a/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java b/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java new file mode 100644 index 000000000..4dbe52c77 --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java @@ -0,0 +1,147 @@ +package com.gotocompany.firehose.sink.common.oss; + +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.model.Bucket; +import com.aliyun.oss.model.BucketList; +import com.aliyun.oss.model.ListBucketsRequest; +import com.aliyun.oss.model.PutObjectRequest; +import com.gotocompany.firehose.config.ObjectStorageServiceConfig; +import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException; +import com.gotocompany.firehose.sink.common.blobstorage.oss.ObjectStorageService; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + +public class ObjectStorageServiceTest { + + @Test + public void shouldStoreObjectGivenFilePath() throws BlobStorageException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "filePath"); + + Mockito.verify(oss, Mockito.times(1)) + .putObject(argumentCaptor.capture()); + assertEquals("bucket_name", argumentCaptor.getValue().getBucketName()); + assertEquals("dir_prefix/objectName", argumentCaptor.getValue().getKey()); + assertEquals(new File("filePath"), argumentCaptor.getValue().getFile()); + } + + @Test + public void shouldStoreObjectGivenFileContent() throws BlobStorageException, IOException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "content".getBytes()); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + Mockito.verify(oss, Mockito.times(1)) + .putObject(argumentCaptor.capture()); + assertEquals("bucket_name", argumentCaptor.getValue().getBucketName()); + assertEquals("dir_prefix/objectName", argumentCaptor.getValue().getKey()); + InputStream inputStream = argumentCaptor.getValue().getInputStream(); + assertEquals("content", getContent(inputStream)); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionWhenGivenBucketIsNotExists() throws BlobStorageException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(new ArrayList<>()); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldWrapToBlobStorageExceptionWhenClientExceptionIsThrown() throws BlobStorageException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new ClientException("client_error")); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "content".getBytes()); + } + + @Test(expected = BlobStorageException.class) + public void shouldWrapToBlobStorageExceptionWhenOSSExceptionIsThrown() throws BlobStorageException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + OSS oss = Mockito.spy(OSS.class); + Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new OSSException("server is down")); + BucketList bucketList = new BucketList(); + bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); + Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); + + objectStorageService.store("objectName", "content".getBytes()); + } + + private static String getContent(InputStream inputStream) throws IOException { + ByteArrayOutputStream result = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + int length; + while ((length = inputStream.read(buffer)) != -1) { + result.write(buffer, 0, length); + } + return result.toString("UTF-8"); + } + +} From ca2dc7f8ab1e1339335e27f696be3f7c5f37e4a4 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Fri, 6 Dec 2024 11:33:48 +0700 Subject: [PATCH 2/6] chore: add static import for mock and assertion --- .../common/oss/ObjectStorageServiceTest.java | 79 ++++++++++--------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java b/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java index 4dbe52c77..81345a318 100644 --- a/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java @@ -22,24 +22,25 @@ import java.util.Collections; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; public class ObjectStorageServiceTest { @Test public void shouldStoreObjectGivenFilePath() throws BlobStorageException { ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); - Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); - Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); - Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); - Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); - Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); - Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); OSS oss = Mockito.spy(OSS.class); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); - Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); + when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); BucketList bucketList = new BucketList(); bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); - Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "filePath"); @@ -54,17 +55,17 @@ public void shouldStoreObjectGivenFilePath() throws BlobStorageException { @Test public void shouldStoreObjectGivenFileContent() throws BlobStorageException, IOException { ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); - Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); - Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); - Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); - Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); - Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); - Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); OSS oss = Mockito.spy(OSS.class); - Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); + when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(null); BucketList bucketList = new BucketList(); bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); - Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "content".getBytes()); @@ -81,16 +82,16 @@ public void shouldStoreObjectGivenFileContent() throws BlobStorageException, IOE @Test(expected = IllegalArgumentException.class) public void shouldThrowIllegalArgumentExceptionWhenGivenBucketIsNotExists() throws BlobStorageException { ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); - Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); - Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); - Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); - Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); - Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); - Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); OSS oss = Mockito.spy(OSS.class); BucketList bucketList = new BucketList(); bucketList.setBucketList(new ArrayList<>()); - Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "content".getBytes()); @@ -99,17 +100,17 @@ public void shouldThrowIllegalArgumentExceptionWhenGivenBucketIsNotExists() thro @Test(expected = BlobStorageException.class) public void shouldWrapToBlobStorageExceptionWhenClientExceptionIsThrown() throws BlobStorageException { ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); - Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); - Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); - Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); - Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); - Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); - Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); OSS oss = Mockito.spy(OSS.class); - Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new ClientException("client_error")); + when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new ClientException("client_error")); BucketList bucketList = new BucketList(); bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); - Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "content".getBytes()); @@ -118,17 +119,17 @@ public void shouldWrapToBlobStorageExceptionWhenClientExceptionIsThrown() throws @Test(expected = BlobStorageException.class) public void shouldWrapToBlobStorageExceptionWhenOSSExceptionIsThrown() throws BlobStorageException { ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); - Mockito.when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); - Mockito.when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); - Mockito.when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); - Mockito.when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); - Mockito.when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); - Mockito.when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); + when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + when(objectStorageServiceConfig.getOssBucketName()).thenReturn("bucket_name"); + when(objectStorageServiceConfig.getOssDirectoryPrefix()).thenReturn("dir_prefix"); OSS oss = Mockito.spy(OSS.class); - Mockito.when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new OSSException("server is down")); + when(oss.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new OSSException("server is down")); BucketList bucketList = new BucketList(); bucketList.setBucketList(Collections.singletonList(Mockito.mock(Bucket.class))); - Mockito.when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); + when(oss.listBuckets(Mockito.any(ListBucketsRequest.class))).thenReturn(bucketList); ObjectStorageService objectStorageService = new ObjectStorageService(objectStorageServiceConfig, oss); objectStorageService.store("objectName", "content".getBytes()); From 3dec049db9319cd73e3a5148b389d425b59cfd9d Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Fri, 6 Dec 2024 12:02:05 +0700 Subject: [PATCH 3/6] test: add testing for OSSClient initialization --- .../blobstorage/oss/ObjectStorageService.java | 2 +- .../oss/ObjectStorageServiceTest.java | 68 ++++++++++++++++++- 2 files changed, 67 insertions(+), 3 deletions(-) rename src/test/java/com/gotocompany/firehose/sink/common/{ => blobstorage}/oss/ObjectStorageServiceTest.java (67%) diff --git a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java index 97340edbe..a981a78db 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageService.java @@ -36,7 +36,7 @@ public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfi checkBucket(); } - private static OSS initializeOss(ObjectStorageServiceConfig objectStorageServiceConfig) { + protected static OSS initializeOss(ObjectStorageServiceConfig objectStorageServiceConfig) { ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration(); clientBuilderConfiguration.setSignatureVersion(SignVersion.V4); clientBuilderConfiguration.setSocketTimeout(objectStorageServiceConfig.getOssSocketTimeoutMs()); diff --git a/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java b/src/test/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageServiceTest.java similarity index 67% rename from src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java rename to src/test/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageServiceTest.java index 81345a318..4835bbc00 100644 --- a/src/test/java/com/gotocompany/firehose/sink/common/oss/ObjectStorageServiceTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/common/blobstorage/oss/ObjectStorageServiceTest.java @@ -1,15 +1,19 @@ -package com.gotocompany.firehose.sink.common.oss; +package com.gotocompany.firehose.sink.common.blobstorage.oss; import com.aliyun.oss.ClientException; import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClient; import com.aliyun.oss.OSSException; +import com.aliyun.oss.common.auth.Credentials; +import com.aliyun.oss.common.auth.DefaultCredentialProvider; +import com.aliyun.oss.common.comm.ServiceClient; +import com.aliyun.oss.internal.OSSOperation; import com.aliyun.oss.model.Bucket; import com.aliyun.oss.model.BucketList; import com.aliyun.oss.model.ListBucketsRequest; import com.aliyun.oss.model.PutObjectRequest; import com.gotocompany.firehose.config.ObjectStorageServiceConfig; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException; -import com.gotocompany.firehose.sink.common.blobstorage.oss.ObjectStorageService; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -18,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; @@ -26,6 +31,65 @@ public class ObjectStorageServiceTest { + @Test + public void shouldInitializeOssGivenObjectStorageServiceConfig() throws NoSuchFieldException, IllegalAccessException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + + OSSClient oss = (OSSClient) ObjectStorageService.initializeOss(objectStorageServiceConfig); + Field credentialProviderField = oss.getClass().getDeclaredField("credsProvider"); + credentialProviderField.setAccessible(true); + Field credentialsField = DefaultCredentialProvider.class.getDeclaredField("creds"); + credentialsField.setAccessible(true); + Field endpointField = oss.getClass().getDeclaredField("endpoint"); + endpointField.setAccessible(true); + Field ossBucketOperationField = oss.getClass().getDeclaredField("bucketOperation"); + ossBucketOperationField.setAccessible(true); + Field region = OSSOperation.class.getDeclaredField("region"); + region.setAccessible(true); + + Credentials credentials = (Credentials) credentialsField.get(credentialProviderField.get(oss)); + assertEquals("http://localhost:9000", endpointField.get(oss).toString()); + assertEquals("ap-southeast-5", region.get(ossBucketOperationField.get(oss))); + assertEquals("accessId", credentials.getAccessKeyId()); + assertEquals("accessKey", credentials.getSecretAccessKey()); + } + + @Test + public void shouldInitializeOssGivenObjectStorageServiceConfigWithRetryConfig() throws NoSuchFieldException, IllegalAccessException { + ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); + when(objectStorageServiceConfig.getOssEndpoint()).thenReturn("http://localhost:9000"); + when(objectStorageServiceConfig.getOssRegion()).thenReturn("ap-southeast-5"); + when(objectStorageServiceConfig.getOssAccessId()).thenReturn("accessId"); + when(objectStorageServiceConfig.getOssAccessKey()).thenReturn("accessKey"); + when(objectStorageServiceConfig.isRetryEnabled()).thenReturn(true); + when(objectStorageServiceConfig.getOssMaxRetryAttempts()).thenReturn(3); + + OSSClient oss = (OSSClient) ObjectStorageService.initializeOss(objectStorageServiceConfig); + Field serviceClient = oss.getClass().getDeclaredField("serviceClient"); + serviceClient.setAccessible(true); + Field credentialProviderField = oss.getClass().getDeclaredField("credsProvider"); + credentialProviderField.setAccessible(true); + Field credentialsField = DefaultCredentialProvider.class.getDeclaredField("creds"); + credentialsField.setAccessible(true); + Field endpointField = oss.getClass().getDeclaredField("endpoint"); + endpointField.setAccessible(true); + Field ossBucketOperationField = oss.getClass().getDeclaredField("bucketOperation"); + ossBucketOperationField.setAccessible(true); + Field region = OSSOperation.class.getDeclaredField("region"); + region.setAccessible(true); + + Credentials credentials = (Credentials) credentialsField.get(credentialProviderField.get(oss)); + assertEquals("http://localhost:9000", endpointField.get(oss).toString()); + assertEquals("ap-southeast-5", region.get(ossBucketOperationField.get(oss))); + assertEquals("accessId", credentials.getAccessKeyId()); + assertEquals("accessKey", credentials.getSecretAccessKey()); + assertEquals(3, ((ServiceClient) serviceClient.get(oss)).getClientConfiguration().getMaxErrorRetry()); + } + @Test public void shouldStoreObjectGivenFilePath() throws BlobStorageException { ObjectStorageServiceConfig objectStorageServiceConfig = Mockito.mock(ObjectStorageServiceConfig.class); From 8e8c0e977750d3af78ca9a39e92fa95704c238b9 Mon Sep 17 00:00:00 2001 From: ekawinataa Date: Fri, 6 Dec 2024 15:07:25 +0700 Subject: [PATCH 4/6] chore: Update build.gradle chore: Update build.gradle to 0.11.1 --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 4a37d45af..4b7e6e4f4 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.10.8' +version '0.11.1' def projName = "firehose" From ea19bb1409987f1843a9fe6d298f34f5723df0a7 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Fri, 6 Dec 2024 15:53:10 +0700 Subject: [PATCH 5/6] chore: set version to 0.10.8 --- docs/docs/advance/dlq.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/docs/advance/dlq.md b/docs/docs/advance/dlq.md index e517d09da..42fe69d41 100644 --- a/docs/docs/advance/dlq.md +++ b/docs/docs/advance/dlq.md @@ -32,7 +32,7 @@ Max attempts to retry for dlq. ## `DLQ_BLOB_STORAGE_TYPE` -If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Currently, GCS and S3 is supported. +If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Currently, GCS, S3 and OSS are supported. * Example value: `GCS` * Type: `optional` @@ -232,9 +232,10 @@ The amount of time to allow the client to complete the execution of an API call. ## `DLQ_OSS_ENDPOINT` The endpoint of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/regions-and-endpoints?spm=a2c63.p38356.0.0.65ad7fdf6qkcoQ). +Mandatory if DLQ_BLOB_STORAGE_TYPE is OSS. * Example value: `oss-cn-hangzhou.aliyuncs.com` -* Type: `optional` +* Type: `Required if DLQ_BLOB_STORAGE_TYPE is OSS` * Default value : `null ## `DLQ_OSS_ACCESS_KEY_ID` @@ -242,21 +243,21 @@ The endpoint of the oss service. For more information, please refer to the [oss The access key id of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1). * Example value: `youraccessid` -* Type: `optional` +* Type: `Required if DLQ_BLOB_STORAGE_TYPE is OSS` ## `DLQ_OSS_ACCESS_KEY_SECRET` The access key secret of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1). * Example value: `youraccesskey` -* Type: `optional` +* Type: `Required if DLQ_BLOB_STORAGE_TYPE is OSS` ## `DLQ_OSS_BUCKET_NAME` The name of the oss bucket. Must adhere to the naming rules of oss. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/bucket-naming-conventions?spm=a2c63.p38356.0.0.4cdb3962K5f3io). * Example value: `oss_bucket` -* Type: `optional` +* Type: `Required if DLQ_BLOB_STORAGE_TYPE is OSS` ## `DLQ_OSS_DIRECTORY_PREFIX` @@ -299,7 +300,7 @@ The request timeout in milliseconds. ## `DLQ_OSS_RETRY_ENABLED` -The flag to enable retry mechanism. +The flag to enable retry mechanism for OSS client when transient failure occurred. * Example value: `true` * Type: `required` @@ -307,7 +308,7 @@ The flag to enable retry mechanism. ## `DLQ_OSS_MAX_RETRY_ATTEMPTS` -The maximum number of retry attempts. +The maximum number of retry attempts. To be used in conjunction when `DLQ_OSS_RETRY_ENABLED` is set to `true`. * Example value: `3` * Type: `required` From 0f5b23de7cc41a64371c14ec3cb7b1061467af92 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Fri, 6 Dec 2024 16:06:29 +0700 Subject: [PATCH 6/6] chore: typo on default value --- docs/docs/advance/dlq.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/docs/advance/dlq.md b/docs/docs/advance/dlq.md index 42fe69d41..262ece7be 100644 --- a/docs/docs/advance/dlq.md +++ b/docs/docs/advance/dlq.md @@ -236,7 +236,7 @@ Mandatory if DLQ_BLOB_STORAGE_TYPE is OSS. * Example value: `oss-cn-hangzhou.aliyuncs.com` * Type: `Required if DLQ_BLOB_STORAGE_TYPE is OSS` -* Default value : `null +* Default value : `null` ## `DLQ_OSS_ACCESS_KEY_ID`