From 46031387c1b219ebd3e26f3ca47755d992b2609d Mon Sep 17 00:00:00 2001 From: Minoru Osuka Date: Tue, 31 Mar 2020 21:14:38 +0900 Subject: [PATCH] Refactoring protobuf (#33) --- Makefile | 2 +- README.md | 46 +++++++------- client/grpc_client.go | 16 ++--- cmd/start.go | 6 +- marshaler/util_test.go | 24 +++---- protobuf/kvs.pb.go | 138 ++++++++++++++++++++--------------------- protobuf/kvs.proto | 6 +- server/grpc_gateway.go | 34 +++++----- server/grpc_server.go | 30 ++++----- server/grpc_service.go | 60 +++++++++--------- server/raft_server.go | 118 +++++++++++++++++------------------ 11 files changed, 240 insertions(+), 240 deletions(-) diff --git a/Makefile b/Makefile index dbcdfa5..f38665e 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ CGO_ENABLED ?= 0 CGO_CFLAGS ?= CGO_LDFLAGS ?= BUILD_TAGS ?= -VERSION ?= $(shell git tag -l --sort=-v:refname | head -1) +VERSION ?= BIN_EXT ?= DOCKER_REPOSITORY ?= mosuka diff --git a/README.md b/README.md index 0a09737..9c76b94 100644 --- a/README.md +++ b/README.md @@ -114,10 +114,10 @@ The result of the above command is: ```json { "node": { - "bind_addr": ":7000", + "raft_address": ":7000", "metadata": { - "grpc_addr": ":9000", - "http_addr": ":8000" + "grpc_address": ":9000", + "http_address": ":8000" }, "state": "Leader" } @@ -231,26 +231,26 @@ You can see the result in JSON format. The result of the above command is: "cluster": { "nodes": { "node1": { - "bind_addr": ":7000", + "raft_address": ":7000", "metadata": { - "grpc_addr": ":9000", - "http_addr": ":8000" + "grpc_address": ":9000", + "http_address": ":8000" }, "state": "Leader" }, "node2": { - "bind_addr": ":7001", + "raft_address": ":7001", "metadata": { - "grpc_addr": ":9001", - "http_addr": ":8001" + "grpc_address": ":9001", + "http_address": ":8001" }, "state": "Follower" }, "node3": { - "bind_addr": ":7002", + "raft_address": ":7002", "metadata": { - "grpc_addr": ":9002", - "http_addr": ":8002" + "grpc_address": ":9002", + "http_address": ":8002" }, "state": "Follower" } @@ -273,10 +273,10 @@ or, you can use the RESTful API as follows: ```bash $ curl -X PUT 'http://127.0.0.1:8000/v1/cluster/node2' --data-binary ' { - "bind_addr": ":7001", + "raft_address": ":7001", "metadata": { - "grpc_addr": ":9001", - "http_addr": ":8001" + "grpc_address": ":9001", + "http_address": ":8001" } } ' @@ -297,13 +297,13 @@ $ curl -X DELETE 'http://127.0.0.1:8000/v1/cluster/node2' The following command indexes documents to any node in the cluster: ```bash -$ ./bin/cete set --grpc-addr=:9000 1 value1 +$ ./bin/cete set 1 value1 --grpc-address=:9000 ``` So, you can get the document from the node specified by the above command as follows: ```bash -$ ./bin/cete get --grpc-addr=:9000 1 +$ ./bin/cete get 1 --grpc-address=:9000 ``` You can see the result. The result of the above command is: @@ -315,8 +315,8 @@ value1 You can also get the same document from other nodes in the cluster as follows: ```bash -$ ./bin/cete get --grpc-addr=:9001 1 -$ ./bin/cete get --grpc-addr=:9002 1 +$ ./bin/cete get 1 --grpc-address=:9001 +$ ./bin/cete get 1 --grpc-address=:9002 ``` You can see the result. The result of the above command is: @@ -400,15 +400,15 @@ writing new private key to 'key.pem' Starting a node with HTTPS enabled, node-to-node encryption, and with the above configuration file. It is assumed the HTTPS X.509 certificate and key are at the paths server.crt and key.pem respectively. ```bash -$ ./bin/cete start --id=node1 --bind-addr=:7000 --grpc-addr=:9000 --http-addr=:8000 --data-dir=/tmp/cete/node1 --peer-grpc-addr=:9000 --cert-file=./etc/cert.pem --key-file=./etc/key.pem --cert-hostname=localhost -$ ./bin/cete start --id=node2 --bind-addr=:7001 --grpc-addr=:9001 --http-addr=:8001 --data-dir=/tmp/cete/node2 --peer-grpc-addr=:9000 --cert-file=./etc/cert.pem --key-file=./etc/key.pem --cert-hostname=localhost -$ ./bin/cete start --id=node3 --bind-addr=:7002 --grpc-addr=:9002 --http-addr=:8002 --data-dir=/tmp/cete/node3 --peer-grpc-addr=:9000 --cert-file=./etc/cert.pem --key-file=./etc/key.pem --cert-hostname=localhost +$ ./bin/cete start --id=node1 --raft-address=:7000 --grpc-address=:9000 --http-address=:8000 --data-directory=/tmp/cete/node1 --peer-grpc-address=:9000 --certificate-file=./etc/cert.pem --key-file=./etc/key.pem --common-name=localhost +$ ./bin/cete start --id=node2 --raft-address=:7001 --grpc-address=:9001 --http-address=:8001 --data-directory=/tmp/cete/node2 --peer-grpc-address=:9000 --certificate-file=./etc/cert.pem --key-file=./etc/key.pem --common-name=localhost +$ ./bin/cete start --id=node3 --raft-address=:7002 --grpc-address=:9002 --http-address=:8002 --data-directory=/tmp/cete/node3 --peer-grpc-address=:9000 --certificate-file=./etc/cert.pem --key-file=./etc/key.pem --common-name=localhost ``` You can access the cluster by adding a flag, such as the following command: ```bash -$ ./bin/cete cluster --grpc-addr=:9000 --cert-file=./cert.pem --cert-hostname=localhost | jq . +$ ./bin/cete cluster --grpc-address=:9000 --certificate-file=./cert.pem --common-name=localhost | jq . ``` or diff --git a/client/grpc_client.go b/client/grpc_client.go index 257cca5..3e8aa3c 100644 --- a/client/grpc_client.go +++ b/client/grpc_client.go @@ -25,15 +25,15 @@ type GRPCClient struct { logger *log.Logger } -func NewGRPCClient(address string) (*GRPCClient, error) { - return NewGRPCClientWithContext(address, context.Background()) +func NewGRPCClient(grpc_address string) (*GRPCClient, error) { + return NewGRPCClientWithContext(grpc_address, context.Background()) } -func NewGRPCClientWithContext(address string, baseCtx context.Context) (*GRPCClient, error) { - return NewGRPCClientWithContextTLS(address, baseCtx, "", "") +func NewGRPCClientWithContext(grpc_address string, baseCtx context.Context) (*GRPCClient, error) { + return NewGRPCClientWithContextTLS(grpc_address, baseCtx, "", "") } -func NewGRPCClientWithContextTLS(address string, baseCtx context.Context, certFile string, certHostname string) (*GRPCClient, error) { +func NewGRPCClientWithContextTLS(grpcAddress string, baseCtx context.Context, certificateFile string, commonName string) (*GRPCClient, error) { dialOpts := []grpc.DialOption{ grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(math.MaxInt64), @@ -50,17 +50,17 @@ func NewGRPCClientWithContextTLS(address string, baseCtx context.Context, certFi ctx, cancel := context.WithCancel(baseCtx) - if certFile == "" { + if certificateFile == "" { dialOpts = append(dialOpts, grpc.WithInsecure()) } else { - creds, err := credentials.NewClientTLSFromFile(certFile, certHostname) + creds, err := credentials.NewClientTLSFromFile(certificateFile, commonName) if err != nil { return nil, err } dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds)) } - conn, err := grpc.DialContext(ctx, address, dialOpts...) + conn, err := grpc.DialContext(ctx, grpcAddress, dialOpts...) if err != nil { cancel() return nil, err diff --git a/cmd/start.go b/cmd/start.go index e2bb429..d7c3afc 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -110,10 +110,10 @@ var ( joinRequest := &protobuf.JoinRequest{ Id: id, Node: &protobuf.Node{ - BindAddr: raftAddress, + RaftAddress: raftAddress, Metadata: &protobuf.Metadata{ - GrpcAddr: grpcAddress, - HttpAddr: httpAddress, + GrpcAddress: grpcAddress, + HttpAddress: httpAddress, }, }, } diff --git a/marshaler/util_test.go b/marshaler/util_test.go index a2d23cc..d2025bf 100644 --- a/marshaler/util_test.go +++ b/marshaler/util_test.go @@ -32,11 +32,11 @@ func TestMarshalAny(t *testing.T) { // test kvs.Node node := &protobuf.Node{ - BindAddr: ":7000", - State: "Leader", + RaftAddress: ":7000", + State: "Leader", Metadata: &protobuf.Metadata{ - GrpcAddr: ":9000", - HttpAddr: ":8000", + GrpcAddress: ":9000", + HttpAddress: ":8000", }, } @@ -52,7 +52,7 @@ func TestMarshalAny(t *testing.T) { t.Errorf("expected content to see %s, saw %s", expectedType, actualType) } - expectedValue = []byte(`{"bind_addr":":7000","metadata":{"grpc_addr":":9000","http_addr":":8000"},"state":"Leader"}`) + expectedValue = []byte(`{"raft_address":":7000","metadata":{"grpc_address":":9000","http_address":":8000"},"state":"Leader"}`) actualValue = nodeAny.Value if !bytes.Equal(expectedValue, actualValue) { t.Errorf("expected content to see %v, saw %v", expectedValue, actualValue) @@ -85,7 +85,7 @@ func TestUnmarshalAny(t *testing.T) { // raft.Node dataAny = &any.Any{ TypeUrl: "protobuf.Node", - Value: []byte(`{"bind_addr":":7000","metadata":{"grpc_addr":":9000","http_addr":":8000"},"state":"Leader"}`), + Value: []byte(`{"raft_address":":7000","metadata":{"grpc_address":":9000","http_address":":8000"},"state":"Leader"}`), } data, err = MarshalAny(dataAny) @@ -94,14 +94,14 @@ func TestUnmarshalAny(t *testing.T) { } node := data.(*protobuf.Node) - if node.BindAddr != ":7000" { - t.Errorf("expected content to see %v, saw %v", ":6060", node.BindAddr) + if node.RaftAddress != ":7000" { + t.Errorf("expected content to see %v, saw %v", ":7000", node.RaftAddress) } - if node.Metadata.GrpcAddr != ":9000" { - t.Errorf("expected content to see %v, saw %v", ":5050", node.BindAddr) + if node.Metadata.GrpcAddress != ":9000" { + t.Errorf("expected content to see %v, saw %v", ":9000", node.Metadata.GrpcAddress) } - if node.Metadata.HttpAddr != ":8000" { - t.Errorf("expected content to see %v, saw %v", ":5050", node.BindAddr) + if node.Metadata.HttpAddress != ":8000" { + t.Errorf("expected content to see %v, saw %v", ":8000", node.Metadata.HttpAddress) } if node.State != "Leader" { t.Errorf("expected content to see %v, saw %v", "Leader", node.State) diff --git a/protobuf/kvs.pb.go b/protobuf/kvs.pb.go index a7a8ecb..3bb6c8d 100644 --- a/protobuf/kvs.pb.go +++ b/protobuf/kvs.pb.go @@ -141,8 +141,8 @@ func (m *ReadinessCheckResponse) GetReady() bool { } type Metadata struct { - GrpcAddr string `protobuf:"bytes,1,opt,name=grpc_addr,json=grpcAddr,proto3" json:"grpc_addr,omitempty"` - HttpAddr string `protobuf:"bytes,2,opt,name=http_addr,json=httpAddr,proto3" json:"http_addr,omitempty"` + GrpcAddress string `protobuf:"bytes,1,opt,name=grpc_address,json=grpcAddress,proto3" json:"grpc_address,omitempty"` + HttpAddress string `protobuf:"bytes,2,opt,name=http_address,json=httpAddress,proto3" json:"http_address,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -173,22 +173,22 @@ func (m *Metadata) XXX_DiscardUnknown() { var xxx_messageInfo_Metadata proto.InternalMessageInfo -func (m *Metadata) GetGrpcAddr() string { +func (m *Metadata) GetGrpcAddress() string { if m != nil { - return m.GrpcAddr + return m.GrpcAddress } return "" } -func (m *Metadata) GetHttpAddr() string { +func (m *Metadata) GetHttpAddress() string { if m != nil { - return m.HttpAddr + return m.HttpAddress } return "" } type Node struct { - BindAddr string `protobuf:"bytes,1,opt,name=bind_addr,json=bindAddr,proto3" json:"bind_addr,omitempty"` + RaftAddress string `protobuf:"bytes,1,opt,name=raft_address,json=raftAddress,proto3" json:"raft_address,omitempty"` Metadata *Metadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"` State string `protobuf:"bytes,3,opt,name=state,proto3" json:"state,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -221,9 +221,9 @@ func (m *Node) XXX_DiscardUnknown() { var xxx_messageInfo_Node proto.InternalMessageInfo -func (m *Node) GetBindAddr() string { +func (m *Node) GetRaftAddress() string { if m != nil { - return m.BindAddr + return m.RaftAddress } return "" } @@ -903,66 +903,66 @@ func init() { proto.RegisterFile("protobuf/kvs.proto", fileDescriptor_431078ad7b var fileDescriptor_431078ad7b21f851 = []byte{ // 955 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x96, 0x5d, 0x6e, 0xdb, 0x46, - 0x10, 0xc7, 0xa3, 0x2f, 0x4b, 0x1e, 0xc9, 0x36, 0x33, 0x96, 0x5d, 0x85, 0x4e, 0x1d, 0x67, 0x03, - 0xb4, 0xae, 0x5a, 0x89, 0x8d, 0x5b, 0xf4, 0xc3, 0x68, 0x1e, 0x52, 0xc5, 0x08, 0xd0, 0x28, 0x8d, - 0x41, 0xb5, 0x29, 0xd0, 0x17, 0x63, 0x25, 0x4e, 0x25, 0x42, 0x12, 0xc9, 0x92, 0x2b, 0x05, 0x42, - 0x90, 0x97, 0x5e, 0xa1, 0xe8, 0x65, 0x7a, 0x8d, 0x5e, 0xa1, 0x07, 0x29, 0x76, 0xb9, 0x14, 0xa5, - 0x48, 0x6c, 0xf2, 0x64, 0xed, 0xce, 0xcc, 0x6f, 0xfe, 0x9c, 0x9d, 0x19, 0x18, 0x30, 0x08, 0x7d, - 0xe1, 0xf7, 0x67, 0xbf, 0x59, 0xe3, 0x79, 0xd4, 0x56, 0x07, 0x2c, 0x8c, 0xe7, 0x91, 0x79, 0x67, - 0xe8, 0xfb, 0xc3, 0x09, 0x59, 0x4b, 0x3b, 0xf7, 0x16, 0xb1, 0xdd, 0x3c, 0x79, 0xdb, 0x44, 0xd3, - 0x40, 0x24, 0xc6, 0xbb, 0xda, 0xc8, 0x03, 0xd7, 0xe2, 0x9e, 0xe7, 0x0b, 0x2e, 0x5c, 0xdf, 0xd3, - 0x68, 0xf3, 0x33, 0xf5, 0x67, 0xd0, 0x1a, 0x92, 0xd7, 0x8a, 0x5e, 0xf1, 0xe1, 0x90, 0x42, 0xcb, - 0x0f, 0x94, 0xc7, 0xa6, 0x37, 0x6b, 0xc1, 0x51, 0xd7, 0x9d, 0x93, 0x47, 0x51, 0xd4, 0x19, 0xd1, - 0x60, 0x6c, 0x53, 0x14, 0xf8, 0x5e, 0x44, 0x58, 0x87, 0x12, 0x9f, 0xb8, 0x73, 0x6a, 0xe4, 0xce, - 0x72, 0xe7, 0x15, 0x3b, 0x3e, 0xb0, 0x36, 0x1c, 0xdb, 0xc4, 0x1d, 0x77, 0xab, 0x7f, 0x48, 0xdc, - 0x59, 0x24, 0xfe, 0xea, 0xc0, 0x9e, 0x40, 0xe5, 0x39, 0x09, 0xee, 0x70, 0xc1, 0xf1, 0x04, 0x76, - 0x87, 0x61, 0x30, 0xb8, 0xe1, 0x8e, 0x13, 0x2a, 0xaf, 0x5d, 0xbb, 0x22, 0x2f, 0x1e, 0x3b, 0x4e, - 0x28, 0x8d, 0x23, 0x21, 0x82, 0xd8, 0x98, 0x8f, 0x8d, 0xf2, 0x42, 0x1a, 0x99, 0x03, 0xc5, 0x1f, - 0x7d, 0x87, 0xa4, 0x53, 0xdf, 0xf5, 0x9c, 0x35, 0x82, 0xbc, 0x50, 0x84, 0x4f, 0xa0, 0x32, 0xd5, - 0xa9, 0x14, 0xa0, 0x7a, 0xb1, 0xd7, 0x96, 0x05, 0x4f, 0xf2, 0xdb, 0x4b, 0xb3, 0xd4, 0x1a, 0x09, - 0x2e, 0xa8, 0x51, 0x50, 0x8c, 0xf8, 0xc0, 0xfe, 0xca, 0x41, 0xb9, 0x33, 0x99, 0x45, 0x82, 0x42, - 0x6c, 0x41, 0xc9, 0xf3, 0x1d, 0x8a, 0x1a, 0xb9, 0xb3, 0xc2, 0x79, 0xf5, 0xe2, 0x03, 0x45, 0xd2, - 0xc6, 0xb6, 0xd4, 0x12, 0x5d, 0x79, 0x22, 0x5c, 0xd8, 0xb1, 0x17, 0x1e, 0xc3, 0xce, 0x84, 0xb8, - 0x43, 0x89, 0x74, 0x7d, 0x32, 0x3b, 0x00, 0xa9, 0x33, 0x1a, 0x50, 0x18, 0xd3, 0x42, 0x0b, 0x97, - 0x3f, 0xf1, 0x1e, 0x94, 0xe6, 0x7c, 0x32, 0x23, 0x2d, 0x78, 0x57, 0xa5, 0x91, 0x11, 0x76, 0x7c, - 0x7f, 0x99, 0xff, 0x26, 0xc7, 0xbe, 0x83, 0xea, 0x0f, 0xbe, 0xeb, 0xd9, 0xf4, 0xfb, 0x8c, 0x22, - 0x81, 0xfb, 0x90, 0x77, 0x1d, 0x0d, 0xc9, 0xbb, 0x0e, 0x7e, 0x08, 0x45, 0x29, 0x62, 0x13, 0xa1, - 0xae, 0xd9, 0x29, 0xd4, 0xba, 0xc4, 0xe7, 0x94, 0x11, 0xce, 0x5a, 0x50, 0x53, 0xde, 0xc9, 0x3b, - 0x26, 0xb8, 0xdc, 0x76, 0xdc, 0xb7, 0x70, 0xa0, 0xcb, 0xb0, 0x8c, 0xf8, 0x08, 0xca, 0x83, 0xf8, - 0x4a, 0x07, 0xd5, 0x56, 0xab, 0x65, 0x27, 0x46, 0x76, 0x0a, 0xf0, 0x94, 0x44, 0xa2, 0x63, 0xa3, - 0x18, 0xec, 0x01, 0x54, 0x95, 0x3d, 0x6d, 0xa8, 0xb8, 0x36, 0xd2, 0xa5, 0xa6, 0x0b, 0xc2, 0xbe, - 0x04, 0xe8, 0xfd, 0x0f, 0x24, 0x8d, 0xca, 0xaf, 0x46, 0xdd, 0x87, 0xbd, 0x27, 0x34, 0x21, 0x41, - 0xd9, 0xd9, 0x5f, 0x00, 0xf6, 0x48, 0x2c, 0x9b, 0x25, 0xa3, 0xd8, 0xef, 0xdf, 0x64, 0xec, 0x63, - 0x38, 0x8a, 0x73, 0xbe, 0x83, 0x29, 0xfb, 0xae, 0x74, 0x35, 0x27, 0x4f, 0xe0, 0x03, 0x28, 0x8a, - 0x45, 0x10, 0x7f, 0xf1, 0xfe, 0xc5, 0x81, 0x22, 0x2b, 0x4b, 0xfb, 0xa7, 0x45, 0x40, 0xb6, 0x32, - 0xe2, 0x39, 0x14, 0x57, 0xd2, 0xd7, 0xdb, 0xf1, 0x32, 0x68, 0x27, 0x9b, 0xa2, 0xfd, 0xd8, 0x5b, - 0xd8, 0xca, 0x83, 0x3d, 0x82, 0xa2, 0x8c, 0xc3, 0x2a, 0x94, 0x7f, 0xf6, 0xc6, 0x9e, 0xff, 0xca, - 0x33, 0x6e, 0x61, 0x05, 0x8a, 0xb2, 0x9b, 0x8c, 0x1c, 0xee, 0x42, 0x49, 0x75, 0x86, 0x91, 0xc7, - 0x32, 0x14, 0x7a, 0x24, 0x8c, 0x02, 0x02, 0xec, 0xc4, 0xa2, 0x8d, 0x22, 0x7b, 0x08, 0x7b, 0xbf, - 0x70, 0x31, 0x18, 0x2d, 0x5f, 0xe4, 0x0c, 0x4a, 0x24, 0xd5, 0xe8, 0x67, 0x86, 0x54, 0x9f, 0x1d, - 0x1b, 0xd8, 0xa7, 0x70, 0xf0, 0x9c, 0x44, 0xe8, 0x0e, 0xa2, 0x65, 0x50, 0x03, 0xca, 0xd3, 0xf8, - 0x4a, 0x3f, 0x64, 0x72, 0x64, 0x5f, 0x41, 0xed, 0x19, 0x2d, 0x5e, 0xca, 0x07, 0xba, 0xe6, 0x6e, - 0xf8, 0xbe, 0x8f, 0x79, 0xf1, 0x77, 0x19, 0x0a, 0xcf, 0x5e, 0xf6, 0xf0, 0x06, 0xf6, 0xd6, 0x56, - 0x17, 0x1e, 0x6f, 0xd4, 0xe2, 0x4a, 0x6e, 0x4d, 0xd3, 0x54, 0x42, 0xb7, 0xae, 0x39, 0x66, 0xfe, - 0xf1, 0xcf, 0xbf, 0x7f, 0xe6, 0xeb, 0x88, 0xd6, 0xfc, 0xa1, 0x35, 0xd1, 0x2e, 0x37, 0x03, 0xc5, - 0xeb, 0xc3, 0xfe, 0xfa, 0xb2, 0xcb, 0xcc, 0x70, 0xa2, 0x32, 0x6c, 0xdf, 0x8c, 0xec, 0x44, 0xa5, - 0x38, 0xc2, 0x43, 0x99, 0x22, 0x4c, 0x7c, 0x74, 0x8e, 0x8e, 0x5e, 0x6d, 0x59, 0xe4, 0xdb, 0xe9, - 0x00, 0x26, 0x3c, 0x43, 0xf1, 0x00, 0x2b, 0x92, 0x27, 0x87, 0x12, 0xaf, 0xe3, 0x37, 0x45, 0x43, - 0x39, 0xaf, 0x2c, 0x0b, 0x33, 0x03, 0xcb, 0x4e, 0x15, 0xa3, 0x61, 0x1a, 0x92, 0xa1, 0x07, 0xd4, - 0x7a, 0xed, 0x3a, 0x6f, 0x2e, 0xd5, 0x98, 0x63, 0x37, 0x5d, 0x85, 0x59, 0xca, 0xea, 0x6b, 0x53, - 0x9e, 0x88, 0x3b, 0x54, 0xe0, 0x3d, 0xac, 0xae, 0x80, 0xb1, 0xab, 0x3b, 0x0d, 0xe3, 0xaf, 0x59, - 0xdd, 0x47, 0x99, 0x0a, 0x1b, 0x0a, 0x84, 0xcd, 0x0d, 0x85, 0x78, 0x0d, 0x95, 0x9e, 0xc7, 0x83, - 0x68, 0xe4, 0x8b, 0x4c, 0x71, 0x59, 0xd4, 0xba, 0xa2, 0xee, 0x63, 0x4d, 0x52, 0xa3, 0x84, 0xd2, - 0x81, 0xc2, 0x53, 0x12, 0x18, 0x0f, 0x5c, 0xba, 0xa3, 0x4c, 0x23, 0xbd, 0xd0, 0x9f, 0x77, 0x47, - 0xc5, 0x1f, 0xe2, 0x6d, 0x19, 0x2f, 0x87, 0xcc, 0x7a, 0x3d, 0xa6, 0xc5, 0xa3, 0x66, 0xf3, 0x0d, - 0x76, 0xd5, 0x0c, 0x69, 0x48, 0xba, 0xa3, 0x32, 0xa5, 0xdc, 0x55, 0xa8, 0x63, 0x73, 0x13, 0x75, - 0x99, 0x6b, 0xe2, 0x8b, 0x64, 0x10, 0x11, 0x15, 0x70, 0x6d, 0x7d, 0x65, 0x32, 0xb5, 0xbc, 0xe6, - 0x16, 0x79, 0x5f, 0x43, 0x49, 0x4d, 0x73, 0x66, 0xc9, 0xe2, 0x3c, 0x6b, 0x13, 0xcf, 0x6e, 0x7d, - 0x9e, 0x93, 0xad, 0xa0, 0x67, 0xfa, 0x1d, 0xad, 0xf0, 0xd6, 0xe4, 0xaf, 0xb7, 0x82, 0x1e, 0xfa, - 0xef, 0xef, 0xff, 0x7a, 0x6f, 0xe8, 0x8a, 0xd1, 0xac, 0xdf, 0x1e, 0xf8, 0x53, 0x6b, 0xea, 0x47, - 0xb3, 0x31, 0xb7, 0x06, 0x24, 0xd2, 0x7f, 0x75, 0xfa, 0x3b, 0xea, 0xd7, 0x17, 0xff, 0x05, 0x00, - 0x00, 0xff, 0xff, 0x3f, 0x16, 0x4d, 0xa0, 0x38, 0x09, 0x00, 0x00, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0xdd, 0x6e, 0x1b, 0x55, + 0x10, 0xae, 0xff, 0x62, 0x67, 0xec, 0x24, 0xdb, 0x89, 0x13, 0xdc, 0x4d, 0x49, 0x9b, 0x53, 0x09, + 0x82, 0xc1, 0x5e, 0x1a, 0x10, 0x3f, 0x11, 0xbd, 0x28, 0x26, 0xaa, 0x44, 0x5d, 0x1a, 0xad, 0xa1, + 0x48, 0xdc, 0x44, 0x27, 0xde, 0xa9, 0xbd, 0xd8, 0xde, 0x5d, 0x76, 0x8f, 0x5d, 0x59, 0x55, 0x6f, + 0x78, 0x05, 0xc4, 0xcb, 0xf0, 0x1a, 0xbc, 0x02, 0x0f, 0x82, 0xce, 0xd9, 0xb3, 0x5e, 0x3b, 0xf6, + 0xb6, 0xbd, 0x8a, 0xcf, 0xcc, 0x37, 0xdf, 0x7c, 0x3b, 0x7f, 0x0a, 0x60, 0x10, 0xfa, 0xc2, 0xbf, + 0x9e, 0xbe, 0xb4, 0x46, 0xb3, 0xa8, 0xad, 0x1e, 0x58, 0x18, 0xcd, 0x22, 0xf3, 0xce, 0xc0, 0xf7, + 0x07, 0x63, 0xb2, 0x16, 0x7e, 0xee, 0xcd, 0x63, 0xbf, 0x79, 0x74, 0xd3, 0x45, 0x93, 0x40, 0x24, + 0xce, 0xbb, 0xda, 0xc9, 0x03, 0xd7, 0xe2, 0x9e, 0xe7, 0x0b, 0x2e, 0x5c, 0xdf, 0xd3, 0xd4, 0xe6, + 0x67, 0xea, 0x4f, 0xbf, 0x35, 0x20, 0xaf, 0x15, 0xbd, 0xe2, 0x83, 0x01, 0x85, 0x96, 0x1f, 0x28, + 0xc4, 0x3a, 0x9a, 0xb5, 0xe0, 0xa0, 0xeb, 0xce, 0xc8, 0xa3, 0x28, 0xea, 0x0c, 0xa9, 0x3f, 0xb2, + 0x29, 0x0a, 0x7c, 0x2f, 0x22, 0xac, 0x43, 0x89, 0x8f, 0xdd, 0x19, 0x35, 0x72, 0xf7, 0x73, 0xa7, + 0x15, 0x3b, 0x7e, 0xb0, 0x36, 0x1c, 0xda, 0xc4, 0x1d, 0x77, 0x23, 0x3e, 0x24, 0xee, 0xcc, 0x13, + 0xbc, 0x7a, 0xb0, 0x4b, 0xa8, 0x3c, 0x23, 0xc1, 0x1d, 0x2e, 0x38, 0x9e, 0x40, 0x6d, 0x10, 0x06, + 0xfd, 0x2b, 0xee, 0x38, 0x21, 0x45, 0x91, 0x02, 0x6e, 0xdb, 0x55, 0x69, 0x7b, 0x1c, 0x9b, 0x24, + 0x64, 0x28, 0x44, 0xb0, 0x80, 0xe4, 0x63, 0x88, 0xb4, 0x69, 0x08, 0xfb, 0x1d, 0x8a, 0x3f, 0xf9, + 0x0e, 0x49, 0x68, 0xc8, 0x5f, 0x8a, 0x9b, 0x6c, 0xd2, 0x96, 0xb0, 0x7d, 0x02, 0x95, 0x89, 0x4e, + 0xae, 0x98, 0xaa, 0x67, 0x3b, 0x6d, 0xd9, 0x82, 0x44, 0x91, 0xbd, 0x70, 0x4b, 0xf5, 0x91, 0xe0, + 0x82, 0x1a, 0x05, 0x45, 0x13, 0x3f, 0xd8, 0xdf, 0x39, 0x28, 0x77, 0xc6, 0xd3, 0x48, 0x50, 0x88, + 0x2d, 0x28, 0x79, 0xbe, 0x43, 0x32, 0x51, 0xe1, 0xb4, 0x7a, 0xf6, 0x81, 0x62, 0xd2, 0xce, 0xb6, + 0x54, 0x14, 0x5d, 0x78, 0x22, 0x9c, 0xdb, 0x31, 0x0a, 0x0f, 0x61, 0x6b, 0x4c, 0xdc, 0xa1, 0x50, + 0x7f, 0x83, 0x7e, 0x99, 0x1d, 0x80, 0x14, 0x8c, 0x06, 0x14, 0x46, 0x34, 0xd7, 0xda, 0xe5, 0x4f, + 0xbc, 0x07, 0xa5, 0x19, 0x1f, 0x4f, 0x49, 0x0b, 0xde, 0x56, 0x69, 0x64, 0x84, 0x1d, 0xdb, 0xcf, + 0xf3, 0xdf, 0xe4, 0xd8, 0x77, 0x50, 0xfd, 0xd1, 0x77, 0x3d, 0x9b, 0xfe, 0x98, 0x52, 0x24, 0x70, + 0x17, 0xf2, 0xae, 0xa3, 0x49, 0xf2, 0xae, 0x83, 0x1f, 0x42, 0x51, 0x8a, 0x58, 0xa7, 0x50, 0x66, + 0x76, 0x0c, 0xb5, 0x2e, 0xf1, 0x19, 0x65, 0x84, 0xb3, 0x16, 0xd4, 0x14, 0x3a, 0xe9, 0x6c, 0x42, + 0x97, 0xdb, 0x4c, 0xf7, 0x2d, 0xec, 0xe9, 0x32, 0x2c, 0x22, 0x3e, 0x82, 0x72, 0x3f, 0x36, 0xe9, + 0xa0, 0xda, 0x72, 0xb5, 0xec, 0xc4, 0xc9, 0x8e, 0x01, 0x9e, 0x90, 0x48, 0x74, 0xac, 0x15, 0x83, + 0x3d, 0x80, 0xaa, 0xf2, 0xa7, 0x23, 0x16, 0xd7, 0x46, 0x42, 0x6a, 0xba, 0x20, 0xec, 0x4b, 0x80, + 0xde, 0x5b, 0x48, 0xd2, 0xa8, 0xfc, 0x72, 0xd4, 0x09, 0xec, 0xfc, 0x40, 0x63, 0x12, 0x94, 0x9d, + 0xfd, 0x39, 0x60, 0x8f, 0xc4, 0x62, 0x58, 0x32, 0x8a, 0xfd, 0xfe, 0x43, 0xc6, 0x3e, 0x86, 0x83, + 0x38, 0xe7, 0x3b, 0x38, 0xe5, 0xdc, 0x95, 0x2e, 0x66, 0xe4, 0x09, 0x7c, 0x00, 0x45, 0x31, 0x0f, + 0xe2, 0x2f, 0xde, 0x3d, 0xdb, 0x53, 0xcc, 0xca, 0xd3, 0xfe, 0x79, 0x1e, 0x90, 0xad, 0x9c, 0x78, + 0x0a, 0xc5, 0xa5, 0xf4, 0xf5, 0x76, 0x7c, 0x1e, 0xda, 0xc9, 0xed, 0x68, 0x3f, 0xf6, 0xe6, 0xb6, + 0x42, 0xb0, 0x47, 0x50, 0x94, 0x71, 0x58, 0x85, 0xf2, 0x2f, 0xde, 0xc8, 0xf3, 0x5f, 0x79, 0xc6, + 0x2d, 0xac, 0x40, 0x51, 0x4e, 0x93, 0x91, 0xc3, 0x6d, 0x28, 0xa9, 0xc9, 0x30, 0xf2, 0x58, 0x86, + 0x42, 0x8f, 0x84, 0x51, 0x40, 0x80, 0xad, 0x58, 0xb4, 0x51, 0x64, 0x0f, 0x61, 0xe7, 0x57, 0x2e, + 0xfa, 0xc3, 0x45, 0x47, 0xee, 0x43, 0x89, 0xa4, 0x1a, 0xdd, 0x66, 0x48, 0xf5, 0xd9, 0xb1, 0x83, + 0x7d, 0x0a, 0x7b, 0xcf, 0x48, 0x84, 0x6e, 0x3f, 0x5a, 0x04, 0x35, 0xa0, 0x3c, 0x89, 0x4d, 0xba, + 0x91, 0xc9, 0x93, 0x7d, 0x05, 0xb5, 0xa7, 0x34, 0x7f, 0x21, 0x1b, 0x74, 0xc9, 0xdd, 0xf0, 0x7d, + 0x9b, 0x79, 0xf6, 0x4f, 0x19, 0x0a, 0x4f, 0x5f, 0xf4, 0xf0, 0x0a, 0x76, 0x56, 0x8e, 0x19, 0x1e, + 0xae, 0xd5, 0xe2, 0x42, 0xde, 0x51, 0xd3, 0x54, 0x42, 0x37, 0x1e, 0x3e, 0x66, 0xfe, 0xf9, 0xef, + 0x7f, 0x7f, 0xe5, 0xeb, 0x88, 0xd6, 0xec, 0xa1, 0x35, 0xd6, 0x90, 0xab, 0xbe, 0xe2, 0xbb, 0x86, + 0xdd, 0xd5, 0xf3, 0x97, 0x99, 0xe1, 0x48, 0x65, 0xd8, 0x7c, 0x2b, 0xd9, 0x91, 0x4a, 0x71, 0x80, + 0xfb, 0x32, 0x45, 0x98, 0x60, 0x74, 0x8e, 0x8e, 0x3e, 0x70, 0x59, 0xcc, 0xb7, 0xd3, 0x05, 0x4c, + 0xf8, 0x0c, 0xc5, 0x07, 0x58, 0x91, 0x7c, 0x72, 0x29, 0xf1, 0x32, 0xee, 0x29, 0x1a, 0x0a, 0xbc, + 0x74, 0x2c, 0xcc, 0x0c, 0x5a, 0x76, 0xac, 0x38, 0x1a, 0xa6, 0x21, 0x39, 0xf4, 0x82, 0x5a, 0xaf, + 0x5d, 0xe7, 0xcd, 0xb9, 0x5a, 0x73, 0xec, 0xa6, 0xa7, 0x30, 0x4b, 0x59, 0x7d, 0x65, 0xcb, 0x13, + 0x71, 0xfb, 0x8a, 0x78, 0x07, 0xab, 0x4b, 0xc4, 0xd8, 0xd5, 0x93, 0x86, 0xf1, 0xd7, 0x2c, 0xdf, + 0xa3, 0x4c, 0x85, 0x0d, 0x45, 0x84, 0xcd, 0x35, 0x85, 0x78, 0x09, 0x95, 0x9e, 0xc7, 0x83, 0x68, + 0xe8, 0x8b, 0x4c, 0x71, 0x59, 0xac, 0x75, 0xc5, 0xba, 0x8b, 0x35, 0xc9, 0x1a, 0x25, 0x2c, 0x1d, + 0x28, 0x3c, 0x21, 0x81, 0xf1, 0xc2, 0xa5, 0x37, 0xca, 0x34, 0x52, 0x83, 0xfe, 0xbc, 0x3b, 0x2a, + 0x7e, 0x1f, 0x6f, 0xcb, 0x78, 0xb9, 0x64, 0xd6, 0xeb, 0x11, 0xcd, 0x1f, 0x35, 0x9b, 0x6f, 0xb0, + 0xab, 0x76, 0x48, 0x93, 0xa4, 0x37, 0x2a, 0x53, 0xca, 0x5d, 0x45, 0x75, 0x68, 0xae, 0x53, 0x9d, + 0xe7, 0x9a, 0xf8, 0x3c, 0x59, 0x44, 0x44, 0x45, 0xb8, 0x72, 0xbe, 0x32, 0x39, 0xb5, 0xbc, 0xe6, + 0x06, 0x79, 0x5f, 0x43, 0x49, 0x6d, 0x73, 0x66, 0xc9, 0xe2, 0x3c, 0x2b, 0x1b, 0xcf, 0x6e, 0x7d, + 0x9e, 0x93, 0xa3, 0xa0, 0x77, 0xfa, 0x1d, 0xa3, 0x70, 0x63, 0xf3, 0x57, 0x47, 0x41, 0x2f, 0xfd, + 0xf7, 0x27, 0xbf, 0xdd, 0x1b, 0xb8, 0x62, 0x38, 0xbd, 0x6e, 0xf7, 0xfd, 0x89, 0x35, 0xf1, 0xa3, + 0xe9, 0x88, 0x5b, 0x7d, 0x12, 0xe9, 0x3f, 0x3f, 0xd7, 0x5b, 0xea, 0xd7, 0x17, 0xff, 0x07, 0x00, + 0x00, 0xff, 0xff, 0x89, 0x53, 0x3c, 0xa1, 0x4a, 0x09, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/protobuf/kvs.proto b/protobuf/kvs.proto index 1921fec..ebda213 100644 --- a/protobuf/kvs.proto +++ b/protobuf/kvs.proto @@ -85,12 +85,12 @@ message ReadinessCheckResponse { } message Metadata { - string grpc_addr = 1; - string http_addr = 2; + string grpc_address = 1; + string http_address = 2; } message Node { - string bind_addr = 1; + string raft_address = 1; Metadata metadata = 2; string state = 3; } diff --git a/server/grpc_gateway.go b/server/grpc_gateway.go index 29f6c5d..a89ecac 100644 --- a/server/grpc_gateway.go +++ b/server/grpc_gateway.go @@ -33,20 +33,20 @@ func responseFilter(ctx context.Context, w http.ResponseWriter, resp proto.Messa } type GRPCGateway struct { - grpcGatewayAddr string - grpcAddr string + httpAddress string + grpcAddress string cancel context.CancelFunc listener net.Listener mux *runtime.ServeMux - certFile string - keyFile string + certificateFile string + keyFile string logger *zap.Logger } -func NewGRPCGateway(grpcGatewayAddr string, grpcAddr string, certFile string, keyFile string, certHostname string, logger *zap.Logger) (*GRPCGateway, error) { +func NewGRPCGateway(httpAddress string, grpcAddress string, certificateFile string, keyFile string, commonName string, logger *zap.Logger) (*GRPCGateway, error) { dialOpts := []grpc.DialOption{ grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(math.MaxInt64), @@ -69,52 +69,52 @@ func NewGRPCGateway(grpcGatewayAddr string, grpcAddr string, certFile string, ke runtime.WithForwardResponseOption(responseFilter), ) - if certFile == "" { + if certificateFile == "" { dialOpts = append(dialOpts, grpc.WithInsecure()) } else { - creds, err := credentials.NewClientTLSFromFile(certFile, certHostname) + creds, err := credentials.NewClientTLSFromFile(certificateFile, commonName) if err != nil { return nil, err } dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds)) } - err := protobuf.RegisterKVSHandlerFromEndpoint(ctx, mux, grpcAddr, dialOpts) + err := protobuf.RegisterKVSHandlerFromEndpoint(ctx, mux, grpcAddress, dialOpts) if err != nil { logger.Error("failed to register KVS handler from endpoint", zap.Error(err)) return nil, err } - listener, err := net.Listen("tcp", grpcGatewayAddr) + listener, err := net.Listen("tcp", httpAddress) if err != nil { logger.Error("failed to create key value store service", zap.Error(err)) return nil, err } return &GRPCGateway{ - grpcGatewayAddr: grpcGatewayAddr, - grpcAddr: grpcAddr, + httpAddress: httpAddress, + grpcAddress: grpcAddress, listener: listener, mux: mux, cancel: cancel, - certFile: certFile, + certificateFile: certificateFile, keyFile: keyFile, logger: logger, }, nil } func (s *GRPCGateway) Start() error { - if s.certFile == "" && s.keyFile == "" { + if s.certificateFile == "" && s.keyFile == "" { go func() { _ = http.Serve(s.listener, s.mux) }() } else { go func() { - _ = http.ServeTLS(s.listener, s.mux, s.certFile, s.keyFile) + _ = http.ServeTLS(s.listener, s.mux, s.certificateFile, s.keyFile) }() } - s.logger.Info("gRPC gateway started", zap.String("addr", s.grpcGatewayAddr)) + s.logger.Info("gRPC gateway started", zap.String("http_address", s.httpAddress)) return nil } @@ -123,9 +123,9 @@ func (s *GRPCGateway) Stop() error { err := s.listener.Close() if err != nil { - s.logger.Error("failed to close listener", zap.String("addr", s.listener.Addr().String()), zap.Error(err)) + s.logger.Error("failed to close listener", zap.String("http_address", s.listener.Addr().String()), zap.Error(err)) } - s.logger.Info("gRPC gateway stopped", zap.String("addr", s.grpcGatewayAddr)) + s.logger.Info("gRPC gateway stopped", zap.String("http_address", s.httpAddress)) return nil } diff --git a/server/grpc_server.go b/server/grpc_server.go index 4c2c2d2..83556f6 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -17,10 +17,10 @@ import ( ) type GRPCServer struct { - address string - service *GRPCService - server *grpc.Server - listener net.Listener + grpcAddress string + service *GRPCService + server *grpc.Server + listener net.Listener certFile string keyFile string @@ -29,7 +29,7 @@ type GRPCServer struct { logger *zap.Logger } -func NewGRPCServer(address string, raftServer *RaftServer, certFile string, keyFile string, certHostname string, logger *zap.Logger) (*GRPCServer, error) { +func NewGRPCServer(grpcAddress string, raftServer *RaftServer, certificateFile string, keyFile string, commonName string, logger *zap.Logger) (*GRPCServer, error) { grpcLogger := logger.Named("grpc") opts := []grpc.ServerOption{ @@ -58,11 +58,11 @@ func NewGRPCServer(address string, raftServer *RaftServer, certFile string, keyF ), } - if certFile == "" && keyFile == "" { + if certificateFile == "" && keyFile == "" { logger.Info("disabling TLS") } else { logger.Info("enabling TLS") - creds, err := credentials.NewServerTLSFromFile(certFile, keyFile) + creds, err := credentials.NewServerTLSFromFile(certificateFile, keyFile) if err != nil { logger.Error("failed to create credentials", zap.Error(err)) } @@ -73,7 +73,7 @@ func NewGRPCServer(address string, raftServer *RaftServer, certFile string, keyF opts..., ) - service, err := NewGRPCService(raftServer, certFile, certHostname, logger) + service, err := NewGRPCService(raftServer, certificateFile, commonName, logger) if err != nil { logger.Error("failed to create key value store service", zap.Error(err)) return nil, err @@ -85,20 +85,20 @@ func NewGRPCServer(address string, raftServer *RaftServer, certFile string, keyF metric.GrpcMetrics.InitializeMetrics(server) grpc_prometheus.Register(server) - listener, err := net.Listen("tcp", address) + listener, err := net.Listen("tcp", grpcAddress) if err != nil { - logger.Error("failed to create listener", zap.String("address", address), zap.Error(err)) + logger.Error("failed to create listener", zap.String("grpc_address", grpcAddress), zap.Error(err)) return nil, err } return &GRPCServer{ - address: address, + grpcAddress: grpcAddress, service: service, server: server, listener: listener, - certFile: certFile, + certFile: certificateFile, keyFile: keyFile, - certHostname: certHostname, + certHostname: commonName, logger: logger, }, nil } @@ -112,7 +112,7 @@ func (s *GRPCServer) Start() error { _ = s.server.Serve(s.listener) }() - s.logger.Info("gRPC server started", zap.String("addr", s.address)) + s.logger.Info("gRPC server started", zap.String("grpc_address", s.grpcAddress)) return nil } @@ -124,6 +124,6 @@ func (s *GRPCServer) Stop() error { //s.server.GracefulStop() s.server.Stop() - s.logger.Info("gRPC server stopped", zap.String("addr", s.address)) + s.logger.Info("gRPC server stopped", zap.String("grpc_address", s.grpcAddress)) return nil } diff --git a/server/grpc_service.go b/server/grpc_service.go index 1ad08d9..881a48f 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -19,10 +19,10 @@ import ( ) type GRPCService struct { - raftServer *RaftServer - certFile string - certHostname string - logger *zap.Logger + raftServer *RaftServer + certificateFile string + commonName string + logger *zap.Logger watchMutex sync.RWMutex watchChans map[chan protobuf.WatchResponse]struct{} @@ -33,12 +33,12 @@ type GRPCService struct { watchClusterDoneCh chan struct{} } -func NewGRPCService(raftServer *RaftServer, certFile string, certHostname string, logger *zap.Logger) (*GRPCService, error) { +func NewGRPCService(raftServer *RaftServer, certificateFile string, commonName string, logger *zap.Logger) (*GRPCService, error) { return &GRPCService{ - raftServer: raftServer, - certFile: certFile, - certHostname: certHostname, - logger: logger, + raftServer: raftServer, + certificateFile: certificateFile, + commonName: commonName, + logger: logger, watchChans: make(map[chan protobuf.WatchResponse]struct{}), @@ -105,34 +105,34 @@ func (s *GRPCService) startWatchCluster(checkInterval time.Duration) { s.logger.Warn("failed to get cluster info", zap.String("err", err.Error())) } for id, node := range nodes { - if id == s.raftServer.nodeId { + if id == s.raftServer.id { continue } - if node.Metadata == nil || node.Metadata.GrpcAddr == "" { + if node.Metadata == nil || node.Metadata.GrpcAddress == "" { s.logger.Debug("gRPC address missing", zap.String("id", id)) continue } if c, ok := s.peerClients[id]; ok { - if c.Target() != node.Metadata.GrpcAddr { - s.logger.Debug("close client", zap.String("id", id), zap.String("grpc_addr", c.Target())) + if c.Target() != node.Metadata.GrpcAddress { + s.logger.Debug("close client", zap.String("id", id), zap.String("grpc_address", c.Target())) delete(s.peerClients, id) if err := c.Close(); err != nil { - s.logger.Warn("failed to close client", zap.String("id", id), zap.String("grpc_addr", c.Target()), zap.Error(err)) + s.logger.Warn("failed to close client", zap.String("id", id), zap.String("grpc_address", c.Target()), zap.Error(err)) } - s.logger.Debug("create client", zap.String("id", id), zap.String("grpc_addr", node.Metadata.GrpcAddr)) - if newClient, err := client.NewGRPCClientWithContextTLS(node.Metadata.GrpcAddr, context.TODO(), s.certFile, s.certHostname); err == nil { + s.logger.Debug("create client", zap.String("id", id), zap.String("grpc_address", node.Metadata.GrpcAddress)) + if newClient, err := client.NewGRPCClientWithContextTLS(node.Metadata.GrpcAddress, context.TODO(), s.certificateFile, s.commonName); err == nil { s.peerClients[id] = newClient } else { - s.logger.Warn("failed to create client", zap.String("id", id), zap.String("grpc_addr", c.Target()), zap.Error(err)) + s.logger.Warn("failed to create client", zap.String("id", id), zap.String("grpc_address", c.Target()), zap.Error(err)) } } } else { - s.logger.Debug("create client", zap.String("id", id), zap.String("grpc_addr", node.Metadata.GrpcAddr)) - if newClient, err := client.NewGRPCClientWithContextTLS(node.Metadata.GrpcAddr, context.TODO(), s.certFile, s.certHostname); err == nil { + s.logger.Debug("create client", zap.String("id", id), zap.String("grpc_address", node.Metadata.GrpcAddress)) + if newClient, err := client.NewGRPCClientWithContextTLS(node.Metadata.GrpcAddress, context.TODO(), s.certificateFile, s.commonName); err == nil { s.peerClients[id] = newClient } else { - s.logger.Warn("failed to create client", zap.String("id", id), zap.String("grpc_addr", c.Target()), zap.Error(err)) + s.logger.Warn("failed to create client", zap.String("id", id), zap.String("grpc_address", c.Target()), zap.Error(err)) } } } @@ -140,10 +140,10 @@ func (s *GRPCService) startWatchCluster(checkInterval time.Duration) { // close clients for non-existent peer nodes for id, c := range s.peerClients { if _, exist := nodes[id]; !exist { - s.logger.Debug("close client", zap.String("id", id), zap.String("grpc_addr", c.Target())) + s.logger.Debug("close client", zap.String("id", id), zap.String("grpc_address", c.Target())) delete(s.peerClients, id) if err := c.Close(); err != nil { - s.logger.Warn("failed to close old client", zap.String("id", id), zap.String("grpc_addr", c.Target()), zap.Error(err)) + s.logger.Warn("failed to close old client", zap.String("id", id), zap.String("grpc_address", c.Target()), zap.Error(err)) } } } @@ -165,10 +165,10 @@ func (s *GRPCService) stopWatchCluster() { s.logger.Info("close all peer clients") for id, c := range s.peerClients { - s.logger.Debug("close client", zap.String("id", id), zap.String("grpc_addr", c.Target())) + s.logger.Debug("close client", zap.String("id", id), zap.String("grpc_address", c.Target())) delete(s.peerClients, id) if err := c.Close(); err != nil { - s.logger.Warn("failed to close client", zap.String("id", id), zap.String("grpc_addr", c.Target()), zap.Error(err)) + s.logger.Warn("failed to close client", zap.String("id", id), zap.String("grpc_address", c.Target()), zap.Error(err)) } } } @@ -214,7 +214,7 @@ func (s *GRPCService) Join(ctx context.Context, req *protobuf.JoinRequest) (*emp c := s.peerClients[clusterResp.Cluster.Leader] err = c.Join(req) if err != nil { - s.logger.Error("failed to forward request", zap.String("grpc_addr", c.Target()), zap.Error(err)) + s.logger.Error("failed to forward request", zap.String("grpc_address", c.Target()), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } @@ -248,7 +248,7 @@ func (s *GRPCService) Leave(ctx context.Context, req *protobuf.LeaveRequest) (*e c := s.peerClients[clusterResp.Cluster.Leader] err = c.Leave(req) if err != nil { - s.logger.Error("failed to forward request", zap.String("grpc_addr", c.Target()), zap.Error(err)) + s.logger.Error("failed to forward request", zap.String("grpc_address", c.Target()), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } @@ -290,14 +290,14 @@ func (s *GRPCService) Cluster(ctx context.Context, req *empty.Empty) (*protobuf. } for id, node := range nodes { - if id == s.raftServer.nodeId { + if id == s.raftServer.id { node.State = s.raftServer.StateStr() } else { c := s.peerClients[id] nodeResp, err := c.Node() if err != nil { node.State = raft.Shutdown.String() - s.logger.Error("failed to get node info", zap.String("grpc_addr", node.Metadata.GrpcAddr), zap.String("err", err.Error())) + s.logger.Error("failed to get node info", zap.String("grpc_address", node.Metadata.GrpcAddress), zap.String("err", err.Error())) } else { node.State = nodeResp.Node.State } @@ -362,7 +362,7 @@ func (s *GRPCService) Set(ctx context.Context, req *protobuf.SetRequest) (*empty c := s.peerClients[clusterResp.Cluster.Leader] err = c.Set(req) if err != nil { - s.logger.Error("failed to forward request", zap.String("grpc_addr", c.Target()), zap.Error(err)) + s.logger.Error("failed to forward request", zap.String("grpc_address", c.Target()), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } @@ -391,7 +391,7 @@ func (s *GRPCService) Delete(ctx context.Context, req *protobuf.DeleteRequest) ( c := s.peerClients[clusterResp.Cluster.Leader] err = c.Delete(req) if err != nil { - s.logger.Error("failed to forward request", zap.String("grpc_addr", c.Target()), zap.Error(err)) + s.logger.Error("failed to forward request", zap.String("grpc_address", c.Target()), zap.Error(err)) return resp, status.Error(codes.Internal, err.Error()) } diff --git a/server/raft_server.go b/server/raft_server.go index fa997ec..a637014 100644 --- a/server/raft_server.go +++ b/server/raft_server.go @@ -22,11 +22,11 @@ import ( ) type RaftServer struct { - nodeId string - bindAddr string - dataDir string - bootstrap bool - logger *zap.Logger + id string + raftAddress string + dataDirectory string + bootstrap bool + logger *zap.Logger fsm *RaftFSM @@ -39,8 +39,8 @@ type RaftServer struct { applyCh chan *protobuf.Event } -func NewRaftServer(nodeId string, bindAddr string, dataDir string, bootstrap bool, logger *zap.Logger) (*RaftServer, error) { - fsmPath := filepath.Join(dataDir, "kvs") +func NewRaftServer(id string, raftAddress string, dataDirectory string, bootstrap bool, logger *zap.Logger) (*RaftServer, error) { + fsmPath := filepath.Join(dataDirectory, "kvs") fsm, err := NewRaftFSM(fsmPath, logger) if err != nil { logger.Error("failed to create FSM", zap.String("path", fsmPath), zap.Error(err)) @@ -48,12 +48,12 @@ func NewRaftServer(nodeId string, bindAddr string, dataDir string, bootstrap boo } return &RaftServer{ - nodeId: nodeId, - bindAddr: bindAddr, - dataDir: dataDir, - bootstrap: bootstrap, - fsm: fsm, - logger: logger, + id: id, + raftAddress: raftAddress, + dataDirectory: dataDirectory, + bootstrap: bootstrap, + fsm: fsm, + logger: logger, watchClusterStopCh: make(chan struct{}), watchClusterDoneCh: make(chan struct{}), @@ -64,30 +64,30 @@ func NewRaftServer(nodeId string, bindAddr string, dataDir string, bootstrap boo func (s *RaftServer) Start() error { config := raft.DefaultConfig() - config.LocalID = raft.ServerID(s.nodeId) + config.LocalID = raft.ServerID(s.id) config.SnapshotThreshold = 1024 config.LogOutput = ioutil.Discard - addr, err := net.ResolveTCPAddr("tcp", s.bindAddr) + addr, err := net.ResolveTCPAddr("tcp", s.raftAddress) if err != nil { - s.logger.Error("failed to resolve TCP address", zap.String("tcp", s.bindAddr), zap.Error(err)) + s.logger.Error("failed to resolve TCP address", zap.String("raft_address", s.raftAddress), zap.Error(err)) return err } - s.transport, err = raft.NewTCPTransport(s.bindAddr, addr, 3, 10*time.Second, ioutil.Discard) + s.transport, err = raft.NewTCPTransport(s.raftAddress, addr, 3, 10*time.Second, ioutil.Discard) if err != nil { - s.logger.Error("failed to create TCP transport", zap.String("tcp", s.bindAddr), zap.Error(err)) + s.logger.Error("failed to create TCP transport", zap.String("raft_address", s.raftAddress), zap.Error(err)) return err } // create snapshot store - snapshotStore, err := raft.NewFileSnapshotStore(s.dataDir, 2, ioutil.Discard) + snapshotStore, err := raft.NewFileSnapshotStore(s.dataDirectory, 2, ioutil.Discard) if err != nil { - s.logger.Error("failed to create file snapshot store", zap.String("path", s.dataDir), zap.Error(err)) + s.logger.Error("failed to create file snapshot store", zap.String("path", s.dataDirectory), zap.Error(err)) return err } - logStorePath := filepath.Join(s.dataDir, "raft", "log") + logStorePath := filepath.Join(s.dataDirectory, "raft", "log") err = os.MkdirAll(logStorePath, 0755) if err != nil { s.logger.Fatal(err.Error()) @@ -107,7 +107,7 @@ func (s *RaftServer) Start() error { return err } - stableStorePath := filepath.Join(s.dataDir, "raft", "stable") + stableStorePath := filepath.Join(s.dataDirectory, "raft", "stable") err = os.MkdirAll(stableStorePath, 0755) if err != nil { s.logger.Fatal(err.Error()) @@ -150,7 +150,7 @@ func (s *RaftServer) Start() error { s.startWatchCluster(500 * time.Millisecond) }() - s.logger.Info("Raft server started", zap.String("addr", s.bindAddr)) + s.logger.Info("Raft server started", zap.String("raft_address", s.raftAddress)) return nil } @@ -168,7 +168,7 @@ func (s *RaftServer) Stop() error { if future := s.raft.Shutdown(); future.Error() != nil { s.logger.Info("failed to shutdown Raft", zap.Error(future.Error())) } - s.logger.Info("Raft has shutdown", zap.String("addr", s.bindAddr)) + s.logger.Info("Raft has shutdown", zap.String("raft_address", s.raftAddress)) return nil } @@ -206,79 +206,79 @@ func (s *RaftServer) startWatchCluster(checkInterval time.Duration) { switch raftStats["state"] { case "Follower": - metric.RaftStateMetric.WithLabelValues(s.nodeId).Set(float64(raft.Follower)) + metric.RaftStateMetric.WithLabelValues(s.id).Set(float64(raft.Follower)) case "Candidate": - metric.RaftStateMetric.WithLabelValues(s.nodeId).Set(float64(raft.Candidate)) + metric.RaftStateMetric.WithLabelValues(s.id).Set(float64(raft.Candidate)) case "Leader": - metric.RaftStateMetric.WithLabelValues(s.nodeId).Set(float64(raft.Leader)) + metric.RaftStateMetric.WithLabelValues(s.id).Set(float64(raft.Leader)) case "Shutdown": - metric.RaftStateMetric.WithLabelValues(s.nodeId).Set(float64(raft.Shutdown)) + metric.RaftStateMetric.WithLabelValues(s.id).Set(float64(raft.Shutdown)) } if term, err := strconv.ParseFloat(raftStats["term"], 64); err == nil { - metric.RaftTermMetric.WithLabelValues(s.nodeId).Set(term) + metric.RaftTermMetric.WithLabelValues(s.id).Set(term) } if lastLogIndex, err := strconv.ParseFloat(raftStats["last_log_index"], 64); err == nil { - metric.RaftLastLogIndexMetric.WithLabelValues(s.nodeId).Set(lastLogIndex) + metric.RaftLastLogIndexMetric.WithLabelValues(s.id).Set(lastLogIndex) } if lastLogTerm, err := strconv.ParseFloat(raftStats["last_log_term"], 64); err == nil { - metric.RaftLastLogTermMetric.WithLabelValues(s.nodeId).Set(lastLogTerm) + metric.RaftLastLogTermMetric.WithLabelValues(s.id).Set(lastLogTerm) } if commitIndex, err := strconv.ParseFloat(raftStats["commit_index"], 64); err == nil { - metric.RaftCommitIndexMetric.WithLabelValues(s.nodeId).Set(commitIndex) + metric.RaftCommitIndexMetric.WithLabelValues(s.id).Set(commitIndex) } if appliedIndex, err := strconv.ParseFloat(raftStats["applied_index"], 64); err == nil { - metric.RaftAppliedIndexMetric.WithLabelValues(s.nodeId).Set(appliedIndex) + metric.RaftAppliedIndexMetric.WithLabelValues(s.id).Set(appliedIndex) } if fsmPending, err := strconv.ParseFloat(raftStats["fsm_pending"], 64); err == nil { - metric.RaftFsmPendingMetric.WithLabelValues(s.nodeId).Set(fsmPending) + metric.RaftFsmPendingMetric.WithLabelValues(s.id).Set(fsmPending) } if lastSnapshotIndex, err := strconv.ParseFloat(raftStats["last_snapshot_index"], 64); err == nil { - metric.RaftLastSnapshotIndexMetric.WithLabelValues(s.nodeId).Set(lastSnapshotIndex) + metric.RaftLastSnapshotIndexMetric.WithLabelValues(s.id).Set(lastSnapshotIndex) } if lastSnapshotTerm, err := strconv.ParseFloat(raftStats["last_snapshot_term"], 64); err == nil { - metric.RaftLastSnapshotTermMetric.WithLabelValues(s.nodeId).Set(lastSnapshotTerm) + metric.RaftLastSnapshotTermMetric.WithLabelValues(s.id).Set(lastSnapshotTerm) } if latestConfigurationIndex, err := strconv.ParseFloat(raftStats["latest_configuration_index"], 64); err == nil { - metric.RaftLatestConfigurationIndexMetric.WithLabelValues(s.nodeId).Set(latestConfigurationIndex) + metric.RaftLatestConfigurationIndexMetric.WithLabelValues(s.id).Set(latestConfigurationIndex) } if numPeers, err := strconv.ParseFloat(raftStats["num_peers"], 64); err == nil { - metric.RaftNumPeersMetric.WithLabelValues(s.nodeId).Set(numPeers) + metric.RaftNumPeersMetric.WithLabelValues(s.id).Set(numPeers) } if lastContact, err := strconv.ParseFloat(raftStats["last_contact"], 64); err == nil { - metric.RaftLastContactMetric.WithLabelValues(s.nodeId).Set(lastContact) + metric.RaftLastContactMetric.WithLabelValues(s.id).Set(lastContact) } if nodes, err := s.Nodes(); err == nil { - metric.RaftNumNodesMetric.WithLabelValues(s.nodeId).Set(float64(len(nodes))) + metric.RaftNumNodesMetric.WithLabelValues(s.id).Set(float64(len(nodes))) } kvsStats := s.fsm.Stats() if numReads, err := strconv.ParseFloat(kvsStats["num_reads"], 64); err == nil { - metric.KvsNumReadsMetric.WithLabelValues(s.nodeId).Set(numReads) + metric.KvsNumReadsMetric.WithLabelValues(s.id).Set(numReads) } if numWrites, err := strconv.ParseFloat(kvsStats["num_writes"], 64); err == nil { - metric.KvsNumWritesMetric.WithLabelValues(s.nodeId).Set(numWrites) + metric.KvsNumWritesMetric.WithLabelValues(s.id).Set(numWrites) } if numBytesRead, err := strconv.ParseFloat(kvsStats["num_bytes_read"], 64); err == nil { - metric.KvsNumBytesReadMetric.WithLabelValues(s.nodeId).Set(numBytesRead) + metric.KvsNumBytesReadMetric.WithLabelValues(s.id).Set(numBytesRead) } if numBytesWritten, err := strconv.ParseFloat(kvsStats["num_bytes_written"], 64); err == nil { - metric.KvsNumBytesWrittenMetric.WithLabelValues(s.nodeId).Set(numBytesWritten) + metric.KvsNumBytesWrittenMetric.WithLabelValues(s.id).Set(numBytesWritten) } var numLsmGets map[string]interface{} @@ -296,39 +296,39 @@ func (s *RaftServer) startWatchCluster(checkInterval time.Duration) { } if numGets, err := strconv.ParseFloat(kvsStats["num_gets"], 64); err == nil { - metric.KvsNumGetsMetric.WithLabelValues(s.nodeId).Set(numGets) + metric.KvsNumGetsMetric.WithLabelValues(s.id).Set(numGets) } if numPuts, err := strconv.ParseFloat(kvsStats["num_puts"], 64); err == nil { - metric.KvsNumPutsMetric.WithLabelValues(s.nodeId).Set(numPuts) + metric.KvsNumPutsMetric.WithLabelValues(s.id).Set(numPuts) } if numBlockedPuts, err := strconv.ParseFloat(kvsStats["num_blocked_puts"], 64); err == nil { - metric.KvsNumBlockedPutsMetric.WithLabelValues(s.nodeId).Set(numBlockedPuts) + metric.KvsNumBlockedPutsMetric.WithLabelValues(s.id).Set(numBlockedPuts) } if numMemtablesGets, err := strconv.ParseFloat(kvsStats["num_memtables_gets"], 64); err == nil { - metric.KvsNumMemtablesGetsMetric.WithLabelValues(s.nodeId).Set(numMemtablesGets) + metric.KvsNumMemtablesGetsMetric.WithLabelValues(s.id).Set(numMemtablesGets) } var lsmSize map[string]interface{} if err := json.Unmarshal([]byte(kvsStats["lsm_size"]), &lsmSize); err == nil { for key, value := range lsmSize { - metric.KvsLSMSizeMetric.WithLabelValues(s.nodeId, key).Set(value.(float64)) + metric.KvsLSMSizeMetric.WithLabelValues(s.id, key).Set(value.(float64)) } } var vlogSize map[string]interface{} if err := json.Unmarshal([]byte(kvsStats["vlog_size"]), &vlogSize); err == nil { for key, value := range vlogSize { - metric.KvsVlogSizeMetric.WithLabelValues(s.nodeId, key).Set(value.(float64)) + metric.KvsVlogSizeMetric.WithLabelValues(s.id, key).Set(value.(float64)) } } var pendingWrites map[string]interface{} if err := json.Unmarshal([]byte(kvsStats["pending_writes"]), &pendingWrites); err == nil { for key, value := range pendingWrites { - metric.KvsPendingWritesMetric.WithLabelValues(s.nodeId, key).Set(value.(float64)) + metric.KvsPendingWritesMetric.WithLabelValues(s.id, key).Set(value.(float64)) } } } @@ -357,7 +357,7 @@ func (s *RaftServer) LeaderAddress(timeout time.Duration) (raft.ServerAddress, e case <-ticker.C: leaderAddr := s.raft.Leader() if leaderAddr != "" { - s.logger.Info("detected a leader address", zap.String("addr", string(leaderAddr))) + s.logger.Debug("detected a leader address", zap.String("raft_address", string(leaderAddr))) return leaderAddr, nil } case <-timer.C: @@ -472,13 +472,13 @@ func (s *RaftServer) Join(id string, node *protobuf.Node) error { } if nodeExists { - s.logger.Debug("node already exists", zap.String("id", id), zap.String("bind_addr", node.BindAddr)) + s.logger.Debug("node already exists", zap.String("id", id), zap.String("raft_address", node.RaftAddress)) } else { - if future := s.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(node.BindAddr), 0, 0); future.Error() != nil { - s.logger.Error("failed to add voter", zap.String("id", id), zap.String("bind_addr", node.BindAddr), zap.Error(future.Error())) + if future := s.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(node.RaftAddress), 0, 0); future.Error() != nil { + s.logger.Error("failed to add voter", zap.String("id", id), zap.String("raft_address", node.RaftAddress), zap.Error(future.Error())) return future.Error() } - s.logger.Info("node has successfully joined", zap.String("id", id), zap.String("bind_addr", node.BindAddr)) + s.logger.Info("node has successfully joined", zap.String("id", id), zap.String("raft_address", node.RaftAddress)) } if err := s.join(id, node.Metadata); err != nil { @@ -556,7 +556,7 @@ func (s *RaftServer) Node() (*protobuf.Node, error) { return nil, err } - node, ok := nodes[s.nodeId] + node, ok := nodes[s.id] if !ok { return nil, errors.ErrNotFound } @@ -576,8 +576,8 @@ func (s *RaftServer) Nodes() (map[string]*protobuf.Node, error) { nodes := make(map[string]*protobuf.Node, 0) for _, server := range cf.Configuration().Servers { nodes[string(server.ID)] = &protobuf.Node{ - BindAddr: string(server.Address), - Metadata: s.fsm.getMetadata(string(server.ID)), + RaftAddress: string(server.Address), + Metadata: s.fsm.getMetadata(string(server.ID)), } }