Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #36 from openfresh/feature/request_stream
Browse files Browse the repository at this point in the history
modified event transmission method to gRPC request stream
  • Loading branch information
stormcat24 authored May 29, 2017
2 parents 4f90994 + e722236 commit 9942376
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 56 deletions.
28 changes: 27 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,15 @@ func main() {
},
}

ss, err := client.Events(ctx, &req)
ss, err := client.Events(ctx)
if err != nil {
log.Fatal(err)
}

// subscribe event
if err := ss.Send(&req); err != nil {
log.Fatal(err)
}

for {
resp, err := ss.Recv()
Expand All @@ -157,6 +162,27 @@ func main() {
}
```

### unsubscribe

`Events` request is stream. If you unsubscribe event, set empty event data.

```
req := proto.Request{
// empty events
Events: []*proto.EventType{},
}
ss, err := client.Events(ctx)
if err != nil {
log.Fatal(err)
}
// unsubscribe event
if err := ss.Send(&req); err != nil {
log.Fatal(err)
}
```

# Usage Publisher

You publish events to the channel that Plasma subscribes according to the following JSON Schema.
Expand Down
18 changes: 18 additions & 0 deletions manager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func (c *Client) ReceivePayload() <-chan event.Payload {
return c.payloadChan
}

func (c *Client) SetEvents(events []string) {
c.events = events
}

func NewClient(events []string) Client {
return Client{
events: events,
Expand Down Expand Up @@ -59,6 +63,20 @@ func (cm *ClientManager) RemoveClient(client Client) {
close(client.payloadChan)
}

func (cm *ClientManager) DeleteEvents(client *Client) {
for _, e := range client.events {
clients, ok := cm.clientsTable[e]
if !ok {
continue
}
clients.mu.Lock()
delete(clients.clients, client.payloadChan)
clients.mu.Unlock()

delete(cm.clientsTable, e)
}
}

const eventSeparator = ":"

func (cm *ClientManager) createEvents(request string) []string {
Expand Down
47 changes: 26 additions & 21 deletions protobuf/stream.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protobuf/stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ option objc_class_prefix = "PLASMA";
package proto;

service StreamService {
rpc Events(Request) returns (stream Payload) {}
rpc Events(stream Request) returns (stream Payload) {}

}

Expand Down
105 changes: 73 additions & 32 deletions server/grpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"io"
"time"

"go.uber.org/zap"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/openfresh/plasma/protobuf"
"github.com/openfresh/plasma/pubsub"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
)

Expand Down Expand Up @@ -61,25 +63,34 @@ func NewGRPCServer(opt Option) (*GRPCServer, error) {
return gs, nil
}

type refreshEvents struct {
client *manager.Client
events []string
}

type StreamServer struct {
clientManager *manager.ClientManager
newClients chan manager.Client
removeClients chan manager.Client
payloads chan event.Payload
pubsub pubsub.PubSuber
accessLogger *zap.Logger
errorLogger *zap.Logger
clientManager *manager.ClientManager
newClients chan manager.Client
removeClients chan manager.Client
payloads chan event.Payload
resfreshEvents chan refreshEvents
errChan chan error
pubsub pubsub.PubSuber
accessLogger *zap.Logger
errorLogger *zap.Logger
}

func NewStreamServer(opt Option) *StreamServer {
ss := &StreamServer{
clientManager: manager.NewClientManager(),
newClients: make(chan manager.Client),
removeClients: make(chan manager.Client),
payloads: make(chan event.Payload),
pubsub: opt.PubSuber,
accessLogger: opt.AccessLogger,
errorLogger: opt.ErrorLogger,
clientManager: manager.NewClientManager(),
newClients: make(chan manager.Client),
removeClients: make(chan manager.Client),
payloads: make(chan event.Payload),
errChan: make(chan error),
resfreshEvents: make(chan refreshEvents),
pubsub: opt.PubSuber,
accessLogger: opt.AccessLogger,
errorLogger: opt.ErrorLogger,
}
ss.pubsub.Subscribe(func(payload event.Payload) {
ss.payloads <- payload
Expand All @@ -101,35 +112,65 @@ func (ss *StreamServer) Run() {
metrics.DecConnection()
case payload := <-ss.payloads:
ss.clientManager.SendPayload(payload)
case re := <-ss.resfreshEvents:
ss.clientManager.DeleteEvents(re.client)
re.client.SetEvents(re.events)
ss.clientManager.AddClient(*re.client)
}
}
}()
}

func (ss *StreamServer) Events(request *proto.Request, es proto.StreamService_EventsServer) error {
ss.accessLogger.Info("gRPC",
zap.Array("request-events", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, e := range request.Events {
enc.AppendString(e.Type)
func (ss *StreamServer) Events(es proto.StreamService_EventsServer) error {
client := manager.NewClient([]string{})
ss.newClients <- client
go func() {
for {
request, err := es.Recv()
if err == io.EOF {
<-es.Context().Done()
return
}
return nil
})),
)
if request == nil || request.Events == nil {
return errors.New("request can't be nil")
}

l := len(request.Events)
events := make([]string, l)
for i := 0; i < l; i++ {
events[i] = request.Events[i].Type
}
if err != nil {
if grpc.Code(err) != codes.Canceled {
ss.errChan <- errors.Wrap(err, "Recv error")
return
} else {
<-es.Context().Done()
return
}
}

client := manager.NewClient(events)
ss.newClients <- client
ss.accessLogger.Info("gRPC",
zap.Array("request-events", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, e := range request.Events {
enc.AppendString(e.Type)
}
return nil
})),
)
if request.Events == nil {
ss.errChan <- errors.New("event can't be nil")
return
}

l := len(request.Events)
events := make([]string, l)
for i := 0; i < l; i++ {
events[i] = request.Events[i].Type
}
ss.resfreshEvents <- refreshEvents{
client: &client,
events: events,
}
}
}()

for {
select {
case err := <-ss.errChan:
return err
case pl, open := <-client.ReceivePayload():
if !open {
return nil
Expand Down
6 changes: 5 additions & 1 deletion server/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func TestGRPCEvents(t *testing.T) {
defer conn.Close()
client := proto.NewStreamServiceClient(conn)
ctx := context.Background()
ss, err := client.Events(ctx, &cases[i].req)
ss, err := client.Events(ctx)
if err := ss.Send(&cases[i].req); err != nil {
require.NoError(err)
}

require.NoError(err)
isFirst := true
for cases[i].expectCount != cases[i].actualCount {
Expand Down

0 comments on commit 9942376

Please sign in to comment.