diff --git a/env/flink/env_flink.cc b/env/flink/env_flink.cc index eae1773cf..3ef7927f6 100644 --- a/env/flink/env_flink.cc +++ b/env/flink/env_flink.cc @@ -305,11 +305,16 @@ class FlinkDirectory : public FSDirectory { }; FlinkFileSystem::FlinkFileSystem(const std::shared_ptr& base_fs, - const std::string& base_path) - : FileSystemWrapper(base_fs), base_path_(TrimTrailingSlash(base_path)) {} + const std::string& base_path, + jobject file_system_instance) + : FileSystemWrapper(base_fs), + base_path_(TrimTrailingSlash(base_path)), + file_system_instance_(file_system_instance), + file_system_instance_managed_(file_system_instance == nullptr) { +} FlinkFileSystem::~FlinkFileSystem() { - if (file_system_instance_ != nullptr) { + if (file_system_instance_managed_ && file_system_instance_ != nullptr) { JNIEnv* env = getJNIEnv(); env->DeleteGlobalRef(file_system_instance_); } @@ -325,48 +330,62 @@ Status FlinkFileSystem::Init() { } class_cache_ = javaClassCache.release(); - // Delegate Flink to load real FileSystem (e.g. - // S3FileSystem/OSSFileSystem/...) - JavaClassCache::JavaClassContext fileSystemClass = - class_cache_->GetJClass(JavaClassCache::JC_FLINK_FILE_SYSTEM); - JavaClassCache::JavaMethodContext fileSystemGetMethod = - class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET); - - JavaClassCache::JavaClassContext uriClass = - class_cache_->GetJClass(JavaClassCache::JC_URI); - JavaClassCache::JavaMethodContext uriConstructor = - class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR); - - // Construct URI - jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str()); - jobject uriInstance = jniEnv->NewObject( - uriClass.javaClass, uriConstructor.javaMethod, uriStringArg); - jniEnv->DeleteLocalRef(uriStringArg); - if (uriInstance == nullptr) { + if (file_system_instance_ == nullptr) { + // Delegate Flink to load real FileSystem (e.g. + // S3FileSystem/OSSFileSystem/...) + JavaClassCache::JavaClassContext fileSystemClass = + class_cache_->GetJClass(JavaClassCache::JC_FLINK_FILE_SYSTEM); + JavaClassCache::JavaMethodContext fileSystemGetMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET); + + JavaClassCache::JavaClassContext uriClass = + class_cache_->GetJClass(JavaClassCache::JC_URI); + JavaClassCache::JavaMethodContext uriConstructor = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR); + + // Construct URI + jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str()); + jobject uriInstance = jniEnv->NewObject( + uriClass.javaClass, uriConstructor.javaMethod, uriStringArg); + jniEnv->DeleteLocalRef(uriStringArg); + if (uriInstance == nullptr) { + return CheckThenError( + std::string("NewObject Exception when Init FlinkFileSystem, ") + .append(uriClass.ToString()) + .append(uriConstructor.ToString()) + .append(", args: ") + .append(base_path_)); + } + + // Construct FileSystem + jobject fileSystemInstance = jniEnv->CallStaticObjectMethod( + fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance); + jniEnv->DeleteLocalRef(uriInstance); + if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) { + return CheckThenError( + std::string( + "CallStaticObjectMethod Exception when Init FlinkFileSystem, ") + .append(fileSystemClass.ToString()) + .append(fileSystemGetMethod.ToString()) + .append(", args: URI(") + .append(base_path_) + .append(")")); + } + file_system_instance_ = jniEnv->NewGlobalRef(fileSystemInstance); + jniEnv->DeleteLocalRef(fileSystemInstance); + } + + if (file_system_instance_ == nullptr) { return CheckThenError( - std::string("NewObject Exception when Init FlinkFileSystem, ") - .append(uriClass.ToString()) - .append(uriConstructor.ToString()) - .append(", args: ") - .append(base_path_)); - } - - // Construct FileSystem - jobject fileSystemInstance = jniEnv->CallStaticObjectMethod( - fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance); - jniEnv->DeleteLocalRef(uriInstance); - if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) { + std::string( + "Error when init flink env, the file system provided is null")); + } + + if (jniEnv->ExceptionCheck()) { return CheckThenError( std::string( - "CallStaticObjectMethod Exception when Init FlinkFileSystem, ") - .append(fileSystemClass.ToString()) - .append(fileSystemGetMethod.ToString()) - .append(", args: URI(") - .append(base_path_) - .append(")")); + "Error when init flink env, JNI throws exception.")); } - file_system_instance_ = jniEnv->NewGlobalRef(fileSystemInstance); - jniEnv->DeleteLocalRef(fileSystemInstance); return Status::OK(); } @@ -856,17 +875,20 @@ IOStatus FlinkFileSystem::UnlockFile(FileLock* /*lock*/, Status FlinkFileSystem::Create(const std::shared_ptr& base, const std::string& uri, - std::unique_ptr* result) { - auto* fileSystem = new FlinkFileSystem(base, uri); + std::unique_ptr* result, + jobject file_system_instance) { + auto* fileSystem = new FlinkFileSystem(base, uri, + file_system_instance); Status status = fileSystem->Init(); result->reset(fileSystem); return status; } Status NewFlinkEnv(const std::string& uri, - std::unique_ptr* flinkFileSystem) { + std::unique_ptr* flinkFileSystem, + jobject file_system_instance) { std::shared_ptr fs; - Status s = NewFlinkFileSystem(uri, &fs); + Status s = NewFlinkFileSystem(uri, &fs, file_system_instance); if (s.ok()) { *flinkFileSystem = NewCompositeEnv(fs); } @@ -874,10 +896,12 @@ Status NewFlinkEnv(const std::string& uri, } Status NewFlinkFileSystem(const std::string& uri, - std::shared_ptr* fs) { + std::shared_ptr* fs, + jobject file_system_instance) { std::unique_ptr flinkFileSystem; Status s = - FlinkFileSystem::Create(FileSystem::Default(), uri, &flinkFileSystem); + FlinkFileSystem::Create(FileSystem::Default(), uri, &flinkFileSystem, + file_system_instance); if (s.ok()) { fs->reset(flinkFileSystem.release()); } diff --git a/env/flink/env_flink.h b/env/flink/env_flink.h index 04295815f..97ead9ba3 100644 --- a/env/flink/env_flink.h +++ b/env/flink/env_flink.h @@ -34,7 +34,8 @@ class FlinkFileSystem : public FileSystemWrapper { // base_path static Status Create(const std::shared_ptr& /*base_fs*/, const std::string& /*base_path*/, - std::unique_ptr* /*fs*/); + std::unique_ptr* /*fs*/, + jobject file_system_instance); // Define some names static const char* kClassName() { return "FlinkFileSystem"; } @@ -101,9 +102,11 @@ class FlinkFileSystem : public FileSystemWrapper { const std::string base_path_; JavaClassCache* class_cache_; jobject file_system_instance_; + bool file_system_instance_managed_; explicit FlinkFileSystem(const std::shared_ptr& base, - const std::string& fsname); + const std::string& fsname, + jobject file_system_instance); // Init FileSystem Status Init(); @@ -126,8 +129,10 @@ class FlinkFileSystem : public FileSystemWrapper { }; // Returns a `FlinkEnv` with base_path -Status NewFlinkEnv(const std::string& base_path, std::unique_ptr* env); +Status NewFlinkEnv(const std::string& base_path, std::unique_ptr* env, + jobject file_system_instance); // Returns a `FlinkFileSystem` with base_path Status NewFlinkFileSystem(const std::string& base_path, - std::shared_ptr* fs); + std::shared_ptr* fs, + jobject file_system_instance); } // namespace ROCKSDB_NAMESPACE diff --git a/env/flink/env_flink_test_suite.cc b/env/flink/env_flink_test_suite.cc index 4db7f6968..26c07dd4b 100644 --- a/env/flink/env_flink_test_suite.cc +++ b/env/flink/env_flink_test_suite.cc @@ -52,7 +52,7 @@ void EnvFlinkTestSuites::runAllTestSuites() { } void EnvFlinkTestSuites::setUp() { - auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(base_path_, &flink_env_); + auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(base_path_, &flink_env_, nullptr); if (!status.ok()) { throw std::runtime_error("New FlinkEnv failed"); } diff --git a/java/forstjni/env_flink.cc b/java/forstjni/env_flink.cc index c3fee7690..73af83172 100644 --- a/java/forstjni/env_flink.cc +++ b/java/forstjni/env_flink.cc @@ -28,10 +28,11 @@ /* * Class: org_forstdb_FlinkEnv * Method: createFlinkEnv - * Signature: (Ljava/lang/String;)J + * Signature: (Ljava/lang/String;Ljava/lang/Object;)J */ jlong Java_org_forstdb_FlinkEnv_createFlinkEnv(JNIEnv* env, jclass, - jstring base_path) { + jstring base_path, + jobject file_system_instance_) { jboolean has_exception = JNI_FALSE; auto path = ROCKSDB_NAMESPACE::JniUtil::copyStdString(env, base_path, &has_exception); @@ -41,7 +42,8 @@ jlong Java_org_forstdb_FlinkEnv_createFlinkEnv(JNIEnv* env, jclass, return 0; } std::unique_ptr flink_env; - auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(path, &flink_env); + auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(path, &flink_env, + file_system_instance_); if (!status.ok()) { ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status); return 0; diff --git a/java/include/org_forstdb_FlinkEnv.h b/java/include/org_forstdb_FlinkEnv.h index 4dfe9e786..15d40c164 100644 --- a/java/include/org_forstdb_FlinkEnv.h +++ b/java/include/org_forstdb_FlinkEnv.h @@ -10,10 +10,10 @@ extern "C" { /* * Class: org_forstdb_FlinkEnv * Method: createFlinkEnv - * Signature: (Ljava/lang/String;)J + * Signature: (Ljava/lang/String;Ljava/lang/Object;)J */ JNIEXPORT jlong JNICALL Java_org_forstdb_FlinkEnv_createFlinkEnv - (JNIEnv *, jclass, jstring); + (JNIEnv *, jclass, jstring, jobject); /* * Class: org_forstdb_FlinkEnv diff --git a/java/src/main/java/org/forstdb/FlinkEnv.java b/java/src/main/java/org/forstdb/FlinkEnv.java index 758e72952..3fdb2be1c 100644 --- a/java/src/main/java/org/forstdb/FlinkEnv.java +++ b/java/src/main/java/org/forstdb/FlinkEnv.java @@ -31,11 +31,11 @@ public class FlinkEnv extends Env { * @param basePath the base path string for the given Flink file system, * formatted as "{fs-schema-supported-by-flink}://xxx" */ - public FlinkEnv(final String basePath) { - super(createFlinkEnv(basePath)); + public FlinkEnv(final String basePath, final Object fileSystem) { + super(createFlinkEnv(basePath, fileSystem)); } - private static native long createFlinkEnv(final String basePath); + private static native long createFlinkEnv(final String basePath, final Object fileSystem); @Override protected final native void disposeInternal(final long handle); } \ No newline at end of file