Skip to content

Commit

Permalink
[FLINK-36598] Provide FileSystem instance in intialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Oct 25, 2024
1 parent 5819655 commit 120848f
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 60 deletions.
118 changes: 71 additions & 47 deletions env/flink/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,16 @@ class FlinkDirectory : public FSDirectory {
};

FlinkFileSystem::FlinkFileSystem(const std::shared_ptr<FileSystem>& base_fs,
const std::string& base_path)
: FileSystemWrapper(base_fs), base_path_(TrimTrailingSlash(base_path)) {}
const std::string& base_path,
jobject file_system_instance)
: FileSystemWrapper(base_fs),
base_path_(TrimTrailingSlash(base_path)),
file_system_instance_(file_system_instance),
file_system_instance_managed_(file_system_instance == nullptr) {
}

FlinkFileSystem::~FlinkFileSystem() {
if (file_system_instance_ != nullptr) {
if (file_system_instance_managed_ && file_system_instance_ != nullptr) {
JNIEnv* env = getJNIEnv();
env->DeleteGlobalRef(file_system_instance_);
}
Expand All @@ -325,48 +330,62 @@ Status FlinkFileSystem::Init() {
}
class_cache_ = javaClassCache.release();

// Delegate Flink to load real FileSystem (e.g.
// S3FileSystem/OSSFileSystem/...)
JavaClassCache::JavaClassContext fileSystemClass =
class_cache_->GetJClass(JavaClassCache::JC_FLINK_FILE_SYSTEM);
JavaClassCache::JavaMethodContext fileSystemGetMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET);

JavaClassCache::JavaClassContext uriClass =
class_cache_->GetJClass(JavaClassCache::JC_URI);
JavaClassCache::JavaMethodContext uriConstructor =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR);

// Construct URI
jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str());
jobject uriInstance = jniEnv->NewObject(
uriClass.javaClass, uriConstructor.javaMethod, uriStringArg);
jniEnv->DeleteLocalRef(uriStringArg);
if (uriInstance == nullptr) {
if (file_system_instance_ == nullptr) {
// Delegate Flink to load real FileSystem (e.g.
// S3FileSystem/OSSFileSystem/...)
JavaClassCache::JavaClassContext fileSystemClass =
class_cache_->GetJClass(JavaClassCache::JC_FLINK_FILE_SYSTEM);
JavaClassCache::JavaMethodContext fileSystemGetMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET);

JavaClassCache::JavaClassContext uriClass =
class_cache_->GetJClass(JavaClassCache::JC_URI);
JavaClassCache::JavaMethodContext uriConstructor =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR);

// Construct URI
jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str());
jobject uriInstance = jniEnv->NewObject(
uriClass.javaClass, uriConstructor.javaMethod, uriStringArg);
jniEnv->DeleteLocalRef(uriStringArg);
if (uriInstance == nullptr) {
return CheckThenError(
std::string("NewObject Exception when Init FlinkFileSystem, ")
.append(uriClass.ToString())
.append(uriConstructor.ToString())
.append(", args: ")
.append(base_path_));
}

// Construct FileSystem
jobject fileSystemInstance = jniEnv->CallStaticObjectMethod(
fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance);
jniEnv->DeleteLocalRef(uriInstance);
if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
"CallStaticObjectMethod Exception when Init FlinkFileSystem, ")
.append(fileSystemClass.ToString())
.append(fileSystemGetMethod.ToString())
.append(", args: URI(")
.append(base_path_)
.append(")"));
}
file_system_instance_ = jniEnv->NewGlobalRef(fileSystemInstance);
jniEnv->DeleteLocalRef(fileSystemInstance);
}

if (file_system_instance_ == nullptr) {
return CheckThenError(
std::string("NewObject Exception when Init FlinkFileSystem, ")
.append(uriClass.ToString())
.append(uriConstructor.ToString())
.append(", args: ")
.append(base_path_));
}

// Construct FileSystem
jobject fileSystemInstance = jniEnv->CallStaticObjectMethod(
fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance);
jniEnv->DeleteLocalRef(uriInstance);
if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) {
std::string(
"Error when init flink env, the file system provided is null"));
}

if (jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
"CallStaticObjectMethod Exception when Init FlinkFileSystem, ")
.append(fileSystemClass.ToString())
.append(fileSystemGetMethod.ToString())
.append(", args: URI(")
.append(base_path_)
.append(")"));
"Error when init flink env, JNI throws exception."));
}
file_system_instance_ = jniEnv->NewGlobalRef(fileSystemInstance);
jniEnv->DeleteLocalRef(fileSystemInstance);
return Status::OK();
}

Expand Down Expand Up @@ -856,28 +875,33 @@ IOStatus FlinkFileSystem::UnlockFile(FileLock* /*lock*/,

Status FlinkFileSystem::Create(const std::shared_ptr<FileSystem>& base,
const std::string& uri,
std::unique_ptr<FileSystem>* result) {
auto* fileSystem = new FlinkFileSystem(base, uri);
std::unique_ptr<FileSystem>* result,
jobject file_system_instance) {
auto* fileSystem = new FlinkFileSystem(base, uri,
file_system_instance);
Status status = fileSystem->Init();
result->reset(fileSystem);
return status;
}

Status NewFlinkEnv(const std::string& uri,
std::unique_ptr<Env>* flinkFileSystem) {
std::unique_ptr<Env>* flinkFileSystem,
jobject file_system_instance) {
std::shared_ptr<FileSystem> fs;
Status s = NewFlinkFileSystem(uri, &fs);
Status s = NewFlinkFileSystem(uri, &fs, file_system_instance);
if (s.ok()) {
*flinkFileSystem = NewCompositeEnv(fs);
}
return s;
}

Status NewFlinkFileSystem(const std::string& uri,
std::shared_ptr<FileSystem>* fs) {
std::shared_ptr<FileSystem>* fs,
jobject file_system_instance) {
std::unique_ptr<FileSystem> flinkFileSystem;
Status s =
FlinkFileSystem::Create(FileSystem::Default(), uri, &flinkFileSystem);
FlinkFileSystem::Create(FileSystem::Default(), uri, &flinkFileSystem,
file_system_instance);
if (s.ok()) {
fs->reset(flinkFileSystem.release());
}
Expand Down
13 changes: 9 additions & 4 deletions env/flink/env_flink.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class FlinkFileSystem : public FileSystemWrapper {
// base_path
static Status Create(const std::shared_ptr<FileSystem>& /*base_fs*/,
const std::string& /*base_path*/,
std::unique_ptr<FileSystem>* /*fs*/);
std::unique_ptr<FileSystem>* /*fs*/,
jobject file_system_instance);

// Define some names
static const char* kClassName() { return "FlinkFileSystem"; }
Expand Down Expand Up @@ -101,9 +102,11 @@ class FlinkFileSystem : public FileSystemWrapper {
const std::string base_path_;
JavaClassCache* class_cache_;
jobject file_system_instance_;
bool file_system_instance_managed_;

explicit FlinkFileSystem(const std::shared_ptr<FileSystem>& base,
const std::string& fsname);
const std::string& fsname,
jobject file_system_instance);

// Init FileSystem
Status Init();
Expand All @@ -126,8 +129,10 @@ class FlinkFileSystem : public FileSystemWrapper {
};

// Returns a `FlinkEnv` with base_path
Status NewFlinkEnv(const std::string& base_path, std::unique_ptr<Env>* env);
Status NewFlinkEnv(const std::string& base_path, std::unique_ptr<Env>* env,
jobject file_system_instance);
// Returns a `FlinkFileSystem` with base_path
Status NewFlinkFileSystem(const std::string& base_path,
std::shared_ptr<FileSystem>* fs);
std::shared_ptr<FileSystem>* fs,
jobject file_system_instance);
} // namespace ROCKSDB_NAMESPACE
2 changes: 1 addition & 1 deletion env/flink/env_flink_test_suite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void EnvFlinkTestSuites::runAllTestSuites() {
}

void EnvFlinkTestSuites::setUp() {
auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(base_path_, &flink_env_);
auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(base_path_, &flink_env_, nullptr);
if (!status.ok()) {
throw std::runtime_error("New FlinkEnv failed");
}
Expand Down
8 changes: 5 additions & 3 deletions java/forstjni/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
/*
* Class: org_forstdb_FlinkEnv
* Method: createFlinkEnv
* Signature: (Ljava/lang/String;)J
* Signature: (Ljava/lang/String;Ljava/lang/Object;)J
*/
jlong Java_org_forstdb_FlinkEnv_createFlinkEnv(JNIEnv* env, jclass,
jstring base_path) {
jstring base_path,
jobject file_system_instance_) {
jboolean has_exception = JNI_FALSE;
auto path =
ROCKSDB_NAMESPACE::JniUtil::copyStdString(env, base_path, &has_exception);
Expand All @@ -41,7 +42,8 @@ jlong Java_org_forstdb_FlinkEnv_createFlinkEnv(JNIEnv* env, jclass,
return 0;
}
std::unique_ptr<ROCKSDB_NAMESPACE::Env> flink_env;
auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(path, &flink_env);
auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(path, &flink_env,
file_system_instance_);
if (!status.ok()) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status);
return 0;
Expand Down
4 changes: 2 additions & 2 deletions java/include/org_forstdb_FlinkEnv.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions java/src/main/java/org/forstdb/FlinkEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ public class FlinkEnv extends Env {
* @param basePath the base path string for the given Flink file system,
* formatted as "{fs-schema-supported-by-flink}://xxx"
*/
public FlinkEnv(final String basePath) {
super(createFlinkEnv(basePath));
public FlinkEnv(final String basePath, final Object fileSystem) {
super(createFlinkEnv(basePath, fileSystem));
}

private static native long createFlinkEnv(final String basePath);
private static native long createFlinkEnv(final String basePath, final Object fileSystem);

@Override protected final native void disposeInternal(final long handle);
}

0 comments on commit 120848f

Please sign in to comment.