From 015665227c3d96391316e4ad340a1b2b940ff2ad Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 15 Dec 2023 12:03:28 +0530 Subject: [PATCH] chore: reduce refactor (#92) Signed-off-by: Yashash H L --- pkg/reducer/examples/counter/go.mod | 2 +- pkg/reducer/examples/counter/go.sum | 4 ++-- pkg/reducer/examples/counter/main.go | 2 +- pkg/reducer/examples/sum/go.mod | 2 +- pkg/reducer/examples/sum/go.sum | 4 ++-- pkg/reducer/examples/sum/main.go | 18 ++++++++++++++---- pkg/reducer/interface.go | 27 ++++++++++++++++++++++++--- pkg/reducer/server.go | 4 ++-- pkg/reducer/server_test.go | 7 ++++--- pkg/reducer/service.go | 8 +++++--- pkg/reducer/service_test.go | 20 ++++++++++---------- 11 files changed, 66 insertions(+), 32 deletions(-) diff --git a/pkg/reducer/examples/counter/go.mod b/pkg/reducer/examples/counter/go.mod index 8b968e1e..94c9f552 100644 --- a/pkg/reducer/examples/counter/go.mod +++ b/pkg/reducer/examples/counter/go.mod @@ -2,7 +2,7 @@ module even_odd go 1.20 -require github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe +require github.com/numaproj/numaflow-go v0.5.3-0.20231215063037-12be0a69e374 require ( github.com/golang/protobuf v1.5.3 // indirect diff --git a/pkg/reducer/examples/counter/go.sum b/pkg/reducer/examples/counter/go.sum index 18c03144..8859f958 100644 --- a/pkg/reducer/examples/counter/go.sum +++ b/pkg/reducer/examples/counter/go.sum @@ -4,8 +4,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe h1:nK/BGffgwQ4L9pyllwzSZttPxMf+OOqK3DOP97KZdRk= -github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe/go.mod h1:zcJq1YAA/jnxCQLW7EFK4+HXWCd2QtW4tMOvRjHFa2g= +github.com/numaproj/numaflow-go v0.5.3-0.20231215063037-12be0a69e374 h1:y3/8VzqE/9tHv36eh/X3P+rrqZa2ieOIJZBTCGIT+NY= +github.com/numaproj/numaflow-go v0.5.3-0.20231215063037-12be0a69e374/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= diff --git a/pkg/reducer/examples/counter/main.go b/pkg/reducer/examples/counter/main.go index a3868732..1ce455e3 100644 --- a/pkg/reducer/examples/counter/main.go +++ b/pkg/reducer/examples/counter/main.go @@ -20,5 +20,5 @@ func reduceCounter(_ context.Context, keys []string, reduceCh <-chan reducer.Dat } func main() { - reducer.NewServer(reducer.ReducerFunc(reduceCounter)).Start(context.Background()) + reducer.NewServer(reducer.SimpleCreatorWithReduceFn(reduceCounter)).Start(context.Background()) } diff --git a/pkg/reducer/examples/sum/go.mod b/pkg/reducer/examples/sum/go.mod index 8b968e1e..686149cd 100644 --- a/pkg/reducer/examples/sum/go.mod +++ b/pkg/reducer/examples/sum/go.mod @@ -2,7 +2,7 @@ module even_odd go 1.20 -require github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe +require github.com/numaproj/numaflow-go v0.5.3-0.20231214163007-161ba6e207df require ( github.com/golang/protobuf v1.5.3 // indirect diff --git a/pkg/reducer/examples/sum/go.sum b/pkg/reducer/examples/sum/go.sum index 18c03144..655c2f0f 100644 --- a/pkg/reducer/examples/sum/go.sum +++ b/pkg/reducer/examples/sum/go.sum @@ -4,8 +4,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe h1:nK/BGffgwQ4L9pyllwzSZttPxMf+OOqK3DOP97KZdRk= -github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe/go.mod h1:zcJq1YAA/jnxCQLW7EFK4+HXWCd2QtW4tMOvRjHFa2g= +github.com/numaproj/numaflow-go v0.5.3-0.20231214163007-161ba6e207df h1:SPBNZRGNIyMRr/KY9SnVuyYkyBShyQ0qlbKqhUIlIe0= +github.com/numaproj/numaflow-go v0.5.3-0.20231214163007-161ba6e207df/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= diff --git a/pkg/reducer/examples/sum/main.go b/pkg/reducer/examples/sum/main.go index cd2dffa8..efc91e00 100644 --- a/pkg/reducer/examples/sum/main.go +++ b/pkg/reducer/examples/sum/main.go @@ -9,8 +9,17 @@ import ( "github.com/numaproj/numaflow-go/pkg/reducer" ) +// SumReducerCreator implements the reducer.ReducerCreator interface which creates a reducer +type SumReducerCreator struct { +} + +func (s *SumReducerCreator) Create() reducer.Reducer { + return &Sum{} +} + // Sum is a reducer that sum up the values for the given keys type Sum struct { + sum int } func (s *Sum) Reduce(ctx context.Context, keys []string, reduceCh <-chan reducer.Datum, md reducer.Metadata) reducer.Messages { @@ -19,10 +28,11 @@ func (s *Sum) Reduce(ctx context.Context, keys []string, reduceCh <-chan reducer _ = intervalWindow var resultKeys = keys var resultVal []byte - var sum = 0 // sum up the values for d := range reduceCh { val := d.Value() + + // event time and watermark can be fetched from the datum eventTime := d.EventTime() _ = eventTime watermark := d.Watermark() @@ -33,14 +43,14 @@ func (s *Sum) Reduce(ctx context.Context, keys []string, reduceCh <-chan reducer fmt.Printf("unable to convert the value to int: %v\n", err) continue } - sum += v + s.sum += v } - resultVal = []byte(strconv.Itoa(sum)) + resultVal = []byte(strconv.Itoa(s.sum)) return reducer.MessagesBuilder().Append(reducer.NewMessage(resultVal).WithKeys(resultKeys)) } func main() { - err := reducer.NewServer(&Sum{}).Start(context.Background()) + err := reducer.NewServer(&SumReducerCreator{}).Start(context.Background()) if err != nil { log.Panic("unable to start the server due to: ", err) } diff --git a/pkg/reducer/interface.go b/pkg/reducer/interface.go index 35b37bbb..1a7a4419 100644 --- a/pkg/reducer/interface.go +++ b/pkg/reducer/interface.go @@ -23,15 +23,36 @@ type IntervalWindow interface { EndTime() time.Time } +// ReducerCreator is the interface which is used to create a Reducer. +type ReducerCreator interface { + // Create creates a Reducer, will be invoked once for every keyed window. + Create() Reducer +} + +// simpleReducerCreator is an implementation of ReducerCreator, which creates a Reducer for the given function. +type simpleReducerCreator struct { + f func(context.Context, []string, <-chan Datum, Metadata) Messages +} + +// Create creates a Reducer for the given function. +func (s *simpleReducerCreator) Create() Reducer { + return reducerFn(s.f) +} + +// SimpleCreatorWithReduceFn creates a simple ReducerCreator for the given reduce function. +func SimpleCreatorWithReduceFn(f func(context.Context, []string, <-chan Datum, Metadata) Messages) ReducerCreator { + return &simpleReducerCreator{f: f} +} + // Reducer is the interface of reduce function implementation. type Reducer interface { Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages } -// ReducerFunc is a utility type used to convert a Reduce function to a Reducer. -type ReducerFunc func(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages +// reducerFn is a utility type used to convert a Reduce function to a Reducer. +type reducerFn func(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages // Reduce implements the function of reduce function. -func (rf ReducerFunc) Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages { +func (rf reducerFn) Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages { return rf(ctx, keys, reduceCh, md) } diff --git a/pkg/reducer/server.go b/pkg/reducer/server.go index 81421481..6b8ef5aa 100644 --- a/pkg/reducer/server.go +++ b/pkg/reducer/server.go @@ -18,14 +18,14 @@ type server struct { } // NewServer creates a new reduce server. -func NewServer(r Reducer, inputOptions ...Option) numaflow.Server { +func NewServer(r ReducerCreator, inputOptions ...Option) numaflow.Server { opts := DefaultOptions() for _, inputOption := range inputOptions { inputOption(opts) } s := new(server) s.svc = new(Service) - s.svc.Reducer = r + s.svc.CreateReduceHandler = r s.opts = opts return s } diff --git a/pkg/reducer/server_test.go b/pkg/reducer/server_test.go index 511bebb8..e5a29b2f 100644 --- a/pkg/reducer/server_test.go +++ b/pkg/reducer/server_test.go @@ -21,17 +21,18 @@ func TestReduceServer_Start(t *testing.T) { _ = os.RemoveAll(serverInfoFile.Name()) }() - var reduceHandler = ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { + var rfn = func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { sum := 0 for val := range rch { msgVal, _ := strconv.Atoi(string(val.Value())) sum += msgVal } return MessagesBuilder().Append(NewMessage([]byte(strconv.Itoa(sum))).WithKeys([]string{keys[0] + "_test"})) - }) + } + // note: using actual uds connection ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) defer cancel() - err := NewServer(reduceHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) + err := NewServer(SimpleCreatorWithReduceFn(rfn), WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) assert.NoError(t, err) } diff --git a/pkg/reducer/service.go b/pkg/reducer/service.go index 76610ef6..4e4321dd 100644 --- a/pkg/reducer/service.go +++ b/pkg/reducer/service.go @@ -30,8 +30,7 @@ const ( // Service implements the proto gen server interface and contains the reduce operation handler. type Service struct { reducepb.UnimplementedReduceServer - - Reducer Reducer + CreateReduceHandler ReducerCreator } // IsReady returns true to indicate the gRPC connection is ready. @@ -109,7 +108,10 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { // we stream the messages to the user by writing messages // to channel and wait until we get the result and stream // the result back to the client (numaflow). - messages := fs.Reducer.Reduce(ctx, k, ch, md) + + // create a new reducer, since we got a new key + reducer := fs.CreateReduceHandler.Create() + messages := reducer.Reduce(ctx, k, ch, md) datumList := buildDatumList(messages) // stream.Send() is not thread safe. diff --git a/pkg/reducer/service_test.go b/pkg/reducer/service_test.go index 8eb6e488..98deeb56 100644 --- a/pkg/reducer/service_test.go +++ b/pkg/reducer/service_test.go @@ -55,21 +55,21 @@ func TestService_ReduceFn(t *testing.T) { tests := []struct { name string - handler Reducer + handler func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages input []*reducepb.ReduceRequest expected *reducepb.ReduceResponse expectedErr bool }{ { name: "reduce_fn_forward_msg_same_keys", - handler: ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { + handler: func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { sum := 0 for val := range rch { msgVal, _ := strconv.Atoi(string(val.Value())) sum += msgVal } return MessagesBuilder().Append(NewMessage([]byte(strconv.Itoa(sum))).WithKeys([]string{keys[0] + "_test"})) - }), + }, input: []*reducepb.ReduceRequest{ { Keys: []string{"client"}, @@ -102,14 +102,14 @@ func TestService_ReduceFn(t *testing.T) { }, { name: "reduce_fn_forward_msg_multiple_keys", - handler: ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { + handler: func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { sum := 0 for val := range rch { msgVal, _ := strconv.Atoi(string(val.Value())) sum += msgVal } return MessagesBuilder().Append(NewMessage([]byte(strconv.Itoa(sum))).WithKeys([]string{keys[0] + "_test"})) - }), + }, input: []*reducepb.ReduceRequest{ { Keys: []string{"client1"}, @@ -168,14 +168,14 @@ func TestService_ReduceFn(t *testing.T) { }, { name: "reduce_fn_forward_msg_forward_to_all", - handler: ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { + handler: func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { sum := 0 for val := range rch { msgVal, _ := strconv.Atoi(string(val.Value())) sum += msgVal } return MessagesBuilder().Append(NewMessage([]byte(strconv.Itoa(sum)))) - }), + }, input: []*reducepb.ReduceRequest{ { Keys: []string{"client"}, @@ -207,14 +207,14 @@ func TestService_ReduceFn(t *testing.T) { }, { name: "reduce_fn_forward_msg_drop_msg", - handler: ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { + handler: func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages { sum := 0 for val := range rch { msgVal, _ := strconv.Atoi(string(val.Value())) sum += msgVal } return MessagesBuilder().Append(MessageToDrop()) - }), + }, input: []*reducepb.ReduceRequest{ { Keys: []string{"client"}, @@ -249,7 +249,7 @@ func TestService_ReduceFn(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { fs := &Service{ - Reducer: tt.handler, + CreateReduceHandler: SimpleCreatorWithReduceFn(tt.handler), } // here's a trick for testing: // because we are not using gRPC, we directly set a new incoming ctx