Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream IDs, proper storage prefixing, recovering from crashes. #177

Merged
merged 8 commits into from
Feb 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ func (app *App) RunPlan(ctx context.Context, stateStorage storage.Storage, plan
return errors.Wrap(err, "couldn't materialize the physical plan into an execution plan")
}

stream, err := exec.Get(ctx, variables)
programID := &execution.StreamID{Id: ""}

tx := stateStorage.BeginTransaction()
stream, err := exec.Get(storage.InjectStateTransaction(ctx, tx), variables, programID)
if err != nil {
return errors.Wrap(err, "couldn't get record stream from execution plan")
}
if err := tx.Commit(); err != nil {
return errors.Wrap(err, "couldn't commit transaction to get record stream from execution plan")
}

var rec *execution.Record
for {
Expand Down
11 changes: 9 additions & 2 deletions execution/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package execution

import (
"github.com/cube2222/octosql"
"github.com/cube2222/octosql/streaming/storage"

"context"

Expand All @@ -16,8 +17,14 @@ func NewDistinct(child Node) *Distinct {
return &Distinct{child: child}
}

func (node *Distinct) Get(ctx context.Context, variables octosql.Variables) (RecordStream, error) {
stream, err := node.child.Get(ctx, variables)
func (node *Distinct) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, error) {
tx := storage.GetStateTransactionFromContext(ctx)
sourceStreamID, err := GetSourceStreamID(tx.WithPrefix(streamID.AsPrefix()), octosql.MakePhantom())
if err != nil {
return nil, errors.Wrap(err, "couldn't get source stream ID")
}

stream, err := node.child.Get(ctx, variables, sourceStreamID)
if err != nil {
return nil, errors.Wrap(err, "couldn't get stream for child node in distinct")
}
Expand Down
41 changes: 24 additions & 17 deletions execution/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ type IntermediateRecordStore interface {
type PullEngine struct {
irs IntermediateRecordStore
source RecordStream
sourceStoragePrefix []byte
lastCommittedWatermark time.Time
watermarkSource WatermarkSource
storage storage.Storage
streamID *StreamID
}

func NewPullEngine(irs IntermediateRecordStore, storage storage.Storage, source RecordStream, sourceStoragePrefix []byte, watermarkSource WatermarkSource) *PullEngine {
func NewPullEngine(irs IntermediateRecordStore, storage storage.Storage, source RecordStream, streamID *StreamID, watermarkSource WatermarkSource) *PullEngine {
return &PullEngine{
irs: irs,
storage: storage,
source: source,
sourceStoragePrefix: sourceStoragePrefix,
watermarkSource: watermarkSource,
irs: irs,
storage: storage,
source: source,
streamID: streamID,
watermarkSource: watermarkSource,
}
}

Expand Down Expand Up @@ -127,43 +127,46 @@ func (engine *PullEngine) Run(ctx context.Context) {
} else if err != nil {
tx.Abort()
log.Println(err)
return // TODO: Error propagation? Add this to the underlying queue as an ErrorElement? How to do this well?
return // TODO: Error propagation? Add this to the underlying queue as an ErrorElement? How to do this well? Send it to the underlying IRS like a watermark?
}
}
}

func (engine *PullEngine) loop(ctx context.Context, tx storage.StateTransaction) error {
sourcePrefixedTx := tx.WithPrefix(engine.sourceStoragePrefix)
// This is a transaction prefixed with the current node StreamID,
// which should be used for all storage operations of this node.
// Source streams will get the raw, non-prefixed, transaction.
prefixedTx := tx.WithPrefix(engine.streamID.AsPrefix())

watermark, err := engine.watermarkSource.GetWatermark(ctx, sourcePrefixedTx)
watermark, err := engine.watermarkSource.GetWatermark(ctx, tx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a very subtle change, yet there is nothing that explains it. Could you add a comment explaining why the watermark is now fetched using the original transaction rather than the prefixed one, as it was in the previous version?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the necessary comments.

if err != nil {
return errors.Wrap(err, "couldn't get current watermark from source")
}
if watermark.After(engine.lastCommittedWatermark) {
err := engine.irs.UpdateWatermark(ctx, tx, watermark)
err := engine.irs.UpdateWatermark(ctx, prefixedTx, watermark)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing here — in my opinion, these changes need explanatory comments.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the necessary comments.

if err != nil {
return errors.Wrap(err, "couldn't update watermark in intermediate record store")
}
engine.lastCommittedWatermark = watermark // TODO: last _commited_ watermark :( this is not committed
return nil
}

record, err := engine.source.Next(storage.InjectStateTransaction(ctx, sourcePrefixedTx))
record, err := engine.source.Next(storage.InjectStateTransaction(ctx, tx))
if err != nil {
if err == ErrEndOfStream {
err := engine.irs.UpdateWatermark(ctx, tx, maxWatermark)
err := engine.irs.UpdateWatermark(ctx, prefixedTx, maxWatermark)
if err != nil {
return errors.Wrap(err, "couldn't mark end of stream max watermark in intermediate record store")
}
err = engine.irs.MarkEndOfStream(ctx, tx)
err = engine.irs.MarkEndOfStream(ctx, prefixedTx)
if err != nil {
return errors.Wrap(err, "couldn't mark end of stream in intermediate record store")
}
return ErrEndOfStream
}
return errors.Wrap(err, "couldn't get next record")
}
err = engine.irs.AddRecord(ctx, tx, 0, record)
err = engine.irs.AddRecord(ctx, prefixedTx, 0, record)
if err != nil {
return errors.Wrap(err, "couldn't add record to intermediate record store")
}
Expand All @@ -173,7 +176,9 @@ func (engine *PullEngine) loop(ctx context.Context, tx storage.StateTransaction)

func (engine *PullEngine) Next(ctx context.Context) (*Record, error) {
tx := storage.GetStateTransactionFromContext(ctx)
rec, err := engine.irs.Next(ctx, tx)
prefixedTx := tx.WithPrefix(engine.streamID.AsPrefix())

rec, err := engine.irs.Next(ctx, prefixedTx)
if err != nil {
if err == ErrEndOfStream {
return nil, ErrEndOfStream
Expand All @@ -184,7 +189,9 @@ func (engine *PullEngine) Next(ctx context.Context) (*Record, error) {
}

func (engine *PullEngine) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error) {
return engine.irs.GetWatermark(ctx, tx)
prefixedTx := tx.WithPrefix(engine.streamID.AsPrefix())

return engine.irs.GetWatermark(ctx, prefixedTx)
}

func (engine *PullEngine) Close() error {
Expand Down
4 changes: 2 additions & 2 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type Node interface {
Get(ctx context.Context, variables octosql.Variables) (RecordStream, error)
Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, error)
}

type Expression interface {
Expand Down Expand Up @@ -71,7 +71,7 @@ func NewNodeExpression(node Node) *NodeExpression {
}

func (ne *NodeExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error) {
records, err := ne.node.Get(ctx, variables)
records, err := ne.node.Get(ctx, variables, GetRawStreamID()) // TODO: Think about this.
if err != nil {
return octosql.ZeroValue(), errors.Wrap(err, "couldn't get record stream")
}
Expand Down
78 changes: 78 additions & 0 deletions execution/execution.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions execution/execution.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto3";
package execution;
option go_package = "github.com/cube2222/octosql/execution";

// StreamID is a unique identifier for a RecordStream node.
// This StreamID should prefix all state storage keys this node uses.
// Execution nodes apply it by wrapping their state transaction with
// tx.WithPrefix(streamID.AsPrefix()).
message StreamID {
// id is the identifier string. The top-level stream created in
// app.RunPlan uses the empty string.
string id = 1;
}
15 changes: 11 additions & 4 deletions execution/filter.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package execution

import (
"github.com/cube2222/octosql"

"context"

"github.com/cube2222/octosql"
"github.com/cube2222/octosql/streaming/storage"

"github.com/pkg/errors"
)

Expand All @@ -17,8 +18,14 @@ func NewFilter(formula Formula, child Node) *Filter {
return &Filter{formula: formula, source: child}
}

func (node *Filter) Get(ctx context.Context, variables octosql.Variables) (RecordStream, error) {
recordStream, err := node.source.Get(ctx, variables)
func (node *Filter) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, error) {
tx := storage.GetStateTransactionFromContext(ctx)
sourceStreamID, err := GetSourceStreamID(tx.WithPrefix(streamID.AsPrefix()), octosql.MakePhantom())
if err != nil {
return nil, errors.Wrap(err, "couldn't get source stream ID")
}

recordStream, err := node.source.Get(ctx, variables, sourceStreamID)
if err != nil {
return nil, errors.Wrap(err, "couldn't get record stream")
}
Expand Down
24 changes: 14 additions & 10 deletions execution/group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ type Aggregate interface {
type TriggerPrototype func(ctx context.Context, variables octosql.Variables) (Trigger, error)

type GroupBy struct {
storage storage.Storage
source Node
sourceStoragePrefix []byte
key []Expression
storage storage.Storage
source Node
key []Expression

fields []octosql.VariableName
aggregatePrototypes []AggregatePrototype
Expand All @@ -37,12 +36,18 @@ type GroupBy struct {
triggerPrototype TriggerPrototype
}

func NewGroupBy(storage storage.Storage, source Node, sourceStoragePrefix []byte, key []Expression, fields []octosql.VariableName, aggregatePrototypes []AggregatePrototype, eventTimeField octosql.VariableName, as []octosql.VariableName, outEventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *GroupBy {
return &GroupBy{storage: storage, source: source, sourceStoragePrefix: sourceStoragePrefix, key: key, fields: fields, aggregatePrototypes: aggregatePrototypes, eventTimeField: eventTimeField, as: as, outEventTimeField: outEventTimeField, triggerPrototype: triggerPrototype}
func NewGroupBy(storage storage.Storage, source Node, key []Expression, fields []octosql.VariableName, aggregatePrototypes []AggregatePrototype, eventTimeField octosql.VariableName, as []octosql.VariableName, outEventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *GroupBy {
return &GroupBy{storage: storage, source: source, key: key, fields: fields, aggregatePrototypes: aggregatePrototypes, eventTimeField: eventTimeField, as: as, outEventTimeField: outEventTimeField, triggerPrototype: triggerPrototype}
}

func (node *GroupBy) Get(ctx context.Context, variables octosql.Variables) (RecordStream, error) {
source, err := node.source.Get(ctx, variables)
func (node *GroupBy) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, error) {
tx := storage.GetStateTransactionFromContext(ctx)
sourceStreamID, err := GetSourceStreamID(tx.WithPrefix(streamID.AsPrefix()), octosql.MakePhantom())
if err != nil {
return nil, errors.Wrap(err, "couldn't get source stream ID")
}

source, err := node.source.Get(ctx, variables, sourceStreamID)
if err != nil {
return nil, errors.Wrap(err, "couldn't get stream for source in group by")
}
Expand Down Expand Up @@ -85,14 +90,13 @@ func (node *GroupBy) Get(ctx context.Context, variables octosql.Variables) (Reco
outputFieldNames: outputFieldNames,
}
processFunc := &ProcessByKey{
stateStorage: node.storage,
eventTimeField: node.eventTimeField,
trigger: trigger,
keyExpression: node.key,
processFunction: groupBy,
variables: variables,
}
groupByPullEngine := NewPullEngine(processFunc, node.storage, source, node.sourceStoragePrefix, &ZeroWatermarkSource{})
groupByPullEngine := NewPullEngine(processFunc, node.storage, source, streamID, &ZeroWatermarkSource{})
go groupByPullEngine.Run(ctx) // TODO: .Close() should kill this context and the goroutine.

return groupByPullEngine, nil
Expand Down
Loading