-
Notifications
You must be signed in to change notification settings - Fork 1
/
influxdb.go
79 lines (67 loc) · 2.35 KB
/
influxdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main
import (
"github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
cdns "github.com/niclabs/dnszeppelin"
"log"
"sync"
"time"
)
type Database interface{
createClient(d *database)
createApi(d *database)
InfluxCollect(resultChannel chan cdns.DNSResult, exiting chan bool, wg *sync.WaitGroup, wsize, batchSize uint, m *maps)
InfluxStore(m *maps, batch []cdns.DNSResult) error
StoreEachMap(mapa map[string]int, tipo1, tipo2 string , now time.Time)
}
type DefaultDB struct{
influxdb string
influxtoken string
influxorg string
influxbucket string
}
type database struct{
c influxdb2.Client
api api.WriteAPI
}
func (db DefaultDB) createClient(d *database){
client := influxdb2.NewClient(db.influxdb, db.influxtoken)
d.c = client
}
func (db DefaultDB) createApi(d *database){
api := (d.c).WriteAPI(db.influxorg, db.influxbucket)
d.api = api
}
func (d database) InfluxCollect(resultChannel chan cdns.DNSResult, exiting chan bool, wg *sync.WaitGroup, wsize, batchSize uint, m *maps){
wg.Add(1)
defer wg.Done()
// Connect to InfluxDB
client := d.c
defer client.Close()
batch := make([]cdns.DNSResult, 0, batchSize)
ticker := time.NewTicker(time.Duration(wsize) * time.Second)
defer ticker.Stop()
for {
select {
case data := <-resultChannel:
batch = append(batch, data)
case <-ticker.C:
err := InfluxAgg(batch, m)
err1 := d.InfluxStore(m, batch)
if err != nil {
log.Fatal("Error writing to DB:", err)
exiting <- true
return
} else if err1 != nil{
log.Fatal("Error writing to DB:", err1)
exiting <- true
return
} else {
batch = make([]cdns.DNSResult, 0, batchSize)
}
case <-exiting:
exiting <- true
return
}
}
}