Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add client for etcd v3 restful api #1832

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,15 @@ BRPC_PROTOS = $(filter %.proto,$(BRPC_SOURCES))
BRPC_CFAMILIES = $(filter-out %.proto %.pb.cc,$(BRPC_SOURCES))
BRPC_OBJS = $(BRPC_PROTOS:.proto=.pb.o) $(addsuffix .o, $(basename $(BRPC_CFAMILIES)))

ETCD_CLIENT_PROTOS = src/etcd_client/proto/kv.proto \
src/etcd_client/proto/auth.proto \
src/etcd_client/proto/rpc.proto \
src/etcd_client/proto/gogoproto/gogo.proto\
src/etcd_client/proto/google/api/annotations.proto \
src/etcd_client/proto/google/api/http.proto
ETCD_CLIENT_SOURCES = src/etcd_client/etcd_client.cpp
ETCD_CLIENT_OBJS = $(ETCD_CLIENT_PROTOS:.proto=.pb.o) $(addsuffix .o, $(basename $(ETCD_CLIENT_SOURCES)))

MCPACK2PB_SOURCES = \
src/mcpack2pb/field_type.cpp \
src/mcpack2pb/mcpack2pb.cpp \
Expand All @@ -214,12 +223,12 @@ ifeq (ENABLE_THRIFT_FRAMED_PROTOCOL, $(findstring ENABLE_THRIFT_FRAMED_PROTOCOL,
THRIFT_OBJS = $(addsuffix .o, $(basename $(THRIFT_SOURCES)))
endif

OBJS=$(BUTIL_OBJS) $(BVAR_OBJS) $(BTHREAD_OBJS) $(JSON2PB_OBJS) $(MCPACK2PB_OBJS) $(BRPC_OBJS) $(THRIFT_OBJS)
OBJS=$(BUTIL_OBJS) $(BVAR_OBJS) $(BTHREAD_OBJS) $(JSON2PB_OBJS) $(MCPACK2PB_OBJS) $(BRPC_OBJS) $(THRIFT_OBJS) $(ETCD_CLIENT_OBJS)

BVAR_DEBUG_OBJS=$(BUTIL_OBJS:.o=.dbg.o) $(BVAR_OBJS:.o=.dbg.o)
DEBUG_OBJS = $(OBJS:.o=.dbg.o)

PROTOS=$(BRPC_PROTOS) src/idl_options.proto
PROTOS=$(BRPC_PROTOS) src/idl_options.proto $(ETCD_CLIENT_PROTOS)

.PHONY:all
all: protoc-gen-mcpack libbrpc.a libbrpc.$(SOEXT) output/include output/lib output/bin
Expand Down
91 changes: 91 additions & 0 deletions example/etcdctl_c++/etcdctl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include <brpc/server.h>
#include <thread>
#include <memory>
#include "butil/base64.h"
#include "google/protobuf/util/json_util.h"
#include "etcd_client/proto/rpc.pb.h"
#include "etcd_client/etcd_client.h"

DEFINE_string(url, "localhost:2379", "connect to etcd server");
DEFINE_string(api, "", "etcd api");
DEFINE_string(key, "", "key");
DEFINE_string(value, "", "value");
DEFINE_string(range_end, "", "range end");


class MyWatcher : public brpc::Watcher {
public:
void OnEventResponse(const ::etcdserverpb::WatchResponse& response) override {
LOG(INFO) << response.DebugString();
}
void OnParseError() {
LOG(INFO) << "parser error";
}
};

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
gflags::ParseCommandLineFlags(&argc, &argv, true);

brpc::EtcdClient client;
LOG(INFO) << "FLAGS_url " << FLAGS_url;
const char* url = FLAGS_url.c_str();
client.Init(url);
if (FLAGS_api == "watch") {
std::shared_ptr<MyWatcher> watcher(new MyWatcher);
::etcdserverpb::WatchRequest request;
auto* create_request = request.mutable_create_request();
create_request->set_key(FLAGS_key);
if (!FLAGS_range_end.empty()) {
create_request->set_range_end(FLAGS_range_end);
}
client.Watch(request, watcher);
while (1) {
// ctrl c to exit
sleep(10);
}
} else if (FLAGS_api == "put") {
::etcdserverpb::PutRequest request;
::etcdserverpb::PutResponse response;
request.set_key(FLAGS_key);
if (!FLAGS_value.empty()) {
request.set_value(FLAGS_value);
}
LOG(INFO) << "Put Request: " << request.DebugString();
client.Put(request, &response);
LOG(INFO) << "Put response: " << response.DebugString();
} else if (FLAGS_api == "range") {
::etcdserverpb::RangeRequest request;
::etcdserverpb::RangeResponse response;
request.set_key(FLAGS_key);
if (!FLAGS_range_end.empty()) {
request.set_range_end(FLAGS_range_end);
}
LOG(INFO) << "Range Request: " << request.DebugString();
client.Range(request, &response);
LOG(INFO) << "Range response: " << response.DebugString();
} else {
LOG(ERROR) << "unknown api " << FLAGS_api;
}
return 0;
}
265 changes: 265 additions & 0 deletions src/etcd_client/etcd_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


#include "etcd_client/etcd_client.h"
#include <memory>
#include "butil/string_splitter.h"
#include "json2pb/pb_to_json.h"
#include "json2pb/json_to_pb.h"

namespace brpc {

EtcdClient::EtcdClient() {
}

EtcdClient::~EtcdClient() {
}

bool EtcdClient::Init(const std::string& url, const std::string& lb) {
brpc::ChannelOptions options;
options.protocol = "http";
if (_channel.Init(url.c_str(), lb.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel with " << url;
return false;
}
return true;
}

bool EtcdClient::Init(butil::EndPoint server_addr_and_port) {
brpc::ChannelOptions options;
options.protocol = "http";
if (_channel.Init(server_addr_and_port, &options) != 0) {
LOG(ERROR) << "Fail to initialize channel with " << server_addr_and_port;
return false;
}
return true;
}

bool EtcdClient::Init(const char* server_addr_and_port) {
brpc::ChannelOptions options;
options.protocol = "http";
if (_channel.Init(server_addr_and_port, &options) != 0) {
LOG(ERROR) << "Fail to initialize channel with " << server_addr_and_port;
return false;
}
return true;
}

bool EtcdClient::Init(const char* server_addr, int port) {
brpc::ChannelOptions options;
options.protocol = "http";
if (_channel.Init(server_addr, port, &options) != 0) {
LOG(ERROR) << "Fail to initialize channel with " << server_addr
<< ":" << port;
return false;
}
return true;
}

enum EtcdOps {
RANGE = 0,
PUT,
DELETE_RANGE,
TXN,
COMPACT,
LEASE_GRANT,
LEASE_REVOKE,
LEASE_KEEPALIVE,
LEASE_TTL,
LEASE_LEASES,
};

template<typename Request, typename Response>
static bool EtcdOp(brpc::Channel* channel, EtcdOps op, const Request& request, Response* response) {
// invoker will make sure the op is valid
std::string path = "";
switch (op) {
case RANGE : path = "/v3/kv/range"; break;
case PUT : path = "/v3/kv/put"; break;
case DELETE_RANGE : path = "/v3/kv/deleterange"; break;
case TXN : path = "/v3/kv/txn"; break;
case COMPACT : path = "/v3/kv/compaction"; break;
case LEASE_GRANT : path = "/v3/lease/grant"; break;
case LEASE_REVOKE : path = "/v3/lease/revoke"; break;
case LEASE_KEEPALIVE : path = "/v3/lease/keepalive"; break;
case LEASE_TTL : path = "/v3/lease/timetolive"; break;
case LEASE_LEASES : path = "/v3/lease/leases"; break;
default:
LOG(ERROR) << "Invalid operation " << op << " with " << request.ShortDebugString();
return false;
}
Controller cntl;
brpc::URI& uri = cntl.http_request().uri();
uri.set_path(path);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
std::string output;
struct json2pb::Pb2JsonOptions pb2json_option;
pb2json_option.bytes_to_base64 = true;
if (!json2pb::ProtoMessageToJson(request, &output, pb2json_option)) {
LOG(ERROR) << "convert PutRequest " << request.ShortDebugString() << " to json fail.";
return false;
}
cntl.request_attachment().append(output);
channel->CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "request " << request.ShortDebugString() << " to etcd server fail."
<< "error message : " << cntl.ErrorText();
return false;
}
struct json2pb::Json2PbOptions json2pb_option;
json2pb_option.base64_to_bytes = true;
if (!json2pb::JsonToProtoMessage(cntl.response_attachment().to_string(), response, json2pb_option)) {
LOG(ERROR) << "Json to proto fail for " << cntl.response_attachment().to_string();
return false;
}
return true;
}

bool EtcdClient::Put(const ::etcdserverpb::PutRequest& request,
::etcdserverpb::PutResponse* response) {
return EtcdOp(&_channel, EtcdOps::PUT, request, response);
}

bool EtcdClient::Range(const ::etcdserverpb::RangeRequest& request,
::etcdserverpb::RangeResponse* response) {
return EtcdOp(&_channel, EtcdOps::RANGE, request, response);
}

bool EtcdClient::DeleteRange(const ::etcdserverpb::DeleteRangeRequest& request,
::etcdserverpb::DeleteRangeResponse* response) {
return EtcdOp(&_channel, DELETE_RANGE, request, response);
}

bool EtcdClient::Txn(const ::etcdserverpb::TxnRequest& request,
::etcdserverpb::TxnResponse* response) {
return EtcdOp(&_channel, EtcdOps::TXN, request, response);
}

bool EtcdClient::Compact(const ::etcdserverpb::CompactionRequest& request,
::etcdserverpb::CompactionResponse* response) {
return EtcdOp(&_channel, EtcdOps::COMPACT, request, response);
}

bool EtcdClient::LeaseGrant(const ::etcdserverpb::LeaseGrantRequest& request,
::etcdserverpb::LeaseGrantResponse* response) {
return EtcdOp(&_channel, EtcdOps::LEASE_GRANT, request, response);
}

bool EtcdClient::LeaseRevoke(const ::etcdserverpb::LeaseRevokeRequest& request,
::etcdserverpb::LeaseRevokeResponse* response) {
return EtcdOp(&_channel, EtcdOps::LEASE_REVOKE, request, response);
}

bool EtcdClient::LeaseTimeToLive(const ::etcdserverpb::LeaseTimeToLiveRequest& request,
::etcdserverpb::LeaseTimeToLiveResponse* response) {
return EtcdOp(&_channel, EtcdOps::LEASE_TTL, request, response);
}

bool EtcdClient::LeaseLeases(const ::etcdserverpb::LeaseLeasesRequest& request,
::etcdserverpb::LeaseLeasesResponse* response) {
return EtcdOp(&_channel, EtcdOps::LEASE_LEASES, request, response);
}

bool EtcdClient::LeaseKeepAlive(const ::etcdserverpb::LeaseKeepAliveRequest& request,
::etcdserverpb::LeaseKeepAliveResponse* response) {
return EtcdOp(&_channel, EtcdOps::LEASE_KEEPALIVE, request, response);
}

class ReadBody : public brpc::ProgressiveReader, public brpc::SharedObject {
public:
ReadBody() : _destroyed(false) {
butil::intrusive_ptr<ReadBody>(this).detach();
}

butil::Status OnReadOnePart(const void* data, size_t length) {
if (length > 0) {
butil::IOBuf os;
os.append(data, length);
::etcdserverpb::WatchResponseEx response;
struct json2pb::Json2PbOptions option;
option.base64_to_bytes = true;
option.allow_remaining_bytes_after_parsing = true;
std::string err;
if (!json2pb::JsonToProtoMessage(os.to_string(), &response, option, &err)) {
_watcher->OnParseError();
LOG(WARNING) << "watch parse " << os.to_string() << " fail. "
<< "error msg " << err;
} else {
_watcher->OnEventResponse(response.result());
}
}
return butil::Status::OK();
}

void OnEndOfMessage(const butil::Status& st) {
butil::intrusive_ptr<ReadBody>(this, false);
_destroyed = true;
_destroying_st = st;
return;
}

bool destroyed() const {
return _destroyed;
}

const butil::Status& destroying_status() const {
return _destroying_st;
}

void set_watcher(std::shared_ptr<Watcher> watcher) {
_watcher = watcher;
}

private:
bool _destroyed;
butil::Status _destroying_st;
std::shared_ptr<Watcher> _watcher;
};

bool EtcdClient::Watch(const ::etcdserverpb::WatchRequest& request, WatcherPtr watcher) {
if (!watcher.get()) {
LOG(ERROR) << "watcher is nullptr";
return false;
}
brpc::Controller cntl;
cntl.http_request().uri().set_path("/v3/watch");
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
std::string output;
struct json2pb::Pb2JsonOptions pb2json_option;
pb2json_option.bytes_to_base64 = true;
if (!json2pb::ProtoMessageToJson(request, &output, pb2json_option)) {
LOG(ERROR) << "convert WatchRequest " << request.ShortDebugString() << " to json fail.";
return false;
}
cntl.request_attachment().append(output);
cntl.response_will_be_read_progressively();
_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "watch request " << request.ShortDebugString() << " to etcd server fail."
<< "error message : " << cntl.ErrorText();
return false;
}
butil::intrusive_ptr<ReadBody> reader;
reader.reset(new ReadBody);
reader->set_watcher(watcher);
cntl.ReadProgressiveAttachmentBy(reader.get());
return true;
}

} // namespace brpc

Loading