Skip to content

Commit

Permalink
Merge pull request #177 from cube2222/stream-ids
Browse files Browse the repository at this point in the history
Add stream IDs, proper storage prefixing, recovering from crashes.
  • Loading branch information
cube2222 authored Feb 16, 2020
2 parents ec8af60 + d949e63 commit 4fcd990
Show file tree
Hide file tree
Showing 48 changed files with 418 additions and 140 deletions.
8 changes: 7 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ func (app *App) RunPlan(ctx context.Context, stateStorage storage.Storage, plan
return errors.Wrap(err, "couldn't materialize the physical plan into an execution plan")
}

stream, err := exec.Get(ctx, variables)
programID := &execution.StreamID{Id: ""}

tx := stateStorage.BeginTransaction()
stream, err := exec.Get(storage.InjectStateTransaction(ctx, tx), variables, programID)
if err != nil {
return errors.Wrap(err, "couldn't get record stream from execution plan")
}
if err := tx.Commit(); err != nil {
return errors.Wrap(err, "couldn't commit transaction to get record stream from execution plan")
}

var rec *execution.Record
for {
Expand Down
11 changes: 9 additions & 2 deletions execution/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package execution

import (
"github.com/cube2222/octosql"
"github.com/cube2222/octosql/streaming/storage"

"context"

Expand All @@ -16,8 +17,14 @@ func NewDistinct(child Node) *Distinct {
return &Distinct{child: child}
}

func (node *Distinct) Get(ctx context.Context, variables octosql.Variables) (RecordStream, error) {
stream, err := node.child.Get(ctx, variables)
func (node *Distinct) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, error) {
tx := storage.GetStateTransactionFromContext(ctx)
sourceStreamID, err := GetSourceStreamID(tx.WithPrefix(streamID.AsPrefix()), octosql.MakePhantom())
if err != nil {
return nil, errors.Wrap(err, "couldn't get source stream ID")
}

stream, err := node.child.Get(ctx, variables, sourceStreamID)
if err != nil {
return nil, errors.Wrap(err, "couldn't get stream for child node in distinct")
}
Expand Down
41 changes: 24 additions & 17 deletions execution/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ type IntermediateRecordStore interface {
type PullEngine struct {
irs IntermediateRecordStore
source RecordStream
sourceStoragePrefix []byte
lastCommittedWatermark time.Time
watermarkSource WatermarkSource
storage storage.Storage
streamID *StreamID
}

func NewPullEngine(irs IntermediateRecordStore, storage storage.Storage, source RecordStream, sourceStoragePrefix []byte, watermarkSource WatermarkSource) *PullEngine {
func NewPullEngine(irs IntermediateRecordStore, storage storage.Storage, source RecordStream, streamID *StreamID, watermarkSource WatermarkSource) *PullEngine {
return &PullEngine{
irs: irs,
storage: storage,
source: source,
sourceStoragePrefix: sourceStoragePrefix,
watermarkSource: watermarkSource,
irs: irs,
storage: storage,
source: source,
streamID: streamID,
watermarkSource: watermarkSource,
}
}

Expand Down Expand Up @@ -127,43 +127,46 @@ func (engine *PullEngine) Run(ctx context.Context) {
} else if err != nil {
tx.Abort()
log.Println(err)
return // TODO: Error propagation? Add this to the underlying queue as an ErrorElement? How to do this well?
return // TODO: Error propagation? Add this to the underlying queue as an ErrorElement? How to do this well? Send it to the underlying IRS like a watermark?
}
}
}

func (engine *PullEngine) loop(ctx context.Context, tx storage.StateTransaction) error {
sourcePrefixedTx := tx.WithPrefix(engine.sourceStoragePrefix)
// This is a transaction prefixed with the current node StreamID,
// which should be used for all storage operations of this node.
// Source streams will get the raw, non-prefixed, transaction.
prefixedTx := tx.WithPrefix(engine.streamID.AsPrefix())

watermark, err := engine.watermarkSource.GetWatermark(ctx, sourcePrefixedTx)
watermark, err := engine.watermarkSource.GetWatermark(ctx, tx)
if err != nil {
return errors.Wrap(err, "couldn't get current watermark from source")
}
if watermark.After(engine.lastCommittedWatermark) {
err := engine.irs.UpdateWatermark(ctx, tx, watermark)
err := engine.irs.UpdateWatermark(ctx, prefixedTx, watermark)
if err != nil {
return errors.Wrap(err, "couldn't update watermark in intermediate record store")
}
engine.lastCommittedWatermark = watermark // TODO: misnamed - this is recorded before the transaction is committed, so it is not really the last _committed_ watermark.
return nil
}

record, err := engine.source.Next(storage.InjectStateTransaction(ctx, sourcePrefixedTx))
record, err := engine.source.Next(storage.InjectStateTransaction(ctx, tx))
if err != nil {
if err == ErrEndOfStream {
err := engine.irs.UpdateWatermark(ctx, tx, maxWatermark)
err := engine.irs.UpdateWatermark(ctx, prefixedTx, maxWatermark)
if err != nil {
return errors.Wrap(err, "couldn't mark end of stream max watermark in intermediate record store")
}
err = engine.irs.MarkEndOfStream(ctx, tx)
err = engine.irs.MarkEndOfStream(ctx, prefixedTx)
if err != nil {
return errors.Wrap(err, "couldn't mark end of stream in intermediate record store")
}
return ErrEndOfStream
}
return errors.Wrap(err, "couldn't get next record")
}
err = engine.irs.AddRecord(ctx, tx, 0, record)
err = engine.irs.AddRecord(ctx, prefixedTx, 0, record)
if err != nil {
return errors.Wrap(err, "couldn't add record to intermediate record store")
}
Expand All @@ -173,7 +176,9 @@ func (engine *PullEngine) loop(ctx context.Context, tx storage.StateTransaction)

func (engine *PullEngine) Next(ctx context.Context) (*Record, error) {
tx := storage.GetStateTransactionFromContext(ctx)
rec, err := engine.irs.Next(ctx, tx)
prefixedTx := tx.WithPrefix(engine.streamID.AsPrefix())

rec, err := engine.irs.Next(ctx, prefixedTx)
if err != nil {
if err == ErrEndOfStream {
return nil, ErrEndOfStream
Expand All @@ -184,7 +189,9 @@ func (engine *PullEngine) Next(ctx context.Context) (*Record, error) {
}

func (engine *PullEngine) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error) {
return engine.irs.GetWatermark(ctx, tx)
prefixedTx := tx.WithPrefix(engine.streamID.AsPrefix())

return engine.irs.GetWatermark(ctx, prefixedTx)
}

func (engine *PullEngine) Close() error {
Expand Down
4 changes: 2 additions & 2 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type Node interface {
Get(ctx context.Context, variables octosql.Variables) (RecordStream, error)
Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, error)
}

type Expression interface {
Expand Down Expand Up @@ -71,7 +71,7 @@ func NewNodeExpression(node Node) *NodeExpression {
}

func (ne *NodeExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error) {
records, err := ne.node.Get(ctx, variables)
records, err := ne.node.Get(ctx, variables, GetRawStreamID()) // TODO: Think about this.
if err != nil {
return octosql.ZeroValue(), errors.Wrap(err, "couldn't get record stream")
}
Expand Down
78 changes: 78 additions & 0 deletions execution/execution.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions execution/execution.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto3";
package execution;
option go_package = "github.com/cube2222/octosql/execution";

// StreamID is a unique identifier for a RecordStream node.
// This StreamID should prefix all state storage keys this node uses.
message StreamID {
// id holds the identifier as a string.
// NOTE(review): the Go code appears to derive the storage key prefix from
// this value via StreamID.AsPrefix() - confirm the exact encoding there.
string id = 1;
}
15 changes: 11 additions & 4 deletions execution/filter.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package execution

import (
"github.com/cube2222/octosql"

"context"

"github.com/cube2222/octosql"
"github.com/cube2222/octosql/streaming/storage"

"github.com/pkg/errors"
)

Expand All @@ -17,8 +18,14 @@ func NewFilter(formula Formula, child Node) *Filter {
return &Filter{formula: formula, source: child}
}

func (node *Filter) Get(ctx context.Context, variables octosql.Variables) (RecordStream, error) {
recordStream, err := node.source.Get(ctx, variables)
func (node *Filter) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, error) {
tx := storage.GetStateTransactionFromContext(ctx)
sourceStreamID, err := GetSourceStreamID(tx.WithPrefix(streamID.AsPrefix()), octosql.MakePhantom())
if err != nil {
return nil, errors.Wrap(err, "couldn't get source stream ID")
}

recordStream, err := node.source.Get(ctx, variables, sourceStreamID)
if err != nil {
return nil, errors.Wrap(err, "couldn't get record stream")
}
Expand Down
24 changes: 14 additions & 10 deletions execution/group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ type Aggregate interface {
type TriggerPrototype func(ctx context.Context, variables octosql.Variables) (Trigger, error)

type GroupBy struct {
storage storage.Storage
source Node
sourceStoragePrefix []byte
key []Expression
storage storage.Storage
source Node
key []Expression

fields []octosql.VariableName
aggregatePrototypes []AggregatePrototype
Expand All @@ -37,12 +36,18 @@ type GroupBy struct {
triggerPrototype TriggerPrototype
}

func NewGroupBy(storage storage.Storage, source Node, sourceStoragePrefix []byte, key []Expression, fields []octosql.VariableName, aggregatePrototypes []AggregatePrototype, eventTimeField octosql.VariableName, as []octosql.VariableName, outEventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *GroupBy {
return &GroupBy{storage: storage, source: source, sourceStoragePrefix: sourceStoragePrefix, key: key, fields: fields, aggregatePrototypes: aggregatePrototypes, eventTimeField: eventTimeField, as: as, outEventTimeField: outEventTimeField, triggerPrototype: triggerPrototype}
func NewGroupBy(storage storage.Storage, source Node, key []Expression, fields []octosql.VariableName, aggregatePrototypes []AggregatePrototype, eventTimeField octosql.VariableName, as []octosql.VariableName, outEventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *GroupBy {
return &GroupBy{storage: storage, source: source, key: key, fields: fields, aggregatePrototypes: aggregatePrototypes, eventTimeField: eventTimeField, as: as, outEventTimeField: outEventTimeField, triggerPrototype: triggerPrototype}
}

func (node *GroupBy) Get(ctx context.Context, variables octosql.Variables) (RecordStream, error) {
source, err := node.source.Get(ctx, variables)
func (node *GroupBy) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, error) {
tx := storage.GetStateTransactionFromContext(ctx)
sourceStreamID, err := GetSourceStreamID(tx.WithPrefix(streamID.AsPrefix()), octosql.MakePhantom())
if err != nil {
return nil, errors.Wrap(err, "couldn't get source stream ID")
}

source, err := node.source.Get(ctx, variables, sourceStreamID)
if err != nil {
return nil, errors.Wrap(err, "couldn't get stream for source in group by")
}
Expand Down Expand Up @@ -85,14 +90,13 @@ func (node *GroupBy) Get(ctx context.Context, variables octosql.Variables) (Reco
outputFieldNames: outputFieldNames,
}
processFunc := &ProcessByKey{
stateStorage: node.storage,
eventTimeField: node.eventTimeField,
trigger: trigger,
keyExpression: node.key,
processFunction: groupBy,
variables: variables,
}
groupByPullEngine := NewPullEngine(processFunc, node.storage, source, node.sourceStoragePrefix, &ZeroWatermarkSource{})
groupByPullEngine := NewPullEngine(processFunc, node.storage, source, streamID, &ZeroWatermarkSource{})
go groupByPullEngine.Run(ctx) // TODO: .Close() should kill this context and the goroutine.

return groupByPullEngine, nil
Expand Down
Loading

0 comments on commit 4fcd990

Please sign in to comment.