Skip to content

Commit

Permalink
Add grpc method SetConsoleState
Browse files Browse the repository at this point in the history
This commit define the grpc method SetConsoleState to support
set the console state remotely.
  • Loading branch information
chenglch committed Nov 2, 2017
1 parent df9103b commit cc8afb3
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 121 deletions.
4 changes: 0 additions & 4 deletions api/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ func (api *NodeApi) put(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
plog.Debug(fmt.Sprintf("Receive %s request %s %v from %s.", req.Method, req.URL.Path, vars, req.RemoteAddr))
var err error
if !nodeManager.Exists(vars["node"]) {
plog.HandleHttp(w, req, http.StatusBadRequest, err)
return
}
if _, ok := req.URL.Query()["state"]; !ok {
err = errors.New("Clould not locate the state parameters from URL")
plog.HandleHttp(w, req, http.StatusBadRequest, err)
Expand Down
2 changes: 1 addition & 1 deletion console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *Console) readTarget() {
b := make([]byte, 4096)
logFile := fmt.Sprintf("%s%c%s.log", serverConfig.Console.LogDir, filepath.Separator, c.node.StorageNode.Name)
msg := fmt.Sprintf("\nConnect to %s at %s\n\n", c.node.StorageNode.Name, time.Now().Format("2006-01-02 15:04:05"))
c.logger(logFile, []byte(msg))
err = c.logger(logFile, []byte(msg))
if err != nil {
plog.WarningNode(c.node.StorageNode.Name, fmt.Sprintf("Failed to log message to %s. Error:%s", logFile, err.Error()))
return
Expand Down
116 changes: 99 additions & 17 deletions console/consolepb/manager.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions console/consolepb/manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package consolepb;

service ConsoleManager {
rpc ShowNode (NodeName) returns (Node) {};
rpc SetConsoleState (NodesState) returns (Result) {};
}

message NodeName {
Expand All @@ -17,3 +18,12 @@ message Node {
bool ondemand = 4;
int32 status = 5;
}

message NodesState {
repeated string names = 1;
string state = 2;
}

message Result {
map<string, string> result = 1;
}
147 changes: 147 additions & 0 deletions console/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package console

import (
"context"
"errors"
"fmt"
"github.com/chenglch/consoleserver/common"
pb "github.com/chenglch/consoleserver/console/consolepb"
net_context "golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"net"
)

type ConsoleRPCServer struct {
port string
host string
server *grpc.Server
}

func newConsoleRPCServer() *ConsoleRPCServer {
return &ConsoleRPCServer{port: serverConfig.Console.RPCPort, host: serverConfig.Global.Host}
}

func (s *ConsoleRPCServer) ShowNode(ctx net_context.Context, rpcNode *pb.NodeName) (*pb.Node, error) {
plog.Debug("Receive the RPC call ShowNode")
nodeManager.RWlock.RLock()
if !nodeManager.Exists(rpcNode.Name) {
nodeManager.RWlock.RUnlock()
return nil, errors.New(fmt.Sprintf("Could not find node %s on %s", rpcNode.Name, serverConfig.Global.Host))
}
node := nodeManager.Nodes[rpcNode.Name]
retNode := pb.Node{Name: node.StorageNode.Name,
Driver: node.StorageNode.Driver,
Params: node.StorageNode.Params,
Ondemand: node.StorageNode.Ondemand,
Status: int32(node.status)}
nodeManager.RWlock.RUnlock()
return &retNode, nil
}

func (s *ConsoleRPCServer) SetConsoleState(ctx net_context.Context, pbNodesStae *pb.NodesState) (*pb.Result, error) {
plog.Debug("Receive the RPC call SetConsoleState")
nodeManager.RWlock.RLock()
names := make([]string, 0)
for _, name := range pbNodesStae.Names {
if !nodeManager.Exists(name) {
plog.ErrorNode(name, fmt.Sprintf("Could not find node on %s", serverConfig.Global.Host))
continue
}
names = append(names, name)
}
nodeManager.RWlock.RUnlock()
result := nodeManager.setConsoleState(names, pbNodesStae.State)
return &pb.Result{Result: result}, nil
}

func (cRPCServer *ConsoleRPCServer) serve() {
var creds credentials.TransportCredentials
var err error
var s *grpc.Server
if serverConfig.Global.SSLCACertFile != "" && serverConfig.Global.SSLKeyFile != "" && serverConfig.Global.SSLCertFile != "" {
tlsConfig, err := common.LoadServerTlsConfig(serverConfig.Global.SSLCertFile,
serverConfig.Global.SSLKeyFile, serverConfig.Global.SSLCACertFile)
if err != nil {
panic(err)
}
creds = credentials.NewTLS(tlsConfig)
}
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", cRPCServer.host, cRPCServer.port))
if err != nil {
panic(err)
}
if creds != nil {
s = grpc.NewServer(grpc.Creds(creds))
} else {
s = grpc.NewServer()
}
pb.RegisterConsoleManagerServer(s, cRPCServer)
plog.Debug(fmt.Sprintf("Rpc server is listening on %s:%s", cRPCServer.host, cRPCServer.port))
go s.Serve(lis)
}

type ConsoleRPCClient struct {
host string
port string
}

func newConsoleRPCClient(host string, port string) *ConsoleRPCClient {
return &ConsoleRPCClient{host: host, port: port}
}

func (cRPCClient *ConsoleRPCClient) connect() (*grpc.ClientConn, error) {
var creds credentials.TransportCredentials
var err error
var conn *grpc.ClientConn
if serverConfig.Global.SSLCACertFile != "" && serverConfig.Global.SSLKeyFile != "" && serverConfig.Global.SSLCertFile != "" {
tlsConfig, err := common.LoadClientTlsConfig(serverConfig.Global.SSLCertFile,
serverConfig.Global.SSLKeyFile, serverConfig.Global.SSLCACertFile, cRPCClient.host)
if err != nil {
panic(err)
}
creds = credentials.NewTLS(tlsConfig)
}
addr := fmt.Sprintf("%s:%s", cRPCClient.host, cRPCClient.port)
if creds != nil {
conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(creds))
} else {
conn, err = grpc.Dial(addr, grpc.WithInsecure())
}
if err != nil {
plog.Error(err)
return nil, err
}
plog.Debug(fmt.Sprintf("Connect to %s to call the RPC method", addr))
return conn, nil
}

func (cRPCClient *ConsoleRPCClient) ShowNode(name string) (*pb.Node, error) {
conn, err := cRPCClient.connect()
if err != nil {
return nil, err
}
defer conn.Close()
c := pb.NewConsoleManagerClient(conn)
node, err := c.ShowNode(context.Background(), &pb.NodeName{Name: name})
if err != nil {
plog.Error(err)
return nil, err
}
return node, nil
}

func (cRPCClient *ConsoleRPCClient) SetConsoleState(names []string, state string) (map[string]string, error) {
conn, err := cRPCClient.connect()
if err != nil {
return nil, err
}
defer conn.Close()
c := pb.NewConsoleManagerClient(conn)
pbResult, err := c.SetConsoleState(context.Background(), &pb.NodesState{Names: names, State: state})
if err != nil {
plog.Error(err)
return nil, err
}
return pbResult.Result, nil
}
Loading

0 comments on commit cc8afb3

Please sign in to comment.