Skip to content

Commit

Permalink
chore: reduce refactor (#92)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Dec 15, 2023
1 parent bd836d0 commit 0156652
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 32 deletions.
2 changes: 1 addition & 1 deletion pkg/reducer/examples/counter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module even_odd

go 1.20

require github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe
require github.com/numaproj/numaflow-go v0.5.3-0.20231215063037-12be0a69e374

require (
github.com/golang/protobuf v1.5.3 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/reducer/examples/counter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe h1:nK/BGffgwQ4L9pyllwzSZttPxMf+OOqK3DOP97KZdRk=
github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe/go.mod h1:zcJq1YAA/jnxCQLW7EFK4+HXWCd2QtW4tMOvRjHFa2g=
github.com/numaproj/numaflow-go v0.5.3-0.20231215063037-12be0a69e374 h1:y3/8VzqE/9tHv36eh/X3P+rrqZa2ieOIJZBTCGIT+NY=
github.com/numaproj/numaflow-go v0.5.3-0.20231215063037-12be0a69e374/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
Expand Down
2 changes: 1 addition & 1 deletion pkg/reducer/examples/counter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ func reduceCounter(_ context.Context, keys []string, reduceCh <-chan reducer.Dat
}

func main() {
reducer.NewServer(reducer.ReducerFunc(reduceCounter)).Start(context.Background())
reducer.NewServer(reducer.SimpleCreatorWithReduceFn(reduceCounter)).Start(context.Background())
}
2 changes: 1 addition & 1 deletion pkg/reducer/examples/sum/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module even_odd

go 1.20

require github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe
require github.com/numaproj/numaflow-go v0.5.3-0.20231214163007-161ba6e207df

require (
github.com/golang/protobuf v1.5.3 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/reducer/examples/sum/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe h1:nK/BGffgwQ4L9pyllwzSZttPxMf+OOqK3DOP97KZdRk=
github.com/numaproj/numaflow-go v0.4.6-0.20230828035951-6f79b632ecfe/go.mod h1:zcJq1YAA/jnxCQLW7EFK4+HXWCd2QtW4tMOvRjHFa2g=
github.com/numaproj/numaflow-go v0.5.3-0.20231214163007-161ba6e207df h1:SPBNZRGNIyMRr/KY9SnVuyYkyBShyQ0qlbKqhUIlIe0=
github.com/numaproj/numaflow-go v0.5.3-0.20231214163007-161ba6e207df/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
Expand Down
18 changes: 14 additions & 4 deletions pkg/reducer/examples/sum/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,17 @@ import (
"github.com/numaproj/numaflow-go/pkg/reducer"
)

// SumReducerCreator implements the reducer.ReducerCreator interface which creates a reducer
type SumReducerCreator struct {
}

func (s *SumReducerCreator) Create() reducer.Reducer {
return &Sum{}
}

// Sum is a reducer that sum up the values for the given keys
type Sum struct {
sum int
}

func (s *Sum) Reduce(ctx context.Context, keys []string, reduceCh <-chan reducer.Datum, md reducer.Metadata) reducer.Messages {
Expand All @@ -19,10 +28,11 @@ func (s *Sum) Reduce(ctx context.Context, keys []string, reduceCh <-chan reducer
_ = intervalWindow
var resultKeys = keys
var resultVal []byte
var sum = 0
// sum up the values
for d := range reduceCh {
val := d.Value()

// event time and watermark can be fetched from the datum
eventTime := d.EventTime()
_ = eventTime
watermark := d.Watermark()
Expand All @@ -33,14 +43,14 @@ func (s *Sum) Reduce(ctx context.Context, keys []string, reduceCh <-chan reducer
fmt.Printf("unable to convert the value to int: %v\n", err)
continue
}
sum += v
s.sum += v
}
resultVal = []byte(strconv.Itoa(sum))
resultVal = []byte(strconv.Itoa(s.sum))
return reducer.MessagesBuilder().Append(reducer.NewMessage(resultVal).WithKeys(resultKeys))
}

func main() {
err := reducer.NewServer(&Sum{}).Start(context.Background())
err := reducer.NewServer(&SumReducerCreator{}).Start(context.Background())
if err != nil {
log.Panic("unable to start the server due to: ", err)
}
Expand Down
27 changes: 24 additions & 3 deletions pkg/reducer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,36 @@ type IntervalWindow interface {
EndTime() time.Time
}

// ReducerCreator is the interface which is used to create a Reducer.
type ReducerCreator interface {
// Create creates a Reducer, will be invoked once for every keyed window.
Create() Reducer
}

// simpleReducerCreator is an implementation of ReducerCreator, which creates a Reducer for the given function.
type simpleReducerCreator struct {
f func(context.Context, []string, <-chan Datum, Metadata) Messages
}

// Create creates a Reducer for the given function.
func (s *simpleReducerCreator) Create() Reducer {
return reducerFn(s.f)
}

// SimpleCreatorWithReduceFn creates a simple ReducerCreator for the given reduce function.
func SimpleCreatorWithReduceFn(f func(context.Context, []string, <-chan Datum, Metadata) Messages) ReducerCreator {
return &simpleReducerCreator{f: f}
}

// Reducer is the interface of reduce function implementation.
type Reducer interface {
Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages
}

// ReducerFunc is a utility type used to convert a Reduce function to a Reducer.
type ReducerFunc func(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages
// reducerFn is a utility type used to convert a Reduce function to a Reducer.
type reducerFn func(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages

// Reduce implements the function of reduce function.
func (rf ReducerFunc) Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages {
func (rf reducerFn) Reduce(ctx context.Context, keys []string, reduceCh <-chan Datum, md Metadata) Messages {
return rf(ctx, keys, reduceCh, md)
}
4 changes: 2 additions & 2 deletions pkg/reducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ type server struct {
}

// NewServer creates a new reduce server.
func NewServer(r Reducer, inputOptions ...Option) numaflow.Server {
func NewServer(r ReducerCreator, inputOptions ...Option) numaflow.Server {
opts := DefaultOptions()
for _, inputOption := range inputOptions {
inputOption(opts)
}
s := new(server)
s.svc = new(Service)
s.svc.Reducer = r
s.svc.CreateReduceHandler = r
s.opts = opts
return s
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/reducer/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ func TestReduceServer_Start(t *testing.T) {
_ = os.RemoveAll(serverInfoFile.Name())
}()

var reduceHandler = ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
var rfn = func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
sum := 0
for val := range rch {
msgVal, _ := strconv.Atoi(string(val.Value()))
sum += msgVal
}
return MessagesBuilder().Append(NewMessage([]byte(strconv.Itoa(sum))).WithKeys([]string{keys[0] + "_test"}))
})
}

// note: using actual uds connection
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
err := NewServer(reduceHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx)
err := NewServer(SimpleCreatorWithReduceFn(rfn), WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx)
assert.NoError(t, err)
}
8 changes: 5 additions & 3 deletions pkg/reducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ const (
// Service implements the proto gen server interface and contains the reduce operation handler.
type Service struct {
reducepb.UnimplementedReduceServer

Reducer Reducer
CreateReduceHandler ReducerCreator
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand Down Expand Up @@ -109,7 +108,10 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// we stream the messages to the user by writing messages
// to channel and wait until we get the result and stream
// the result back to the client (numaflow).
messages := fs.Reducer.Reduce(ctx, k, ch, md)

// create a new reducer, since we got a new key
reducer := fs.CreateReduceHandler.Create()
messages := reducer.Reduce(ctx, k, ch, md)
datumList := buildDatumList(messages)

// stream.Send() is not thread safe.
Expand Down
20 changes: 10 additions & 10 deletions pkg/reducer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ func TestService_ReduceFn(t *testing.T) {

tests := []struct {
name string
handler Reducer
handler func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages
input []*reducepb.ReduceRequest
expected *reducepb.ReduceResponse
expectedErr bool
}{
{
name: "reduce_fn_forward_msg_same_keys",
handler: ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
handler: func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
sum := 0
for val := range rch {
msgVal, _ := strconv.Atoi(string(val.Value()))
sum += msgVal
}
return MessagesBuilder().Append(NewMessage([]byte(strconv.Itoa(sum))).WithKeys([]string{keys[0] + "_test"}))
}),
},
input: []*reducepb.ReduceRequest{
{
Keys: []string{"client"},
Expand Down Expand Up @@ -102,14 +102,14 @@ func TestService_ReduceFn(t *testing.T) {
},
{
name: "reduce_fn_forward_msg_multiple_keys",
handler: ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
handler: func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
sum := 0
for val := range rch {
msgVal, _ := strconv.Atoi(string(val.Value()))
sum += msgVal
}
return MessagesBuilder().Append(NewMessage([]byte(strconv.Itoa(sum))).WithKeys([]string{keys[0] + "_test"}))
}),
},
input: []*reducepb.ReduceRequest{
{
Keys: []string{"client1"},
Expand Down Expand Up @@ -168,14 +168,14 @@ func TestService_ReduceFn(t *testing.T) {
},
{
name: "reduce_fn_forward_msg_forward_to_all",
handler: ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
handler: func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
sum := 0
for val := range rch {
msgVal, _ := strconv.Atoi(string(val.Value()))
sum += msgVal
}
return MessagesBuilder().Append(NewMessage([]byte(strconv.Itoa(sum))))
}),
},
input: []*reducepb.ReduceRequest{
{
Keys: []string{"client"},
Expand Down Expand Up @@ -207,14 +207,14 @@ func TestService_ReduceFn(t *testing.T) {
},
{
name: "reduce_fn_forward_msg_drop_msg",
handler: ReducerFunc(func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
handler: func(ctx context.Context, keys []string, rch <-chan Datum, md Metadata) Messages {
sum := 0
for val := range rch {
msgVal, _ := strconv.Atoi(string(val.Value()))
sum += msgVal
}
return MessagesBuilder().Append(MessageToDrop())
}),
},
input: []*reducepb.ReduceRequest{
{
Keys: []string{"client"},
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestService_ReduceFn(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fs := &Service{
Reducer: tt.handler,
CreateReduceHandler: SimpleCreatorWithReduceFn(tt.handler),
}
// here's a trick for testing:
// because we are not using gRPC, we directly set a new incoming ctx
Expand Down

0 comments on commit 0156652

Please sign in to comment.