-
Notifications
You must be signed in to change notification settings - Fork 16
/
main.go
132 lines (104 loc) · 3.62 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright Contributors to the Open Cluster Management project
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
"github.com/stolostron/search-collector/pkg/config"
"github.com/stolostron/search-collector/pkg/informer"
lease "github.com/stolostron/search-collector/pkg/lease"
rec "github.com/stolostron/search-collector/pkg/reconciler"
tr "github.com/stolostron/search-collector/pkg/transforms"
"github.com/golang/glog"
"github.com/stolostron/search-collector/pkg/send"
"k8s.io/apimachinery/pkg/util/wait"
)
// Identity and timing settings for the lease that main asks the lease
// reconciler to maintain.
const (
	// AddonName is the value main passes as the LeaseName of the lease reconciler.
	AddonName = "search-collector"

	// LeaseDurationSeconds is both the lease duration handed to the lease
	// reconciler and the interval, in seconds, between lease reconciliations.
	LeaseDurationSeconds = 60
)
// getMainContext builds the root context for the process. The returned context
// is canceled when the first SIGINT or SIGTERM arrives, giving the rest of the
// program a chance to shut down. A second signal terminates the process
// immediately with exit code 1.
// The approach was inspired by controller-runtime.
func getMainContext() context.Context {
	// Room for two signals so neither the first nor the second notification
	// is dropped while the goroutine below is busy.
	signals := make(chan os.Signal, 2)
	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

	mainCtx, stop := context.WithCancel(context.Background())

	go func() {
		// First signal: request a graceful shutdown.
		<-signals
		stop()

		// Second signal: the operator is impatient, exit directly.
		<-signals
		os.Exit(1)
	}()

	return mainCtx
}
// main wires together the search collector pipeline and runs it until the
// informer and sender goroutines return.
//
// In order, it:
//  1. Makes stderr the default glog destination and parses command-line flags.
//  2. Unless config.Cfg.DeployedInHub is set, starts a background goroutine
//     that reconciles the addon lease every LeaseDurationSeconds.
//  3. Builds the transformer, the reconciler (fed by the transformer's output)
//     and the sender (built from the reconciler).
//  4. Starts the informers, waits for a signal on informersInitialized, then
//     starts the send loop.
//  5. Blocks until both goroutines have finished.
//
// The process exits with status 1 if the logging default cannot be set or if
// informer.RunInformers returns an error.
func main() {
	// init logs
	// Glog by default logs to a file. Change the default so that it all goes to stderr (no option for stdout).
	// The default is applied BEFORE flag.Parse so that an explicit -logtostderr on the command line still wins.
	logToStderr := flag.Lookup("logtostderr")
	if logToStderr == nil {
		// Uses fmt.Println in case something is wrong with glog args
		fmt.Println("Error setting default flag:", "flag logtostderr is not defined")
		os.Exit(1)
	}
	if err := logToStderr.Value.Set("true"); err != nil {
		fmt.Println("Error setting default flag:", err) // Uses fmt.Println in case something is wrong with glog args
		os.Exit(1)
	}
	flag.Parse()

	// Flush pending log entries when main returns. Deferred calls do not run on os.Exit,
	// so exit paths below flush explicitly.
	defer glog.Flush()

	// Determine the number of CPUs available. It is passed to the transformer
	// so that the work can be spread over whatever hardware we're on.
	numThreads := runtime.NumCPU()

	glog.Info("Starting Search Collector")
	if commit, ok := os.LookupEnv("VCS_REF"); ok {
		glog.Info("Built from git commit: ", commit)
	}

	if !config.Cfg.DeployedInHub {
		leaseReconciler := lease.LeaseReconciler{
			HubKubeClient:        config.GetKubeClient(config.Cfg.AggregatorConfig),
			LocalKubeClient:      config.GetKubeClient(config.GetKubeConfig()),
			LeaseName:            AddonName,
			ClusterName:          config.Cfg.ClusterName,
			LeaseDurationSeconds: int32(LeaseDurationSeconds),
		}
		glog.Info("Create/Update lease for search")
		// Runs for the lifetime of the process; it is not tied to the main context.
		go wait.Forever(leaseReconciler.Reconcile, time.Duration(leaseReconciler.LeaseDurationSeconds)*time.Second)
	}

	// Create input channel
	transformChannel := make(chan *tr.Event)

	// Create transformers
	upsertTransformer := tr.NewTransformer(transformChannel, make(chan tr.NodeEvent), numThreads)

	// Init reconciler, fed by the transformer's output
	reconciler := rec.NewReconciler()
	reconciler.Input = upsertTransformer.Output

	// Create Sender, attached to the reconciler
	sender := send.NewSender(reconciler, config.Cfg.AggregatorURL, config.Cfg.ClusterName)

	informersInitialized := make(chan interface{})

	mainCtx := getMainContext()

	var wg sync.WaitGroup

	// Start a routine to keep our informers up to date.
	wg.Add(1)
	go func() {
		defer wg.Done()

		if err := informer.RunInformers(mainCtx, informersInitialized, upsertTransformer, reconciler); err != nil {
			glog.Errorf("Failed to run the informers: %v", err)
			glog.Flush() // os.Exit skips the deferred flush in main, so flush here.
			os.Exit(1)
		}
	}()

	// Wait here until informers have collected the full state of the cluster.
	// The initial payload must have the complete state to avoid unnecessary deletion
	// and recreate of existing rows in the database during the resync.
	// NOTE(review): this receive is not guarded by mainCtx; presumably RunInformers always
	// signals or closes the channel before returning — confirm against pkg/informer.
	glog.Info("Waiting for informers to load initial state.")
	<-informersInitialized

	glog.Info("Starting the sender.")
	wg.Add(1)
	go func() {
		defer wg.Done()
		sender.StartSendLoop(mainCtx)
	}()

	wg.Wait()
}