Skip to content

Latest commit

 

History

History
84 lines (64 loc) · 1.98 KB

README.md

File metadata and controls

84 lines (64 loc) · 1.98 KB

flightorder GoDoc

This package allows to do [ordered input] -> [parallel processing] -> [ordered output] in a streaming manner.

The name was inspired by golang.org/x/sync/singleflight package.

Installation

go get github.com/go-faster/flightorder@latest

Example

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/go-faster/flightorder"
)

func main() {
	input := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
	processingOrder, output := processInput(context.TODO(), input)
	fmt.Printf("input:     %v\n", input)
	fmt.Printf("processed: %v\n", processingOrder)
	fmt.Printf("output:    %v\n", output)
}

func processInput(ctx context.Context, input []int) (processing, output []int) {
	route := flightorder.NewRoute(flightorder.RouteParams{})

	var (
		mux sync.Mutex
		wg  sync.WaitGroup
	)

	wg.Add(len(input))
	for _, v := range input {
		ticket := route.Ticket()
		go func(t *flightorder.Ticket, v int) {
			defer wg.Done()
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))

			mux.Lock()
			processing = append(processing, v)
			mux.Unlock()

			_ = route.CompleteTicket(ctx, t, func(context.Context) error {
				mux.Lock()
				output = append(output, v)
				mux.Unlock()
				return nil
			})
		}(ticket, v)
	}

	wg.Wait()
	return
}

Output:

input:     [1 2 3 4 5 6 7 8 9]
processed: [3 1 9 7 6 2 5 4 8]
output:    [1 2 3 4 5 6 7 8 9]

Motivation

Sending logs from a single file to multiple kafka brokers concurrently while preserving at-least-once delivery guarantees:

  • Logs are sent to multiple kafka brokers in parallel to enhance throughput.
  • File offsets are committed in the exact order they are read to ensure at-least-once delivery guarantees and prevent data loss in case of shipper or broker failures.

License

Source code is available under Apache License 2.0