Skip to content

Commit

Permalink
[INLONG-9213][SDK] Support isolation by inlong groupid (#9218)
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi authored Nov 5, 2023
1 parent a891202 commit ad34b4a
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class SdkConfig {
uint32_t manager_url_timeout_; // URL parsing timeout, seconds
uint32_t max_proxy_num_;
uint32_t msg_type_;
bool enable_isolation_;

// Network parameters
bool enable_tcp_nagle_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void SdkConfig::defaultInit() {
recv_buf_size_ = constants::kRecvBufSize;
max_msg_size_ = constants::kExtPackSize;
max_group_id_num_ = constants::kMaxGroupIdNum;
max_stream_id_num_=constants::kMaxStreamIdNum;
max_stream_id_num_ = constants::kMaxStreamIdNum;

// Packaging parameters
enable_pack_ = constants::kEnablePack;
Expand All @@ -106,6 +106,7 @@ void SdkConfig::defaultInit() {
manager_update_interval_ = constants::kManagerUpdateInterval;
manager_url_timeout_ = constants::kManagerTimeout;
max_proxy_num_ = constants::kMaxProxyNum;
enable_isolation_ = constants::kEnableIsolation;

local_ip_ = constants::kSerIP;
local_port_ = constants::kSerPort;
Expand Down Expand Up @@ -325,6 +326,13 @@ void SdkConfig::InitManagerParam(const rapidjson::Value &doc) {
std::string inlong_group_ids_str = obj.GetString();
Utils::splitOperate(inlong_group_ids_str, inlong_group_ids_, ",");
}
// enable isolation
if (doc.HasMember("enable_isolation") && doc["enable_isolation"].IsBool()) {
const rapidjson::Value &obj = doc["enable_isolation"];
enable_isolation_ = obj.GetBool();
} else {
enable_isolation_ = constants::kEnableIsolation;
}
}

void SdkConfig::InitTcpParam(const rapidjson::Value &doc) {
Expand Down Expand Up @@ -460,6 +468,7 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("auth_key: " << auth_key_.c_str());
LOG_INFO("max_group_id_num: " << max_group_id_num_);
LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
LOG_INFO("enable_isolation: " << enable_isolation_);
}

} // namespace inlong
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ void ProxyManager::DoUpdate() {

std::srand(unsigned(std::time(nullptr)));

if (groupid_2_cluster_map_.empty()) {
if (groupid_2_cluster_id_map_.empty()) {
LOG_INFO("empty groupid, no need to DoUpdate proxy list");
update_mutex_.unlock();
return;
}

{
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_rwmutex_);
for (auto &groupid2cluster : groupid_2_cluster_map_) {
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
for (auto &groupid2cluster : groupid_2_cluster_id_map_) {
std::string url;
if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
url = SdkConfig::getInstance()->manager_cluster_url_;
Expand Down Expand Up @@ -122,6 +122,11 @@ void ProxyManager::DoUpdate() {
}
}
}

UpdateGroupid2ClusterIdMap();

UpdateClusterId2ProxyMap();

update_mutex_.unlock();
LOG_INFO("finish ProxyManager DoUpdate.");
}
Expand Down Expand Up @@ -174,7 +179,8 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
<< inlong_group_id.c_str());
return SdkCode::kErrorParseJson;
}

groupid_2_cluster_id_update_map_[inlong_group_id] =
clusterInfo["clusterId"].GetInt();
// proxy list
for (auto &proxy : nodeList.GetArray()) {
std::string ip;
Expand Down Expand Up @@ -212,31 +218,28 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,
return SdkCode::kSuccess;
}

int32_t ProxyManager::GetProxy(const std::string &groupid,
int32_t ProxyManager::GetProxy(const std::string &key,
ProxyInfoVec &proxy_info_vec) {
unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
auto it = groupid_2_proxy_map_.find(groupid);
if (it == groupid_2_proxy_map_.end()) {
LOG_ERROR("GetProxyByGroupid failed . Groupid " << groupid);
return SdkCode::kFailGetProxyConf;
if (SdkConfig::getInstance()->enable_isolation_) {
return GetProxyByGroupid(key, proxy_info_vec);
} else {
return GetProxyByClusterId(key, proxy_info_vec);
}
proxy_info_vec = it->second;
return SdkCode::kSuccess;
}

int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id,
bool is_inited) {
{
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_rwmutex_);
auto it = groupid_2_cluster_map_.find(inlong_group_id);
if (it != groupid_2_cluster_map_.end()) {
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
auto it = groupid_2_cluster_id_map_.find(inlong_group_id);
if (it != groupid_2_cluster_id_map_.end()) {
return SdkCode::kSuccess;
}
}

{
unique_write_lock<read_write_mutex> wtlck(groupid_2_cluster_rwmutex_);
groupid_2_cluster_map_.emplace(inlong_group_id, -1);
unique_write_lock<read_write_mutex> wtlck(groupid_2_cluster_id_rwmutex_);
groupid_2_cluster_id_map_.emplace(inlong_group_id, "");
}

LOG_INFO("CheckProxyConf groupid:" << inlong_group_id
Expand All @@ -252,12 +255,81 @@ int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id,
return SdkCode::kSuccess;
}

bool ProxyManager::IsExist(const std::string &inlong_group_id) {
bool ProxyManager::HasProxy(const std::string &inlong_group_id) {
if (SdkConfig::getInstance()->enable_isolation_) {
return CheckGroupid(inlong_group_id);
} else {
return CheckClusterId(inlong_group_id);
}
}
int32_t ProxyManager::GetProxyByGroupid(const std::string &inlong_group_id,
ProxyInfoVec &proxy_info_vec) {
unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
auto it = groupid_2_proxy_map_.find(inlong_group_id);
if (it == groupid_2_proxy_map_.end()) {
LOG_ERROR("GetProxy failed! inlong_group_id: " << inlong_group_id);
return SdkCode::kFailGetConn;
}
proxy_info_vec = it->second;
return SdkCode::kSuccess;
}
int32_t ProxyManager::GetProxyByClusterId(const std::string &cluster_id,
ProxyInfoVec &proxy_info_vec) {
unique_read_lock<read_write_mutex> rdlck(clusterid_2_proxy_map_rwmutex_);
auto it = cluster_id_2_proxy_map_.find(cluster_id);
if (it == cluster_id_2_proxy_map_.end()) {
LOG_ERROR("GetProxy failed! cluster_id:" << cluster_id);
return SdkCode::kFailGetConn;
}
proxy_info_vec = it->second;
return SdkCode::kSuccess;
}
std::string ProxyManager::GetSendGroupKey(const std::string &groupid) {
if (SdkConfig::getInstance()->enable_isolation_) {
return groupid;
}
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
auto it = groupid_2_cluster_id_map_.find(groupid);
if (it == groupid_2_cluster_id_map_.end()) {
return "";
}
return it->second;
}
bool ProxyManager::CheckGroupid(const std::string &groupid) {
unique_read_lock<read_write_mutex> rdlck(groupid_2_proxy_map_rwmutex_);
auto it = groupid_2_proxy_map_.find(groupid);
if (it == groupid_2_proxy_map_.end()) {
return false;
}
return true;
}
bool ProxyManager::CheckClusterId(const std::string &cluster_id) {
unique_read_lock<read_write_mutex> rdlck(clusterid_2_proxy_map_rwmutex_);
auto it = cluster_id_2_proxy_map_.find(cluster_id);
if (it == cluster_id_2_proxy_map_.end()) {
return false;
}
return true;
}
void ProxyManager::UpdateClusterId2ProxyMap() {
if (SdkConfig::getInstance()->enable_isolation_) {
return;
}
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
for (const auto &it : groupid_2_cluster_id_update_map_) {
ProxyInfoVec proxy_info_vec;
if (GetProxyByGroupid(it.first, proxy_info_vec) == SdkCode::kSuccess) {
unique_write_lock<read_write_mutex> wtlck(clusterid_2_proxy_map_rwmutex_);
cluster_id_2_proxy_map_[std::to_string(it.second)] = proxy_info_vec;
}
}
}
void ProxyManager::UpdateGroupid2ClusterIdMap() {
unique_write_lock<read_write_mutex> wtlck(groupid_2_cluster_id_rwmutex_);
for (const auto &it : groupid_2_cluster_id_update_map_) {
groupid_2_cluster_id_map_[it.first] = std::to_string(it.second);
LOG_INFO("UpdateGroup2ClusterIdMap groupid:" << it.first << " ,cluster id:"
<< it.second);
}
}
} // namespace inlong
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@ class ProxyManager {
private:
static ProxyManager *instance_;
uint32_t timeout_;
read_write_mutex groupid_2_cluster_rwmutex_;
read_write_mutex groupid_2_cluster_id_rwmutex_;
read_write_mutex groupid_2_proxy_map_rwmutex_;
read_write_mutex clusterid_2_proxy_map_rwmutex_;

std::unordered_map<std::string, std::string> groupid_2_cluster_id_map_;
std::unordered_map<std::string, int32_t> groupid_2_cluster_id_update_map_;

std::unordered_map<std::string, int32_t> groupid_2_cluster_map_;
std::unordered_map<std::string, ProxyInfoVec> groupid_2_proxy_map_;
std::unordered_map<std::string, ProxyInfoVec>
cluster_id_2_proxy_map_; //<cluster_id,busList>

bool update_flag_;
std::mutex cond_mutex_;
std::mutex update_mutex_;
Expand All @@ -45,7 +51,7 @@ class ProxyManager {
std::thread update_conf_thread_;
volatile bool inited_ = false;

int32_t ParseAndGet(const std::string &groupid, const std::string &meta_data,
int32_t ParseAndGet(const std::string &key, const std::string &meta_data,
ProxyInfoVec &proxy_info_vec);

public:
Expand All @@ -57,7 +63,15 @@ class ProxyManager {
void DoUpdate();
void Init();
int32_t GetProxy(const std::string &groupid, ProxyInfoVec &proxy_info_vec);
bool IsExist(const std::string &inlong_group_id);
int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec &proxy_info_vec);
int32_t GetProxyByClusterId(const std::string &cluster_id,
ProxyInfoVec &proxy_info_vec);
std::string GetSendGroupKey(const std::string &groupid);
bool HasProxy(const std::string &inlong_group_id);
bool CheckGroupid(const std::string &groupid);
bool CheckClusterId(const std::string &cluster_id);
void UpdateClusterId2ProxyMap();
void UpdateGroupid2ClusterIdMap();
};
} // namespace inlong

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,52 @@ SendManager::SendManager() : send_group_idx_(0) {
<< SdkConfig::getInstance()->inlong_group_ids_[i]
<< " send group num:"
<< SdkConfig::getInstance()->per_groupid_thread_nums_);
DoAddSendGroup(SdkConfig::getInstance()->inlong_group_ids_[i]);
std::string send_group_key = ProxyManager::GetInstance()->GetSendGroupKey(
SdkConfig::getInstance()->inlong_group_ids_[i]);
AddSendGroup(send_group_key);
}
}

SendGroupPtr SendManager::GetSendGroup(const std::string &group_id) {
SendGroupPtr send_group_ptr = DoGetSendGroup(group_id);
std::string send_group_key =
ProxyManager::GetInstance()->GetSendGroupKey(group_id);
SendGroupPtr send_group_ptr = DoGetSendGroup(send_group_key);
if (send_group_ptr == nullptr) {
AddSendGroup(group_id);
AddSendGroup(send_group_key);
}
return send_group_ptr;
}

bool SendManager::AddSendGroup(const std::string &inlong_group_id) {
if (!ProxyManager::GetInstance()->IsExist(inlong_group_id)) {
LOG_ERROR("inlong_group_id is not exist." << inlong_group_id);
bool SendManager::AddSendGroup(const std::string &send_group_key) {
if (!ProxyManager::GetInstance()->HasProxy(send_group_key)) {
LOG_ERROR("inlong_group_id is not exist." << send_group_key);
return false;
}
DoAddSendGroup(inlong_group_id);
DoAddSendGroup(send_group_key);
return false;
}

void SendManager::DoAddSendGroup(const std::string &group_id) {
unique_write_lock<read_write_mutex> wtlck(group_id_2_send_group_map_rwmutex_);
auto send_group_map = group_id_2_send_group_map_.find(group_id);
if (send_group_map != group_id_2_send_group_map_.end()) {
LOG_WARN("send group has exist." << group_id);
void SendManager::DoAddSendGroup(const std::string &send_group_key) {
unique_write_lock<read_write_mutex> wtlck(send_group_map_rwmutex_);
auto send_group_map = send_group_map_.find(send_group_key);
if (send_group_map != send_group_map_.end()) {
LOG_WARN("send group has exist." << send_group_key);
return;
}
std::vector<SendGroupPtr> send_group;
send_group.reserve(SdkConfig::getInstance()->per_groupid_thread_nums_);
for (int32_t j = 0; j < SdkConfig::getInstance()->per_groupid_thread_nums_;
j++) {
send_group.push_back(std::make_shared<SendGroup>(group_id));
send_group.push_back(std::make_shared<SendGroup>(send_group_key));
}
group_id_2_send_group_map_[group_id] = send_group;
send_group_map_[send_group_key] = send_group;
}

SendGroupPtr SendManager::DoGetSendGroup(const std::string &group_id) {
unique_read_lock<read_write_mutex> rdlck(group_id_2_send_group_map_rwmutex_);
auto send_group_map = group_id_2_send_group_map_.find(group_id);
if (send_group_map == group_id_2_send_group_map_.end()) {
LOG_ERROR("fail to get send group, group_id:%s" << group_id);
SendGroupPtr SendManager::DoGetSendGroup(const std::string &send_group_key) {
unique_read_lock<read_write_mutex> rdlck(send_group_map_rwmutex_);
auto send_group_map = send_group_map_.find(send_group_key);
if (send_group_map == send_group_map_.end()) {
LOG_ERROR("fail to get send group, group_id:%s" << send_group_key);
return nullptr;
}
if (send_group_map->second.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@ using namespace inlong;

class SendManager : noncopyable {
private:
read_write_mutex group_id_2_send_group_map_rwmutex_;
std::unordered_map<std::string, std::vector<SendGroupPtr>>
group_id_2_send_group_map_;
SendGroupPtr DoGetSendGroup(const std::string &group_id);
void DoAddSendGroup(const std::string &group_id);
read_write_mutex send_group_map_rwmutex_;
std::unordered_map<std::string, std::vector<SendGroupPtr>> send_group_map_;
SendGroupPtr DoGetSendGroup(const std::string &send_group_key);
void DoAddSendGroup(const std::string &send_group_key);
volatile uint32_t send_group_idx_;

public:
SendManager();
virtual ~SendManager(){};
SendGroupPtr GetSendGroup(const std::string &group_id);
bool AddSendGroup(const std::string &inlong_group_id);
bool AddSendGroup(const std::string &send_group_key);
};
} // namespace inlong

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ static const bool kNeedAuth = false;

static const uint32_t kMaxAttrLen = 2048;
const uint32_t ATTR_LENGTH = 10;
static const bool kEnableIsolation = false;

} // namespace constants
} // namespace inlong
Expand Down

0 comments on commit ad34b4a

Please sign in to comment.