Skip to content

Commit

Permalink
feat: add oss dlq
Browse files Browse the repository at this point in the history
  • Loading branch information
ekawinataa committed Dec 3, 2024
1 parent a376678 commit 6409a7e
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 3 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.10.7'
version '0.10.8'

def projName = "firehose"

Expand Down Expand Up @@ -101,6 +101,7 @@ dependencies {
implementation 'com.google.cloud:google-cloud-storage:2.20.1'
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
implementation group: 'com.gotocompany', name: 'depot', version: '0.9.2'
implementation group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
implementation 'dev.cel:cel:0.5.2'

Expand Down
84 changes: 84 additions & 0 deletions docs/docs/advance/dlq.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,87 @@ The amount of time to allow the client to complete the execution of an API call.
* Example value: `40000`
* Type: `optional`
* Default value : `40000`

## `DLQ_OSS_ENDPOINT`

The endpoint of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/regions-and-endpoints?spm=a2c63.p38356.0.0.65ad7fdf6qkcoQ).

* Example value: `oss-cn-hangzhou.aliyuncs.com`
* Type: `optional`
* Default value : `null

## `DLQ_OSS_ACCESS_KEY_ID`

The access key id of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1).

* Example value: `youraccessid`
* Type: `optional`

## `DLQ_OSS_ACCESS_KEY_SECRET`

The access key secret of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1).

* Example value: `youraccesskey`
* Type: `optional`

## `DLQ_OSS_BUCKET_NAME`

The name of the oss bucket. Must adhere to the naming rules of oss. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/bucket-naming-conventions?spm=a2c63.p38356.0.0.4cdb3962K5f3io).

* Example value: `oss_bucket`
* Type: `optional`

## `DLQ_OSS_DIRECTORY_PREFIX`

The prefix of the directory in the oss bucket. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/object-naming-conventions).

* Example value: `oss_prefix`
* Type: `optional`

## `DLQ_OSS_SOCKET_TIMEOUT_MS`

The socket timeout in milliseconds.

* Example value: `10000`
* Type: `required`
* Default value : `50000`

## `DLQ_OSS_CONNECTION_TIMEOUT_MS`

The connection timeout in milliseconds.

* Example value: `50000`
* Type: `required`
* Default value : `50000`

## `DLQ_OSS_CONNECTION_REQUEST_TIMEOUT_MS`

The connection request timeout in milliseconds. Negative value indicates no timeout.

* Example value: `100`
* Type: `required`
* Default value : `-1`

## `DLQ_OSS_REQUEST_TIMEOUT_MS`

The request timeout in milliseconds.

* Example value: `50000`
* Type: `required`
* Default value : `300000`

## `DLQ_OSS_RETRY_ENABLED`

The flag to enable retry mechanism.

* Example value: `true`
* Type: `required`
* Default value : `true`

## `DLQ_OSS_MAX_RETRY_ATTEMPTS`

The maximum number of retry attempts.

* Example value: `3`
* Type: `required`
* Default value : `3`
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.gotocompany.firehose.config;

import org.aeonbits.owner.Config;

public interface ObjectStorageServiceConfig extends Config {

@Key("${OSS_TYPE}_OSS_ENDPOINT")
String getOssEndpoint();

@Key("${OSS_TYPE}_OSS_REGION")
String getOssRegion();

@Key("${OSS_TYPE}_OSS_ACCESS_ID")
String getOssAccessId();

@Key("${OSS_TYPE}_OSS_ACCESS_KEY")
String getOssAccessKey();

@Key("${OSS_TYPE}_OSS_BUCKET_NAME")
String getOssBucketName();

@Key("${OSS_TYPE}_OSS_DIRECTORY_PREFIX")
String getOssDirectoryPrefix();

@Key("${OSS_TYPE}_OSS_SOCKET_TIMEOUT_MS")
@DefaultValue("50000")
Integer getOssSocketTimeoutMs();

@Key("${OSS_TYPE}_OSS_CONNECTION_TIMEOUT_MS")
@DefaultValue("50000")
Integer getOssConnectionTimeoutMs();

@Key("${OSS_TYPE}_OSS_CONNECTION_REQUEST_TIMEOUT_MS")
@DefaultValue("-1")
Integer getOssConnectionRequestTimeoutMs();

@Key("${OSS_TYPE}_OSS_REQUEST_TIMEOUT_MS")
@DefaultValue("300000")
Integer getOssRequestTimeoutMs();

@Key("${OSS_TYPE}_OSS_RETRY_ENABLED")
@DefaultValue("true")
boolean isRetryEnabled();

@Key("${OSS_TYPE}_OSS_MAX_RETRY_ATTEMPTS")
@DefaultValue("3")
int getOssMaxRetryAttempts();

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.gotocompany.firehose.sink.common.blobstorage;

import com.gotocompany.firehose.config.GCSConfig;
import com.gotocompany.firehose.config.ObjectStorageServiceConfig;
import com.gotocompany.firehose.config.S3Config;
import com.gotocompany.firehose.sink.common.blobstorage.gcs.GoogleCloudStorage;
import com.gotocompany.firehose.sink.common.blobstorage.oss.ObjectStorageService;
import com.gotocompany.firehose.sink.common.blobstorage.s3.S3;
import org.aeonbits.owner.ConfigFactory;

Expand All @@ -27,7 +29,13 @@ public static BlobStorage createObjectStorage(BlobStorageType storageType, Map<S
} catch (Exception e) {
throw new IllegalArgumentException("Exception while creating S3 Storage", e);
}

case OSS:
try {
ObjectStorageServiceConfig objectStorageServiceConfig = ConfigFactory.create(ObjectStorageServiceConfig.class, config);
return new ObjectStorageService(objectStorageServiceConfig);
} catch (Exception e) {
throw new IllegalArgumentException("Exception while creating OSS Storage", e);
}
default:
throw new IllegalArgumentException("Blob Storage Type " + storageType + " is not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public enum BlobStorageType {
GCS,
S3
S3,
OSS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.gotocompany.firehose.sink.common.blobstorage.oss;

import com.aliyun.oss.ClientBuilderConfiguration;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.auth.DefaultCredentialProvider;
import com.aliyun.oss.common.comm.NoRetryStrategy;
import com.aliyun.oss.common.comm.SignVersion;
import com.aliyun.oss.model.BucketList;
import com.aliyun.oss.model.ListBucketsRequest;
import com.aliyun.oss.model.PutObjectRequest;
import com.gotocompany.firehose.config.ObjectStorageServiceConfig;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.util.Optional;

@Slf4j
public class ObjectStorageService implements BlobStorage {

private final OSS oss;
private final ObjectStorageServiceConfig objectStorageServiceConfig;

public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfig) {
this(objectStorageServiceConfig, initializeOss(objectStorageServiceConfig));
}

public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfig, OSS oss) {
this.oss = oss;
this.objectStorageServiceConfig = objectStorageServiceConfig;
checkBucket();
}

private static OSS initializeOss(ObjectStorageServiceConfig objectStorageServiceConfig) {
ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration();
clientBuilderConfiguration.setSignatureVersion(SignVersion.V4);
clientBuilderConfiguration.setSocketTimeout(objectStorageServiceConfig.getOssSocketTimeoutMs());
clientBuilderConfiguration.setConnectionTimeout(objectStorageServiceConfig.getOssConnectionTimeoutMs());
clientBuilderConfiguration.setConnectionRequestTimeout(objectStorageServiceConfig.getOssConnectionRequestTimeoutMs());
clientBuilderConfiguration.setRequestTimeout(objectStorageServiceConfig.getOssRequestTimeoutMs());
if (objectStorageServiceConfig.isRetryEnabled()) {
clientBuilderConfiguration.setMaxErrorRetry(objectStorageServiceConfig.getOssMaxRetryAttempts());
} else {
clientBuilderConfiguration.setRetryStrategy(new NoRetryStrategy());
}
return OSSClientBuilder.create()
.endpoint(objectStorageServiceConfig.getOssEndpoint())
.region(objectStorageServiceConfig.getOssRegion())
.credentialsProvider(new DefaultCredentialProvider(objectStorageServiceConfig.getOssAccessId(),
objectStorageServiceConfig.getOssAccessKey()))
.clientConfiguration(clientBuilderConfiguration)
.build();
}

@Override
public void store(String objectName, String filePath) throws BlobStorageException {
PutObjectRequest putObjectRequest = new PutObjectRequest(
objectStorageServiceConfig.getOssBucketName(),
buildObjectPath(objectName),
new File(filePath)
);
putObject(putObjectRequest);
}

@Override
public void store(String objectName, byte[] content) throws BlobStorageException {
PutObjectRequest putObjectRequest = new PutObjectRequest(
objectStorageServiceConfig.getOssBucketName(),
buildObjectPath(objectName),
new ByteArrayInputStream(content)
);
putObject(putObjectRequest);
}

private void putObject(PutObjectRequest putObjectRequest) throws BlobStorageException {
try {
oss.putObject(putObjectRequest);
} catch (ClientException e) {
log.error("Failed to put object to OSS", e);
throw new BlobStorageException("client_error", e.getMessage(), e);
} catch (OSSException e) {
log.error("Failed to put object to OSS requestID:{} hostID:{}", e.getRequestId(), e.getHostId());
throw new BlobStorageException(e.getErrorCode(), e.getErrorMessage(), e);
}
}

private String buildObjectPath(String objectName) {
return Optional.ofNullable(objectStorageServiceConfig.getOssDirectoryPrefix())
.map(prefix -> prefix + "/" + objectName)
.orElse(objectName);
}

private void checkBucket() {
BucketList bucketList = oss.listBuckets(new ListBucketsRequest(objectStorageServiceConfig.getOssBucketName(),
null, 1));
if (bucketList.getBucketList().isEmpty()) {
log.error("Bucket does not exist:{}", objectStorageServiceConfig.getOssBucketName());
log.error("Please create OSS bucket before running firehose: {}", objectStorageServiceConfig.getOssBucketName());
throw new IllegalArgumentException("Bucket does not exist");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public static DlqWriter create(Map<String, String> configuration, StatsDReporter
case S3:
configuration.put("S3_TYPE", "DLQ");
break;
case OSS:
configuration.put("OSS_TYPE", "DLQ");
break;
default:
throw new IllegalArgumentException("DLQ Blob Storage type " + dlqConfig.getBlobStorageType() + "is not supported");
}
Expand Down
Loading

0 comments on commit 6409a7e

Please sign in to comment.