From d0bb42730161338804b701b32473cce5aed1101b Mon Sep 17 00:00:00 2001 From: rajuGT Date: Wed, 16 Oct 2024 02:31:10 +0530 Subject: [PATCH] Add capability to dagger to read python udfs from Ali(oss) and Tencent(cosn) storage services Given the configuration provided correctly. Set the below environment variables accordingly to access the files stored in the respective bucket. Ali(oss) - OSS_ACCESS_KEY_ID - OSS_ACCESS_KEY_SECRET Tencent(cos) - COS_SECRET_ID - COS_SECRET_KEY - COS_REGION --- dagger-functions/build.gradle | 2 + .../python/file/source/FileSourceFactory.java | 6 ++ .../python/file/source/cos/CosClient.java | 66 +++++++++++++++++++ .../python/file/source/cos/CosFileSource.java | 47 +++++++++++++ .../python/file/source/oss/OssClient.java | 61 +++++++++++++++++ .../python/file/source/oss/OssFileSource.java | 47 +++++++++++++ .../file/source/FileSourceFactoryTest.java | 20 ++++++ .../python/file/source/cos/CosClientTest.java | 52 +++++++++++++++ .../file/source/cos/CosFileSourceTest.java | 38 +++++++++++ .../python/file/source/oss/OssClientTest.java | 48 ++++++++++++++ .../file/source/oss/OssFileSourceTest.java | 38 +++++++++++ 11 files changed, 425 insertions(+) create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClient.java create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSource.java create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClient.java create mode 100644 dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSource.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClientTest.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSourceTest.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClientTest.java create mode 100644 dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSourceTest.java diff --git a/dagger-functions/build.gradle b/dagger-functions/build.gradle index a14474508..d85f905ca 100644 --- a/dagger-functions/build.gradle +++ b/dagger-functions/build.gradle @@ -64,6 +64,8 @@ dependencies { dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1' dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1' dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '2.23.0' + dependenciesFunctionsJar group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1' + dependenciesFunctionsJar group: 'com.qcloud', name: 'cos_api', version: '5.6.227' testImplementation project(':dagger-common').sourceSets.test.output testImplementation group: 'junit', name: 'junit', version: '4.12' diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactory.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactory.java index 60f1fe94a..9aab9b500 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactory.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactory.java @@ -1,7 +1,9 @@ package com.gotocompany.dagger.functions.udfs.python.file.source; +import com.gotocompany.dagger.functions.udfs.python.file.source.cos.CosFileSource; import com.gotocompany.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; import com.gotocompany.dagger.functions.udfs.python.file.source.local.LocalFileSource; +import com.gotocompany.dagger.functions.udfs.python.file.source.oss.OssFileSource; /** * The type File source factory. @@ -17,6 +19,10 @@ public class FileSourceFactory { public static FileSource getFileSource(String pythonFile) { if ("GS".equals(getFileSourcePrefix(pythonFile))) { return new GcsFileSource(pythonFile); + } else if ("OSS".equals(getFileSourcePrefix(pythonFile))) { + return new OssFileSource(pythonFile); + } else if ("COSN".equals(getFileSourcePrefix(pythonFile))) { + return new CosFileSource(pythonFile); } else { return new LocalFileSource(pythonFile); } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClient.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClient.java new file mode 100644 index 000000000..d787db38a --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClient.java @@ -0,0 +1,66 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.cos; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.ClientConfig; +import com.qcloud.cos.auth.BasicCOSCredentials; +import com.qcloud.cos.auth.COSCredentials; +import com.qcloud.cos.model.COSObject; +import com.qcloud.cos.model.COSObjectInputStream; +import com.qcloud.cos.region.Region; +import com.qcloud.cos.utils.IOUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class CosClient { + + // TODO find better way to initialize clients + private final String ENV_COS_SECRET_ID = "COS_SECRET_ID"; + private final String ENV_COS_SECRET_KEY = "COS_SECRET_KEY"; + private final String ENV_COS_REGION = "COS_REGION"; + + private final COSClient libCosClient; + + /** + * Instantiates a new Cos client. + */ + public CosClient() { + String secretID = System.getenv(ENV_COS_SECRET_ID); + String secretKey = System.getenv(ENV_COS_SECRET_KEY); + String region = System.getenv(ENV_COS_REGION); // ap-singapore + + COSCredentials credentials = new BasicCOSCredentials(secretID, secretKey); + ClientConfig clientConfig = new ClientConfig(new Region(region)); + libCosClient = new COSClient(credentials, clientConfig); + } + + /** + * Instantiates a new Cos client. + * This constructor used for unit test purposes. + * + * @param libCosClient the storage + */ + public CosClient(COSClient libCosClient) { + this.libCosClient = libCosClient; + } + + /** + * Get file byte [ ]. + * + * @param pythonFile the python file + * @return the byte [ ] + */ + public byte[] getFile(String pythonFile) throws IOException { + List file = Arrays.asList(pythonFile.replace("cosn://", "").split("/")); + + String bucketName = file.get(0); + String objectName = file.stream().skip(1).collect(Collectors.joining("/")); + + COSObject cosObject = libCosClient.getObject(bucketName, objectName); + try (COSObjectInputStream inputStream = cosObject.getObjectContent()) { + return IOUtils.toByteArray(inputStream); + } + } +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSource.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSource.java new file mode 100644 index 000000000..8f1795fb0 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSource.java @@ -0,0 +1,47 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.cos; + +import com.gotocompany.dagger.functions.udfs.python.file.source.FileSource; + +import java.io.IOException; + +public class CosFileSource implements FileSource { + + private CosClient cosClient; + private final String pythonFile; + + /** + * Instantiates a new Cos file source. + * + * @param pythonFile the python file + */ + public CosFileSource(String pythonFile) { + this.pythonFile = pythonFile; + } + + /** + * Instantiates a new Cos file source. + * + * @param pythonFile the python file + */ + public CosFileSource(String pythonFile, CosClient cosClient) { + this.pythonFile = pythonFile; + this.cosClient = cosClient; + } + + @Override + public byte[] getObjectFile() throws IOException { + return getCosClient().getFile(pythonFile); + } + + /** + * Gets cos client. + * + * @return the cos client + */ + private CosClient getCosClient() { + if (this.cosClient == null) { + this.cosClient = new CosClient(); + } + return this.cosClient; + } +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClient.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClient.java new file mode 100644 index 000000000..b6b15cff8 --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClient.java @@ -0,0 +1,61 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.oss; + +import com.aliyun.core.utils.IOUtils; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.common.auth.CredentialsProviderFactory; +import com.aliyun.oss.model.OSSObject; +import com.aliyuncs.exceptions.ClientException; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class OssClient { + + // TODO refactor to take this value from the configuration + private static final String endpoint = "oss-cn-hangzhou.aliyuncs.com"; + + private final OSS libOssClient; + + /** + * Instantiates a new Oss client. + */ + public OssClient() { + try { + libOssClient = new OSSClientBuilder().build(endpoint, CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider()); + } catch (ClientException e) { + throw new RuntimeException("failed to initialise oss client", e); + } + } + + /** + * Instantiates a new OSS client. + * This constructor used for unit test purposes. + * + * @param libOssClient the storage + */ + public OssClient(OSS libOssClient) { + this.libOssClient = libOssClient; + } + + /** + * Get file byte [ ]. + * + * @param pythonFile the python file + * @return the byte [ ] + */ + public byte[] getFile(String pythonFile) throws IOException { + List file = Arrays.asList(pythonFile.replace("oss://", "").split("/")); + + String bucketName = file.get(0); + String objectName = file.stream().skip(1).collect(Collectors.joining("/")); + + OSSObject ossObject = libOssClient.getObject(bucketName, objectName); + try (InputStream inputStream = ossObject.getObjectContent()) { + return IOUtils.toByteArray(inputStream); + } + } +} diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSource.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSource.java new file mode 100644 index 000000000..cc183005b --- /dev/null +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSource.java @@ -0,0 +1,47 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.oss; + +import com.gotocompany.dagger.functions.udfs.python.file.source.FileSource; + +import java.io.IOException; + +public class OssFileSource implements FileSource { + + private OssClient ossClient; + private final String pythonFile; + + /** + * Instantiates a new Oss file source. + * + * @param pythonFile the python file + */ + public OssFileSource(String pythonFile) { + this.pythonFile = pythonFile; + } + + /** + * Instantiates a new Oss file source. + * + * @param pythonFile the python file + */ + public OssFileSource(String pythonFile, OssClient ossClient) { + this.pythonFile = pythonFile; + this.ossClient = ossClient; + } + + @Override + public byte[] getObjectFile() throws IOException { + return getOssClient().getFile(pythonFile); + } + + /** + * Gets oss client. + * + * @return the oss client + */ + private OssClient getOssClient() { + if (this.ossClient == null) { + this.ossClient = new OssClient(); + } + return this.ossClient; + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java index 9bcaadbf0..4dbfbb81a 100644 --- a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java @@ -1,7 +1,9 @@ package com.gotocompany.dagger.functions.udfs.python.file.source; +import com.gotocompany.dagger.functions.udfs.python.file.source.cos.CosFileSource; import com.gotocompany.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; import com.gotocompany.dagger.functions.udfs.python.file.source.local.LocalFileSource; +import com.gotocompany.dagger.functions.udfs.python.file.source.oss.OssFileSource; import org.junit.Assert; import org.junit.Test; @@ -24,4 +26,22 @@ public void shouldGetGcsFileSource() { Assert.assertTrue(fileSource instanceof GcsFileSource); } + + @Test + public void shouldGetOssFileSource() { + String pythonFile = "oss://bucket-name/path/to/file/test_function.py"; + + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + + Assert.assertTrue(fileSource instanceof OssFileSource); + } + + @Test + public void shouldGetCosnFileSource() { + String pythonFile = "cosn://bucket-name/path/to/file/test_function.py"; + + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + + Assert.assertTrue(fileSource instanceof CosFileSource); + } } diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClientTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClientTest.java new file mode 100644 index 000000000..aad1cdb43 --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosClientTest.java @@ -0,0 +1,52 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.cos; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.model.COSObject; +import com.qcloud.cos.model.COSObjectInputStream; +import org.apache.http.client.methods.HttpRequestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Arrays; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class CosClientTest { + + @Mock + private COSClient libCosClient; + + @Mock + private COSObject cosObject; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + HttpRequestBase mockRequest = Mockito.mock(HttpRequestBase.class); + + String pythonFile = "cosn://bucket_name/path/to/file/python_udf.zip"; + String bucketName = "bucket_name"; + String objectName = "path/to/file/python_udf.zip"; + String expectedValue = Arrays.toString("objectFile".getBytes()); + + when(libCosClient.getObject(bucketName, objectName)).thenReturn(cosObject); + when(cosObject.getObjectContent()).thenReturn(new COSObjectInputStream(new ByteArrayInputStream("objectFile".getBytes()), mockRequest)); + + CosClient cosClient = new CosClient(libCosClient); + byte[] actualValue = cosClient.getFile(pythonFile); + + verify(libCosClient, times(1)).getObject(bucketName, objectName); + verify(cosObject, times(1)).getObjectContent(); + Assert.assertEquals(expectedValue, Arrays.toString(actualValue)); + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSourceTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSourceTest.java new file mode 100644 index 000000000..f0700a4bb --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/cos/CosFileSourceTest.java @@ -0,0 +1,38 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.cos; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class CosFileSourceTest { + + @Mock + private CosClient cosClient; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + String pythonFile = classLoader.getResource("python_udf.zip").getFile(); + byte[] expectedObject = Files.readAllBytes(Paths.get(pythonFile)); + + when(cosClient.getFile(pythonFile)).thenReturn(expectedObject); + CosFileSource cosFileSource = new CosFileSource(pythonFile, cosClient); + + byte[] actualObject = cosFileSource.getObjectFile(); + + Assert.assertEquals(expectedObject, actualObject); + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClientTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClientTest.java new file mode 100644 index 000000000..f89b4fcc5 --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssClientTest.java @@ -0,0 +1,48 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.oss; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.model.OSSObject; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Arrays; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class OssClientTest { + + @Mock + private OSS libOSSClient; + + @Mock + private OSSObject ossObject; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + + String pythonFile = "oss://bucket_name/path/to/file/python_udf.zip"; + String bucketName = "bucket_name"; + String objectName = "path/to/file/python_udf.zip"; + String expectedValue = Arrays.toString("objectFile".getBytes()); + + when(libOSSClient.getObject(bucketName, objectName)).thenReturn(ossObject); + when(ossObject.getObjectContent()).thenReturn(new ByteArrayInputStream("objectFile".getBytes())); + + OssClient ossClient = new OssClient(libOSSClient); + byte[] actualValue = ossClient.getFile(pythonFile); + + verify(libOSSClient, times(1)).getObject(bucketName, objectName); + verify(ossObject, times(1)).getObjectContent(); + Assert.assertEquals(expectedValue, Arrays.toString(actualValue)); + } +} diff --git a/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSourceTest.java b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSourceTest.java new file mode 100644 index 000000000..4b7b444ac --- /dev/null +++ b/dagger-functions/src/test/java/com/gotocompany/dagger/functions/udfs/python/file/source/oss/OssFileSourceTest.java @@ -0,0 +1,38 @@ +package com.gotocompany.dagger.functions.udfs.python.file.source.oss; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class OssFileSourceTest { + + @Mock + private OssClient ossClient; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + String pythonFile = classLoader.getResource("python_udf.zip").getFile(); + byte[] expectedObject = Files.readAllBytes(Paths.get(pythonFile)); + + when(ossClient.getFile(pythonFile)).thenReturn(expectedObject); + OssFileSource ossFileSource = new OssFileSource(pythonFile, ossClient); + + byte[] actualObject = ossFileSource.getObjectFile(); + + Assert.assertEquals(expectedObject, actualObject); + } +}