Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Alibaba Object Storage Service DLQ #57

Merged
merged 7 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.11.0'
version '0.11.1'

def projName = "firehose"

Expand Down Expand Up @@ -100,6 +100,7 @@ dependencies {
implementation platform('com.google.cloud:libraries-bom:20.5.0')
implementation 'com.google.cloud:google-cloud-storage:2.20.1'
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
implementation group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1'
implementation group: 'com.gotocompany', name: 'depot', version: '0.10.0'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
implementation 'dev.cel:cel:0.5.2'
Expand Down
87 changes: 86 additions & 1 deletion docs/docs/advance/dlq.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Max attempts to retry for dlq.

## `DLQ_BLOB_STORAGE_TYPE`

If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Currently, GCS and S3 is supported.
If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Currently, GCS, S3 and OSS are supported.

* Example value: `GCS`
* Type: `optional`
Expand Down Expand Up @@ -228,3 +228,88 @@ The amount of time to allow the client to complete the execution of an API call.
* Example value: `40000`
* Type: `optional`
* Default value : `40000`

## `DLQ_OSS_ENDPOINT`

The endpoint of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/regions-and-endpoints?spm=a2c63.p38356.0.0.65ad7fdf6qkcoQ).
Mandatory if DLQ_BLOB_STORAGE_TYPE is OSS.

* Example value: `oss-cn-hangzhou.aliyuncs.com`
* Type: `Required if DLQ_BLOB_STORAGE_TYPE is OSS`
* Default value : `null`

## `DLQ_OSS_ACCESS_KEY_ID`

The access key id of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1).

* Example value: `youraccessid`
* Type: `Required if DLQ_BLOB_STORAGE_TYPE is OSS`

## `DLQ_OSS_ACCESS_KEY_SECRET`

The access key secret of the oss service. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/developer-reference/oss-java-configure-access-credentials#dd657ea839xv1).

* Example value: `youraccesskey`
* Type: `Required if DLQ_BLOB_STORAGE_TYPE is OSS`

## `DLQ_OSS_BUCKET_NAME`

The name of the oss bucket. Must adhere to the naming rules of oss. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/bucket-naming-conventions?spm=a2c63.p38356.0.0.4cdb3962K5f3io).

* Example value: `oss_bucket`
* Type: `Required if DLQ_BLOB_STORAGE_TYPE is OSS`

## `DLQ_OSS_DIRECTORY_PREFIX`

The prefix of the directory in the oss bucket. For more information, please refer to the [oss documentation](https://www.alibabacloud.com/help/en/oss/user-guide/object-naming-conventions).

* Example value: `oss_prefix`
* Type: `optional`

## `DLQ_OSS_SOCKET_TIMEOUT_MS`

The socket timeout in milliseconds.

* Example value: `10000`
* Type: `required`
* Default value : `50000`

## `DLQ_OSS_CONNECTION_TIMEOUT_MS`

The connection timeout in milliseconds.

* Example value: `50000`
* Type: `required`
* Default value : `50000`

## `DLQ_OSS_CONNECTION_REQUEST_TIMEOUT_MS`

The connection request timeout in milliseconds. Negative value indicates no timeout.

* Example value: `100`
* Type: `required`
* Default value : `-1`

## `DLQ_OSS_REQUEST_TIMEOUT_MS`

The request timeout in milliseconds.

* Example value: `50000`
* Type: `required`
* Default value : `300000`

## `DLQ_OSS_RETRY_ENABLED`

The flag to enable retry mechanism for OSS client when transient failure occurred.

* Example value: `true`
* Type: `required`
* Default value : `true`

## `DLQ_OSS_MAX_RETRY_ATTEMPTS`

The maximum number of retry attempts. To be used in conjunction when `DLQ_OSS_RETRY_ENABLED` is set to `true`.

* Example value: `3`
* Type: `required`
* Default value : `3`
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.gotocompany.firehose.config;

import org.aeonbits.owner.Config;

public interface ObjectStorageServiceConfig extends Config {
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved

@Key("${OSS_TYPE}_OSS_ENDPOINT")
String getOssEndpoint();

@Key("${OSS_TYPE}_OSS_REGION")
String getOssRegion();

@Key("${OSS_TYPE}_OSS_ACCESS_ID")
String getOssAccessId();

@Key("${OSS_TYPE}_OSS_ACCESS_KEY")
String getOssAccessKey();

@Key("${OSS_TYPE}_OSS_BUCKET_NAME")
String getOssBucketName();

@Key("${OSS_TYPE}_OSS_DIRECTORY_PREFIX")
String getOssDirectoryPrefix();

@Key("${OSS_TYPE}_OSS_SOCKET_TIMEOUT_MS")
@DefaultValue("50000")
Integer getOssSocketTimeoutMs();

@Key("${OSS_TYPE}_OSS_CONNECTION_TIMEOUT_MS")
@DefaultValue("50000")
Integer getOssConnectionTimeoutMs();

@Key("${OSS_TYPE}_OSS_CONNECTION_REQUEST_TIMEOUT_MS")
@DefaultValue("-1")
Integer getOssConnectionRequestTimeoutMs();

@Key("${OSS_TYPE}_OSS_REQUEST_TIMEOUT_MS")
@DefaultValue("300000")
Integer getOssRequestTimeoutMs();

@Key("${OSS_TYPE}_OSS_RETRY_ENABLED")
@DefaultValue("true")
boolean isRetryEnabled();

@Key("${OSS_TYPE}_OSS_MAX_RETRY_ATTEMPTS")
@DefaultValue("3")
int getOssMaxRetryAttempts();

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.gotocompany.firehose.sink.common.blobstorage;

import com.gotocompany.firehose.config.GCSConfig;
import com.gotocompany.firehose.config.ObjectStorageServiceConfig;
import com.gotocompany.firehose.config.S3Config;
import com.gotocompany.firehose.sink.common.blobstorage.gcs.GoogleCloudStorage;
import com.gotocompany.firehose.sink.common.blobstorage.oss.ObjectStorageService;
import com.gotocompany.firehose.sink.common.blobstorage.s3.S3;
import org.aeonbits.owner.ConfigFactory;

Expand All @@ -27,7 +29,13 @@ public static BlobStorage createObjectStorage(BlobStorageType storageType, Map<S
} catch (Exception e) {
throw new IllegalArgumentException("Exception while creating S3 Storage", e);
}

case OSS:
try {
ObjectStorageServiceConfig objectStorageServiceConfig = ConfigFactory.create(ObjectStorageServiceConfig.class, config);
return new ObjectStorageService(objectStorageServiceConfig);
} catch (Exception e) {
throw new IllegalArgumentException("Exception while creating OSS Storage", e);
}
default:
throw new IllegalArgumentException("Blob Storage Type " + storageType + " is not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public enum BlobStorageType {
GCS,
S3
S3,
OSS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.gotocompany.firehose.sink.common.blobstorage.oss;

import com.aliyun.oss.ClientBuilderConfiguration;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.auth.DefaultCredentialProvider;
import com.aliyun.oss.common.comm.NoRetryStrategy;
import com.aliyun.oss.common.comm.SignVersion;
import com.aliyun.oss.model.BucketList;
import com.aliyun.oss.model.ListBucketsRequest;
import com.aliyun.oss.model.PutObjectRequest;
import com.gotocompany.firehose.config.ObjectStorageServiceConfig;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.util.Optional;

@Slf4j
public class ObjectStorageService implements BlobStorage {

private final OSS oss;
private final ObjectStorageServiceConfig objectStorageServiceConfig;

public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfig) {
this(objectStorageServiceConfig, initializeOss(objectStorageServiceConfig));
}

public ObjectStorageService(ObjectStorageServiceConfig objectStorageServiceConfig, OSS oss) {
this.oss = oss;
this.objectStorageServiceConfig = objectStorageServiceConfig;
checkBucket();
}

protected static OSS initializeOss(ObjectStorageServiceConfig objectStorageServiceConfig) {
ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration();
clientBuilderConfiguration.setSignatureVersion(SignVersion.V4);
clientBuilderConfiguration.setSocketTimeout(objectStorageServiceConfig.getOssSocketTimeoutMs());
clientBuilderConfiguration.setConnectionTimeout(objectStorageServiceConfig.getOssConnectionTimeoutMs());
clientBuilderConfiguration.setConnectionRequestTimeout(objectStorageServiceConfig.getOssConnectionRequestTimeoutMs());
clientBuilderConfiguration.setRequestTimeout(objectStorageServiceConfig.getOssRequestTimeoutMs());
if (objectStorageServiceConfig.isRetryEnabled()) {
clientBuilderConfiguration.setMaxErrorRetry(objectStorageServiceConfig.getOssMaxRetryAttempts());
} else {
clientBuilderConfiguration.setRetryStrategy(new NoRetryStrategy());
}
return OSSClientBuilder.create()
.endpoint(objectStorageServiceConfig.getOssEndpoint())
.region(objectStorageServiceConfig.getOssRegion())
.credentialsProvider(new DefaultCredentialProvider(objectStorageServiceConfig.getOssAccessId(),
objectStorageServiceConfig.getOssAccessKey()))
.clientConfiguration(clientBuilderConfiguration)
.build();
}

@Override
public void store(String objectName, String filePath) throws BlobStorageException {
PutObjectRequest putObjectRequest = new PutObjectRequest(
objectStorageServiceConfig.getOssBucketName(),
buildObjectPath(objectName),
new File(filePath)
);
putObject(putObjectRequest);
}

@Override
public void store(String objectName, byte[] content) throws BlobStorageException {
PutObjectRequest putObjectRequest = new PutObjectRequest(
objectStorageServiceConfig.getOssBucketName(),
buildObjectPath(objectName),
new ByteArrayInputStream(content)
);
putObject(putObjectRequest);
}

private void putObject(PutObjectRequest putObjectRequest) throws BlobStorageException {
try {
oss.putObject(putObjectRequest);
} catch (ClientException e) {
log.error("Failed to put object to OSS", e);
throw new BlobStorageException("client_error", e.getMessage(), e);
} catch (OSSException e) {
log.error("Failed to put object to OSS requestID:{} hostID:{}", e.getRequestId(), e.getHostId());
throw new BlobStorageException(e.getErrorCode(), e.getErrorMessage(), e);
}
}

private String buildObjectPath(String objectName) {
return Optional.ofNullable(objectStorageServiceConfig.getOssDirectoryPrefix())
.map(prefix -> prefix + "/" + objectName)
.orElse(objectName);
}

private void checkBucket() {
BucketList bucketList = oss.listBuckets(new ListBucketsRequest(objectStorageServiceConfig.getOssBucketName(),
null, 1));
if (bucketList.getBucketList().isEmpty()) {
log.error("Bucket does not exist:{}", objectStorageServiceConfig.getOssBucketName());
log.error("Please create OSS bucket before running firehose: {}", objectStorageServiceConfig.getOssBucketName());
throw new IllegalArgumentException("Bucket does not exist");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public static DlqWriter create(Map<String, String> configuration, StatsDReporter
case S3:
configuration.put("S3_TYPE", "DLQ");
break;
case OSS:
configuration.put("OSS_TYPE", "DLQ");
break;
ekawinataa marked this conversation as resolved.
Show resolved Hide resolved
default:
throw new IllegalArgumentException("DLQ Blob Storage type " + dlqConfig.getBlobStorageType() + "is not supported");
}
Expand Down
Loading
Loading