Skip to content

Commit

Permalink
Merge pull request #5 from ettec/kafka-store-fix
Browse files Browse the repository at this point in the history
kafka store reading orders fix - channel not returned
  • Loading branch information
ettec authored Nov 28, 2023
2 parents 9883eb3 + 7b41d46 commit 2a72a40
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 13 deletions.
7 changes: 3 additions & 4 deletions k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"strconv"
)

func GetServiceAddress( appLabel string) (string, error) {
func GetServiceAddress(appLabel string) (string, error) {
clientSet := GetK8sClientSet(false)

namespace := "default"
Expand All @@ -27,7 +27,7 @@ func GetServiceAddress( appLabel string) (string, error) {
})

if err != nil {
return "", err
return "", err
}

if len(list.Items) != 1 {
Expand All @@ -48,10 +48,9 @@ func GetServiceAddress( appLabel string) (string, error) {
}

targetAddress := service.Name + ":" + strconv.Itoa(int(podPort))
return targetAddress, nil
return targetAddress, nil
}


func GetK8sClientSet(external bool) *kubernetes.Clientset {
var clientSet *kubernetes.Clientset
if external {
Expand Down
6 changes: 2 additions & 4 deletions model/listing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ func (m *Listing) RoundToNearestTick(price float64) (*Decimal64, error) {
return nil, fmt.Errorf("no tick table entry for price:%v", price)
}


func (m *Listing) GetTickSizeForPriceLevel( price float64) (*Decimal64, error) {
func (m *Listing) GetTickSizeForPriceLevel(price float64) (*Decimal64, error) {

for _, entry := range m.TickSize.Entries {
delta := entry.TickSize.ToFloat() / 1000
Expand All @@ -35,14 +34,13 @@ func (m *Listing) GetTickSizeForPriceLevel( price float64) (*Decimal64, error)

if compare(price, lowerBound, delta) >= 0 &&
compare(price, upperBound, delta) <= 0 {
return entry.TickSize, nil
return entry.TickSize, nil
}
}

return nil, fmt.Errorf("no tick table entry for price:%v", price)
}


func compare(f1 float64, f2 float64, delta float64) int {

diff := f1 - f2
Expand Down
3 changes: 1 addition & 2 deletions model/listing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package model

import "testing"

var tickSizeTable = &TickSizeTable{
var tickSizeTable = &TickSizeTable{
Entries: []*TickSizeEntry{
{
LowerPriceBound: &Decimal64{Mantissa: -100, Exponent: 0},
Expand Down Expand Up @@ -63,7 +63,6 @@ func TestListing_GetTickSizeForPriceLevel(t *testing.T) {
})
}


}

func TestListing_RoundToTickSize(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions orderstore/kafkastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (ks *KafkaStore) SubscribeToAllOrders(ctx context.Context, createdAfter tim
return nil, nil, fmt.Errorf("failed to restore order state from store: %w", err)
}

outChan := make(chan<- *model.Order, bufferSize)
outChan := make(chan *model.Order, bufferSize)
go func() {
defer func() {
close(outChan)
Expand All @@ -107,7 +107,7 @@ func (ks *KafkaStore) SubscribeToAllOrders(ctx context.Context, createdAfter tim
}
}()
for {
msg, err := reader.ReadMessage(context.Background())
msg, err := reader.ReadMessage(ctx)
if err != nil {
slog.Error("failed to read message", "error", err)
return
Expand All @@ -123,7 +123,7 @@ func (ks *KafkaStore) SubscribeToAllOrders(ctx context.Context, createdAfter tim
}
}()

return initialState, nil, nil
return initialState, outChan, nil

}

Expand Down

0 comments on commit 2a72a40

Please sign in to comment.