From 7b41d465cd7426384b15dc199aee98373b2055a7 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Tue, 28 Nov 2023 15:10:06 +0000 Subject: [PATCH] kafka store reading orders fix - channel not returned --- k8s/k8s.go | 7 +++---- model/listing.go | 6 ++---- model/listing_test.go | 3 +-- orderstore/kafkastore.go | 6 +++--- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/k8s/k8s.go b/k8s/k8s.go index d75abe1..8b916db 100644 --- a/k8s/k8s.go +++ b/k8s/k8s.go @@ -17,7 +17,7 @@ import ( "strconv" ) -func GetServiceAddress( appLabel string) (string, error) { +func GetServiceAddress(appLabel string) (string, error) { clientSet := GetK8sClientSet(false) namespace := "default" @@ -27,7 +27,7 @@ func GetServiceAddress( appLabel string) (string, error) { }) if err != nil { - return "", err + return "", err } if len(list.Items) != 1 { @@ -48,10 +48,9 @@ func GetServiceAddress( appLabel string) (string, error) { } targetAddress := service.Name + ":" + strconv.Itoa(int(podPort)) - return targetAddress, nil + return targetAddress, nil } - func GetK8sClientSet(external bool) *kubernetes.Clientset { var clientSet *kubernetes.Clientset if external { diff --git a/model/listing.go b/model/listing.go index 1e7d1b5..35462e2 100644 --- a/model/listing.go +++ b/model/listing.go @@ -25,8 +25,7 @@ func (m *Listing) RoundToNearestTick(price float64) (*Decimal64, error) { return nil, fmt.Errorf("no tick table entry for price:%v", price) } - -func (m *Listing) GetTickSizeForPriceLevel( price float64) (*Decimal64, error) { +func (m *Listing) GetTickSizeForPriceLevel(price float64) (*Decimal64, error) { for _, entry := range m.TickSize.Entries { delta := entry.TickSize.ToFloat() / 1000 @@ -35,14 +34,13 @@ func (m *Listing) GetTickSizeForPriceLevel( price float64) (*Decimal64, error) if compare(price, lowerBound, delta) >= 0 && compare(price, upperBound, delta) <= 0 { - return entry.TickSize, nil + return entry.TickSize, nil } } return nil, fmt.Errorf("no tick table entry for price:%v", price) } - func compare(f1 float64, f2 float64, delta float64) int { diff := f1 - f2 diff --git a/model/listing_test.go b/model/listing_test.go index 09872ed..4db885d 100644 --- a/model/listing_test.go +++ b/model/listing_test.go @@ -2,7 +2,7 @@ package model import "testing" -var tickSizeTable = &TickSizeTable{ +var tickSizeTable = &TickSizeTable{ Entries: []*TickSizeEntry{ { LowerPriceBound: &Decimal64{Mantissa: -100, Exponent: 0}, @@ -63,7 +63,6 @@ func TestListing_GetTickSizeForPriceLevel(t *testing.T) { }) } - } func TestListing_RoundToTickSize(t *testing.T) { diff --git a/orderstore/kafkastore.go b/orderstore/kafkastore.go index a20597e..e35f167 100644 --- a/orderstore/kafkastore.go +++ b/orderstore/kafkastore.go @@ -98,7 +98,7 @@ func (ks *KafkaStore) SubscribeToAllOrders(ctx context.Context, createdAfter tim return nil, nil, fmt.Errorf("failed to restore order state from store: %w", err) } - outChan := make(chan<- *model.Order, bufferSize) + outChan := make(chan *model.Order, bufferSize) go func() { defer func() { close(outChan) @@ -107,7 +107,7 @@ func (ks *KafkaStore) SubscribeToAllOrders(ctx context.Context, createdAfter tim } }() for { - msg, err := reader.ReadMessage(context.Background()) + msg, err := reader.ReadMessage(ctx) if err != nil { slog.Error("failed to read message", "error", err) return @@ -123,7 +123,7 @@ func (ks *KafkaStore) SubscribeToAllOrders(ctx context.Context, createdAfter tim } }() - return initialState, nil, nil + return initialState, outChan, nil }