Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services #42

Open
wants to merge 3 commits into
base: add-publish-to-maven-local-tasks
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dagger-functions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ dependencies {
dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1'
dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1'
dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '2.23.0'
dependenciesFunctionsJar group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1'
dependenciesFunctionsJar group: 'com.qcloud', name: 'cos_api', version: '5.6.227'

testImplementation project(':dagger-common').sourceSets.test.output
testImplementation group: 'junit', name: 'junit', version: '4.12'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.gotocompany.dagger.functions.udfs.python.file.source;

import com.gotocompany.dagger.functions.udfs.python.file.source.cos.CosFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.gcs.GcsFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.local.LocalFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.oss.OssFileSource;

/**
* The type File source factory.
Expand All @@ -17,6 +19,10 @@ public class FileSourceFactory {
public static FileSource getFileSource(String pythonFile) {
if ("GS".equals(getFileSourcePrefix(pythonFile))) {
return new GcsFileSource(pythonFile);
} else if ("OSS".equals(getFileSourcePrefix(pythonFile))) {
return new OssFileSource(pythonFile);
} else if ("COSN".equals(getFileSourcePrefix(pythonFile))) {
return new CosFileSource(pythonFile);
} else {
return new LocalFileSource(pythonFile);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.cos;

import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectInputStream;
import com.qcloud.cos.region.Region;
import com.qcloud.cos.utils.IOUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Wrapper around the Tencent Cloud COS SDK client that downloads python udf files
 * addressed as {@code cosn://<bucket>/<object key>}.
 */
public class CosClient {

    private static final String ENV_COS_SECRET_ID = "COS_SECRET_ID";
    private static final String ENV_COS_SECRET_KEY = "COS_SECRET_KEY";
    private static final String ENV_COS_REGION = "COS_REGION";
    private static final String COS_SCHEME_PREFIX = "cosn://";

    private final COSClient libCosClient;

    /**
     * Instantiates a new Cos client.
     * The secret id, secret key and region are read from the COS_SECRET_ID,
     * COS_SECRET_KEY and COS_REGION environment variables.
     */
    public CosClient() {
        String secretId = System.getenv(ENV_COS_SECRET_ID);
        String secretKey = System.getenv(ENV_COS_SECRET_KEY);
        String region = System.getenv(ENV_COS_REGION); // example value: ap-singapore

        COSCredentials credentials = new BasicCOSCredentials(secretId, secretKey);
        ClientConfig clientConfig = new ClientConfig(new Region(region));
        libCosClient = new COSClient(credentials, clientConfig);
    }

    /**
     * Instantiates a new Cos client.
     * This constructor used for unit test purposes.
     *
     * @param libCosClient the COS SDK client to delegate to
     */
    public CosClient(COSClient libCosClient) {
        this.libCosClient = libCosClient;
    }

    /**
     * Downloads the object referenced by the given path and returns its content.
     * The first path segment after the scheme is used as the bucket name, the remaining
     * segments (joined by "/") as the object key.
     *
     * @param pythonFile the python file path, e.g. cosn://bucket-name/path/to/file.py
     * @return the content of the object as a byte array
     * @throws IOException if reading or closing the object content stream fails
     */
    public byte[] getFile(String pythonFile) throws IOException {
        List<String> file = Arrays.asList(stripScheme(pythonFile).split("/"));

        String bucketName = file.get(0);
        String objectName = file.stream().skip(1).collect(Collectors.joining("/"));

        COSObject cosObject = libCosClient.getObject(bucketName, objectName);
        // try-with-resources closes the content stream once the bytes are read.
        try (COSObjectInputStream inputStream = cosObject.getObjectContent()) {
            return IOUtils.toByteArray(inputStream);
        }
    }

    /**
     * Removes a leading "cosn://" scheme from the path, ignoring letter case.
     * Only the leading occurrence is removed so that the same text inside an object key is kept.
     * NOTE(review): the file source factory compares the scheme against upper-case constants
     * while accepting lower-case paths, so it appears to match case-insensitively; this
     * mirrors that behaviour - confirm against the factory's prefix helper.
     *
     * @param pythonFile the python file path
     * @return the path without the leading scheme, or the path unchanged when no scheme is present
     */
    private static String stripScheme(String pythonFile) {
        if (pythonFile.regionMatches(true, 0, COS_SCHEME_PREFIX, 0, COS_SCHEME_PREFIX.length())) {
            return pythonFile.substring(COS_SCHEME_PREFIX.length());
        }
        return pythonFile;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.cos;

import com.gotocompany.dagger.functions.udfs.python.file.source.FileSource;

import java.io.IOException;

/**
 * {@link FileSource} implementation that obtains a python file through a {@link CosClient}.
 * The client is created on first use unless one was supplied at construction time.
 */
public class CosFileSource implements FileSource {

    private CosClient cosClient;
    private final String pythonFile;

    /**
     * Creates a file source for the given path; the client is created lazily.
     *
     * @param pythonFile the python file
     */
    public CosFileSource(String pythonFile) {
        this(pythonFile, null);
    }

    /**
     * Creates a file source for the given path that uses the supplied client.
     *
     * @param pythonFile the python file
     * @param cosClient  the client used to fetch the file
     */
    public CosFileSource(String pythonFile, CosClient cosClient) {
        this.pythonFile = pythonFile;
        this.cosClient = cosClient;
    }

    @Override
    public byte[] getObjectFile() throws IOException {
        CosClient client = getCosClient();
        return client.getFile(pythonFile);
    }

    /**
     * Returns the client held by this source, creating it first when none is set yet.
     *
     * @return the cos client
     */
    private CosClient getCosClient() {
        if (cosClient != null) {
            return cosClient;
        }
        cosClient = new CosClient();
        return cosClient;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.oss;

import com.aliyun.core.utils.IOUtils;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.model.OSSObject;
import com.aliyuncs.exceptions.ClientException;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Wrapper around the Alibaba Cloud OSS SDK client that downloads python udf files
 * addressed as {@code oss://<bucket>/<object key>}.
 */
public class OssClient {

    private static final String ENV_OSS_ENDPOINT = "OSS_ENDPOINT";
    private static final String DEFAULT_OSS_ENDPOINT = "oss-ap-southeast-1.aliyuncs.com";
    private static final String OSS_SCHEME_PREFIX = "oss://";

    private final OSS libOssClient;

    /**
     * Instantiates a new Oss client.
     * The endpoint is read from the OSS_ENDPOINT environment variable and falls back to
     * the default endpoint when the variable is unset or empty. Credentials are supplied by
     * the SDK's environment-variable credentials provider.
     */
    public OssClient() {
        String endpoint = System.getenv(ENV_OSS_ENDPOINT);
        if (endpoint == null || endpoint.isEmpty()) {
            endpoint = DEFAULT_OSS_ENDPOINT;
        }
        try {
            libOssClient = new OSSClientBuilder().build(endpoint, CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider());
        } catch (ClientException e) {
            throw new RuntimeException("failed to initialise oss client", e);
        }
    }

    /**
     * Instantiates a new OSS client.
     * This constructor used for unit test purposes.
     *
     * @param libOssClient the OSS SDK client to delegate to
     */
    public OssClient(OSS libOssClient) {
        this.libOssClient = libOssClient;
    }

    /**
     * Downloads the object referenced by the given path and returns its content.
     * The first path segment after the scheme is used as the bucket name, the remaining
     * segments (joined by "/") as the object key.
     *
     * @param pythonFile the python file path, e.g. oss://bucket-name/path/to/file.py
     * @return the content of the object as a byte array
     * @throws IOException if reading or closing the object content stream fails
     */
    public byte[] getFile(String pythonFile) throws IOException {
        List<String> file = Arrays.asList(stripScheme(pythonFile).split("/"));

        String bucketName = file.get(0);
        String objectName = file.stream().skip(1).collect(Collectors.joining("/"));

        OSSObject ossObject = libOssClient.getObject(bucketName, objectName);
        // try-with-resources closes the content stream once the bytes are read.
        try (InputStream inputStream = ossObject.getObjectContent()) {
            return IOUtils.toByteArray(inputStream);
        }
    }

    /**
     * Removes a leading "oss://" scheme from the path, ignoring letter case.
     * Only the leading occurrence is removed so that the same text inside an object key is kept.
     * NOTE(review): the file source factory compares the scheme against upper-case constants
     * while accepting lower-case paths, so it appears to match case-insensitively; this
     * mirrors that behaviour - confirm against the factory's prefix helper.
     *
     * @param pythonFile the python file path
     * @return the path without the leading scheme, or the path unchanged when no scheme is present
     */
    private static String stripScheme(String pythonFile) {
        if (pythonFile.regionMatches(true, 0, OSS_SCHEME_PREFIX, 0, OSS_SCHEME_PREFIX.length())) {
            return pythonFile.substring(OSS_SCHEME_PREFIX.length());
        }
        return pythonFile;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.oss;

import com.gotocompany.dagger.functions.udfs.python.file.source.FileSource;

import java.io.IOException;

/**
 * {@link FileSource} implementation that obtains a python file through an {@link OssClient}.
 * The client is created on first use unless one was supplied at construction time.
 */
public class OssFileSource implements FileSource {

    private OssClient ossClient;
    private final String pythonFile;

    /**
     * Creates a file source for the given path; the client is created lazily.
     *
     * @param pythonFile the python file
     */
    public OssFileSource(String pythonFile) {
        this(pythonFile, null);
    }

    /**
     * Creates a file source for the given path that uses the supplied client.
     *
     * @param pythonFile the python file
     * @param ossClient  the client used to fetch the file
     */
    public OssFileSource(String pythonFile, OssClient ossClient) {
        this.pythonFile = pythonFile;
        this.ossClient = ossClient;
    }

    @Override
    public byte[] getObjectFile() throws IOException {
        OssClient client = getOssClient();
        return client.getFile(pythonFile);
    }

    /**
     * Returns the client held by this source, creating it first when none is set yet.
     *
     * @return the oss client
     */
    private OssClient getOssClient() {
        if (ossClient != null) {
            return ossClient;
        }
        ossClient = new OssClient();
        return ossClient;
    }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.gotocompany.dagger.functions.udfs.python.file.source;

import com.gotocompany.dagger.functions.udfs.python.file.source.cos.CosFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.gcs.GcsFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.local.LocalFileSource;
import com.gotocompany.dagger.functions.udfs.python.file.source.oss.OssFileSource;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -24,4 +26,22 @@ public void shouldGetGcsFileSource() {

Assert.assertTrue(fileSource instanceof GcsFileSource);
}

@Test
public void shouldGetOssFileSource() {
    // A path with the oss:// scheme must be served by the OSS-backed source.
    FileSource resolvedSource = FileSourceFactory.getFileSource("oss://bucket-name/path/to/file/test_function.py");

    boolean isOssFileSource = resolvedSource instanceof OssFileSource;
    Assert.assertTrue(isOssFileSource);
}

@Test
public void shouldGetCosnFileSource() {
    // A path with the cosn:// scheme must be served by the COS-backed source.
    FileSource resolvedSource = FileSourceFactory.getFileSource("cosn://bucket-name/path/to/file/test_function.py");

    boolean isCosFileSource = resolvedSource instanceof CosFileSource;
    Assert.assertTrue(isCosFileSource);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.cos;

import com.qcloud.cos.COSClient;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectInputStream;
import org.apache.http.client.methods.HttpRequestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;

import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

/**
 * Unit test for {@link CosClient} using a mocked COS SDK client.
 */
public class CosClientTest {

    @Mock
    private COSClient libCosClient;

    @Mock
    private COSObject cosObject;

    @Before
    public void setup() {
        initMocks(this);
    }

    @Test
    public void shouldGetObjectFile() throws IOException {
        String bucketName = "bucket_name";
        String objectName = "path/to/file/python_udf.zip";
        String pythonFile = "cosn://bucket_name/path/to/file/python_udf.zip";

        // Stub the SDK so the bucket/key pair resolves to an in-memory object stream.
        HttpRequestBase httpRequest = Mockito.mock(HttpRequestBase.class);
        ByteArrayInputStream content = new ByteArrayInputStream("objectFile".getBytes());
        COSObjectInputStream objectContent = new COSObjectInputStream(content, httpRequest);
        when(libCosClient.getObject(bucketName, objectName)).thenReturn(cosObject);
        when(cosObject.getObjectContent()).thenReturn(objectContent);

        byte[] actualValue = new CosClient(libCosClient).getFile(pythonFile);

        verify(libCosClient, times(1)).getObject(bucketName, objectName);
        verify(cosObject, times(1)).getObjectContent();
        String expectedValue = Arrays.toString("objectFile".getBytes());
        Assert.assertEquals(expectedValue, Arrays.toString(actualValue));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.gotocompany.dagger.functions.udfs.python.file.source.cos;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

/**
 * Unit test for {@link CosFileSource} using a mocked {@link CosClient}.
 */
public class CosFileSourceTest {

    @Mock
    private CosClient cosClient;

    @Before
    public void setup() {
        initMocks(this);
    }

    @Test
    public void shouldGetObjectFile() throws IOException {
        ClassLoader classLoader = getClass().getClassLoader();
        String pythonFile = classLoader.getResource("python_udf.zip").getFile();
        byte[] expectedObject = Files.readAllBytes(Paths.get(pythonFile));

        when(cosClient.getFile(pythonFile)).thenReturn(expectedObject);
        CosFileSource cosFileSource = new CosFileSource(pythonFile, cosClient);

        byte[] actualObject = cosFileSource.getObjectFile();

        // assertEquals on arrays only checks reference identity; compare the content instead.
        Assert.assertArrayEquals(expectedObject, actualObject);
    }
}
Loading
Loading