-
Notifications
You must be signed in to change notification settings - Fork 17
/
baker.go
59 lines (45 loc) · 1.59 KB
/
baker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/*
Package baker provides types and functions to build a pipeline for the processing of structured data.
Structured data is represented by the Record interface. LogLine implements that interface and
represents a csv record.
Using the functions in the package one can build and run a Topology, reading its configuration
from a TOML file.
This package doesn't include any components; they can be found in their
respective packages (./input, ./filter, ./output and ./upload).
The README file in the project repository provides additional information and examples:
https://github.com/AdRoll/baker/blob/main/README.md
*/
package baker
import (
"fmt"
"github.com/sirupsen/logrus"
)
// Main runs the topology corresponding to the provided configuration.
//
// Depending on the input, it either blocks forever (daemon) or terminates when
// all the records have been processed (batch).
//
// If the topology can't be created from cfg, Main returns an error wrapping
// the underlying cause. Otherwise it returns the value of topology.Error()
// once the topology has terminated. A failure to close the metrics client is
// only logged as a warning and doesn't affect the returned error.
func Main(cfg *Config) error {
	topology, err := NewTopologyFromConfig(cfg)
	if err != nil {
		// Wrap with %w (rather than %s) so callers can still inspect the
		// underlying cause with errors.Is / errors.As.
		return fmt.Errorf("can't create topology: %w", err)
	}

	// Start the topology.
	topology.Start()

	// Set up the wait condition for exiting in case the topology ends by
	// itself: the channel is closed as soon as topology.Wait returns.
	done := make(chan struct{})
	go func() {
		topology.Wait()
		close(done)
	}()

	// Begin dumping statistics.
	stats := NewStatsDumper(topology)
	stopStats := stats.Run()

	// Block until topology termination.
	<-done

	// Stop the stats dumping goroutine (this also prints stats one last time).
	stopStats()

	if err := topology.Metrics.Close(); err != nil {
		logrus.WithError(err).Warnf("error closing metrics client")
	}

	return topology.Error()
}