diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h index deb11f99edd..4407869ba7d 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h @@ -79,6 +79,7 @@ class SdkConfig { uint32_t manager_url_timeout_; // URL parsing timeout, seconds uint32_t max_proxy_num_; uint32_t msg_type_; + bool enable_isolation_; // Network parameters bool enable_tcp_nagle_; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index 340b6f1e667..3620d8985ee 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -84,7 +84,7 @@ void SdkConfig::defaultInit() { recv_buf_size_ = constants::kRecvBufSize; max_msg_size_ = constants::kExtPackSize; max_group_id_num_ = constants::kMaxGroupIdNum; - max_stream_id_num_=constants::kMaxStreamIdNum; + max_stream_id_num_ = constants::kMaxStreamIdNum; // Packaging parameters enable_pack_ = constants::kEnablePack; @@ -106,6 +106,7 @@ void SdkConfig::defaultInit() { manager_update_interval_ = constants::kManagerUpdateInterval; manager_url_timeout_ = constants::kManagerTimeout; max_proxy_num_ = constants::kMaxProxyNum; + enable_isolation_ = constants::kEnableIsolation; local_ip_ = constants::kSerIP; local_port_ = constants::kSerPort; @@ -325,6 +326,13 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) { std::string inlong_group_ids_str = obj.GetString(); Utils::splitOperate(inlong_group_ids_str, inlong_group_ids_, ","); } + // enable isolation + if (doc.HasMember("enable_isolation") && doc["enable_isolation"].IsBool()) { + const rapidjson::Value &obj = doc["enable_isolation"]; + enable_isolation_ = obj.GetBool(); + } else { + enable_isolation_ = constants::kEnableIsolation; + } } void SdkConfig::InitTcpParam(const rapidjson::Value &doc) { @@ -460,6 +468,7 @@ void SdkConfig::ShowClientConfig() { LOG_INFO("auth_key: " << auth_key_.c_str()); LOG_INFO("max_group_id_num: " << max_group_id_num_); LOG_INFO("max_stream_id_num: " << max_stream_id_num_); + LOG_INFO("enable_isolation: " << enable_isolation_); } } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index 5b5d13055a8..abe33dde212 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -69,15 +69,15 @@ void ProxyManager::DoUpdate() { std::srand(unsigned(std::time(nullptr))); - if (groupid_2_cluster_map_.empty()) { + if (groupid_2_cluster_id_map_.empty()) { LOG_INFO("empty groupid, no need to DoUpdate proxy list"); update_mutex_.unlock(); return; } { - unique_read_lock rdlck(groupid_2_cluster_rwmutex_); - for (auto &groupid2cluster : groupid_2_cluster_map_) { + unique_read_lock rdlck(groupid_2_cluster_id_rwmutex_); + for (auto &groupid2cluster : groupid_2_cluster_id_map_) { std::string url; if (SdkConfig::getInstance()->enable_manager_url_from_cluster_) url = SdkConfig::getInstance()->manager_cluster_url_; @@ -122,6 +122,11 @@ void ProxyManager::DoUpdate() { } } } + + UpdateGroupid2ClusterIdMap(); + + UpdateClusterId2ProxyMap(); + update_mutex_.unlock(); LOG_INFO("finish ProxyManager DoUpdate."); } @@ -174,7 +179,8 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id, << inlong_group_id.c_str()); return SdkCode::kErrorParseJson; } - + groupid_2_cluster_id_update_map_[inlong_group_id] = + clusterInfo["clusterId"].GetInt(); // proxy list for (auto &proxy : nodeList.GetArray()) { std::string ip; @@ -212,31 +218,28 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id, return SdkCode::kSuccess; } -int32_t ProxyManager::GetProxy(const std::string &groupid, +int32_t ProxyManager::GetProxy(const std::string &key, ProxyInfoVec &proxy_info_vec) { - unique_read_lock rdlck(groupid_2_proxy_map_rwmutex_); - auto it = groupid_2_proxy_map_.find(groupid); - if (it == groupid_2_proxy_map_.end()) { - LOG_ERROR("GetProxyByGroupid failed . Groupid " << groupid); - return SdkCode::kFailGetProxyConf; + if (SdkConfig::getInstance()->enable_isolation_) { + return GetProxyByGroupid(key, proxy_info_vec); + } else { + return GetProxyByClusterId(key, proxy_info_vec); } - proxy_info_vec = it->second; - return SdkCode::kSuccess; } int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id, bool is_inited) { { - unique_read_lock rdlck(groupid_2_cluster_rwmutex_); - auto it = groupid_2_cluster_map_.find(inlong_group_id); - if (it != groupid_2_cluster_map_.end()) { + unique_read_lock rdlck(groupid_2_cluster_id_rwmutex_); + auto it = groupid_2_cluster_id_map_.find(inlong_group_id); + if (it != groupid_2_cluster_id_map_.end()) { return SdkCode::kSuccess; } } { - unique_write_lock wtlck(groupid_2_cluster_rwmutex_); - groupid_2_cluster_map_.emplace(inlong_group_id, -1); + unique_write_lock wtlck(groupid_2_cluster_id_rwmutex_); + groupid_2_cluster_id_map_.emplace(inlong_group_id, ""); } LOG_INFO("CheckProxyConf groupid:" << inlong_group_id @@ -252,12 +255,81 @@ int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id, return SdkCode::kSuccess; } -bool ProxyManager::IsExist(const std::string &inlong_group_id) { +bool ProxyManager::HasProxy(const std::string &inlong_group_id) { + if (SdkConfig::getInstance()->enable_isolation_) { + return CheckGroupid(inlong_group_id); + } else { + return CheckClusterId(inlong_group_id); + } +} +int32_t ProxyManager::GetProxyByGroupid(const std::string &inlong_group_id, + ProxyInfoVec &proxy_info_vec) { unique_read_lock rdlck(groupid_2_proxy_map_rwmutex_); auto it = groupid_2_proxy_map_.find(inlong_group_id); + if (it == groupid_2_proxy_map_.end()) { + LOG_ERROR("GetProxy failed! inlong_group_id: " << inlong_group_id); + return SdkCode::kFailGetConn; + } + proxy_info_vec = it->second; + return SdkCode::kSuccess; +} +int32_t ProxyManager::GetProxyByClusterId(const std::string &cluster_id, + ProxyInfoVec &proxy_info_vec) { + unique_read_lock rdlck(clusterid_2_proxy_map_rwmutex_); + auto it = cluster_id_2_proxy_map_.find(cluster_id); + if (it == cluster_id_2_proxy_map_.end()) { + LOG_ERROR("GetProxy failed! cluster_id:" << cluster_id); + return SdkCode::kFailGetConn; + } + proxy_info_vec = it->second; + return SdkCode::kSuccess; +} +std::string ProxyManager::GetSendGroupKey(const std::string &groupid) { + if (SdkConfig::getInstance()->enable_isolation_) { + return groupid; + } + unique_read_lock rdlck(groupid_2_cluster_id_rwmutex_); + auto it = groupid_2_cluster_id_map_.find(groupid); + if (it == groupid_2_cluster_id_map_.end()) { + return ""; + } + return it->second; +} +bool ProxyManager::CheckGroupid(const std::string &groupid) { + unique_read_lock rdlck(groupid_2_proxy_map_rwmutex_); + auto it = groupid_2_proxy_map_.find(groupid); if (it == groupid_2_proxy_map_.end()) { return false; } return true; } +bool ProxyManager::CheckClusterId(const std::string &cluster_id) { + unique_read_lock rdlck(clusterid_2_proxy_map_rwmutex_); + auto it = cluster_id_2_proxy_map_.find(cluster_id); + if (it == cluster_id_2_proxy_map_.end()) { + return false; + } + return true; +} +void ProxyManager::UpdateClusterId2ProxyMap() { + if (SdkConfig::getInstance()->enable_isolation_) { + return; + } + unique_read_lock rdlck(groupid_2_cluster_id_rwmutex_); + for (const auto &it : groupid_2_cluster_id_update_map_) { + ProxyInfoVec proxy_info_vec; + if (GetProxyByGroupid(it.first, proxy_info_vec) == SdkCode::kSuccess) { + unique_write_lock wtlck(clusterid_2_proxy_map_rwmutex_); + cluster_id_2_proxy_map_[std::to_string(it.second)] = proxy_info_vec; + } + } +} +void ProxyManager::UpdateGroupid2ClusterIdMap() { + unique_write_lock wtlck(groupid_2_cluster_id_rwmutex_); + for (const auto &it : groupid_2_cluster_id_update_map_) { + groupid_2_cluster_id_map_[it.first] = std::to_string(it.second); + LOG_INFO("UpdateGroup2ClusterIdMap groupid:" << it.first << " ,cluster id:" + << it.second); + } +} } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h index b3f46bc68db..18856f61ec4 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h @@ -31,11 +31,17 @@ class ProxyManager { private: static ProxyManager *instance_; uint32_t timeout_; - read_write_mutex groupid_2_cluster_rwmutex_; + read_write_mutex groupid_2_cluster_id_rwmutex_; read_write_mutex groupid_2_proxy_map_rwmutex_; + read_write_mutex clusterid_2_proxy_map_rwmutex_; + + std::unordered_map groupid_2_cluster_id_map_; + std::unordered_map groupid_2_cluster_id_update_map_; - std::unordered_map groupid_2_cluster_map_; std::unordered_map groupid_2_proxy_map_; + std::unordered_map + cluster_id_2_proxy_map_; // + bool update_flag_; std::mutex cond_mutex_; std::mutex update_mutex_; @@ -45,7 +51,7 @@ class ProxyManager { std::thread update_conf_thread_; volatile bool inited_ = false; - int32_t ParseAndGet(const std::string &groupid, const std::string &meta_data, + int32_t ParseAndGet(const std::string &key, const std::string &meta_data, ProxyInfoVec &proxy_info_vec); public: @@ -57,7 +63,15 @@ class ProxyManager { void DoUpdate(); void Init(); int32_t GetProxy(const std::string &groupid, ProxyInfoVec &proxy_info_vec); - bool IsExist(const std::string &inlong_group_id); + int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec &proxy_info_vec); + int32_t GetProxyByClusterId(const std::string &cluster_id, + ProxyInfoVec &proxy_info_vec); + std::string GetSendGroupKey(const std::string &groupid); + bool HasProxy(const std::string &inlong_group_id); + bool CheckGroupid(const std::string &groupid); + bool CheckClusterId(const std::string &cluster_id); + void UpdateClusterId2ProxyMap(); + void UpdateGroupid2ClusterIdMap(); }; } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc index 24b74c2bf20..475fe14b3f0 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc @@ -28,48 +28,52 @@ SendManager::SendManager() : send_group_idx_(0) { << SdkConfig::getInstance()->inlong_group_ids_[i] << " send group num:" << SdkConfig::getInstance()->per_groupid_thread_nums_); - DoAddSendGroup(SdkConfig::getInstance()->inlong_group_ids_[i]); + std::string send_group_key = ProxyManager::GetInstance()->GetSendGroupKey( + SdkConfig::getInstance()->inlong_group_ids_[i]); + AddSendGroup(send_group_key); } } SendGroupPtr SendManager::GetSendGroup(const std::string &group_id) { - SendGroupPtr send_group_ptr = DoGetSendGroup(group_id); + std::string send_group_key = + ProxyManager::GetInstance()->GetSendGroupKey(group_id); + SendGroupPtr send_group_ptr = DoGetSendGroup(send_group_key); if (send_group_ptr == nullptr) { - AddSendGroup(group_id); + AddSendGroup(send_group_key); } return send_group_ptr; } -bool SendManager::AddSendGroup(const std::string &inlong_group_id) { - if (!ProxyManager::GetInstance()->IsExist(inlong_group_id)) { - LOG_ERROR("inlong_group_id is not exist." << inlong_group_id); +bool SendManager::AddSendGroup(const std::string &send_group_key) { + if (!ProxyManager::GetInstance()->HasProxy(send_group_key)) { + LOG_ERROR("inlong_group_id is not exist." << send_group_key); return false; } - DoAddSendGroup(inlong_group_id); + DoAddSendGroup(send_group_key); return false; } -void SendManager::DoAddSendGroup(const std::string &group_id) { - unique_write_lock wtlck(group_id_2_send_group_map_rwmutex_); - auto send_group_map = group_id_2_send_group_map_.find(group_id); - if (send_group_map != group_id_2_send_group_map_.end()) { - LOG_WARN("send group has exist." << group_id); +void SendManager::DoAddSendGroup(const std::string &send_group_key) { + unique_write_lock wtlck(send_group_map_rwmutex_); + auto send_group_map = send_group_map_.find(send_group_key); + if (send_group_map != send_group_map_.end()) { + LOG_WARN("send group has exist." << send_group_key); return; } std::vector send_group; send_group.reserve(SdkConfig::getInstance()->per_groupid_thread_nums_); for (int32_t j = 0; j < SdkConfig::getInstance()->per_groupid_thread_nums_; j++) { - send_group.push_back(std::make_shared(group_id)); + send_group.push_back(std::make_shared(send_group_key)); } - group_id_2_send_group_map_[group_id] = send_group; + send_group_map_[send_group_key] = send_group; } -SendGroupPtr SendManager::DoGetSendGroup(const std::string &group_id) { - unique_read_lock rdlck(group_id_2_send_group_map_rwmutex_); - auto send_group_map = group_id_2_send_group_map_.find(group_id); - if (send_group_map == group_id_2_send_group_map_.end()) { - LOG_ERROR("fail to get send group, group_id:%s" << group_id); +SendGroupPtr SendManager::DoGetSendGroup(const std::string &send_group_key) { + unique_read_lock rdlck(send_group_map_rwmutex_); + auto send_group_map = send_group_map_.find(send_group_key); + if (send_group_map == send_group_map_.end()) { + LOG_ERROR("fail to get send group, group_id:%s" << send_group_key); return nullptr; } if (send_group_map->second.empty()) { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h index 6ee55d4fb45..fa627647f2d 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h @@ -29,18 +29,17 @@ using namespace inlong; class SendManager : noncopyable { private: - read_write_mutex group_id_2_send_group_map_rwmutex_; - std::unordered_map> - group_id_2_send_group_map_; - SendGroupPtr DoGetSendGroup(const std::string &group_id); - void DoAddSendGroup(const std::string &group_id); + read_write_mutex send_group_map_rwmutex_; + std::unordered_map> send_group_map_; + SendGroupPtr DoGetSendGroup(const std::string &send_group_key); + void DoAddSendGroup(const std::string &send_group_key); volatile uint32_t send_group_idx_; public: SendManager(); virtual ~SendManager(){}; SendGroupPtr GetSendGroup(const std::string &group_id); - bool AddSendGroup(const std::string &inlong_group_id); + bool AddSendGroup(const std::string &send_group_key); }; } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index 274c3161e2d..b2af1911120 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -89,6 +89,7 @@ static const bool kNeedAuth = false; static const uint32_t kMaxAttrLen = 2048; const uint32_t ATTR_LENGTH = 10; +static const bool kEnableIsolation = false; } // namespace constants } // namespace inlong