forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
graphite.go
122 lines (109 loc) · 2.79 KB
/
graphite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package graphite
import (
"errors"
"log"
"math/rand"
"net"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
// Graphite holds the settings of the graphite output plugin together with the
// connections opened by Connect.
type Graphite struct {
// NOTE(review): the previous comment here said a URL field is kept only for
// backwards compatibility, but no such field is visible in this struct —
// confirm whether it was removed.
//
// Servers lists the endpoints that Connect dials over TCP.
Servers []string
// Prefix is handed to the Graphite serializer created in Write.
Prefix string
// Template is handed to the Graphite serializer created in Write.
Template string
// Timeout is the dial and write timeout in seconds; Connect replaces a
// value <= 0 with 2.
Timeout int
// conns holds the connections that Connect managed to open.
conns []net.Conn
}
// sampleConfig is the example configuration text returned by SampleConfig.
var sampleConfig = `
## TCP endpoint for your graphite instance.
## If multiple endpoints are configured, output will be load balanced.
## Only one of the endpoints will be written to with each iteration.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## timeout in seconds for the write connection to graphite
timeout = 2
`
// Connect applies default settings and dials every configured server.
//
// A non-positive Timeout is replaced by 2 (seconds) and an empty Servers list
// by "localhost:2003". Connections left over from an earlier call are closed
// before they are replaced, so calling Connect again to reconnect does not
// leak sockets. Servers that cannot be reached are logged and skipped.
//
// Connect always returns nil: an unreachable server is not treated as fatal
// here, and Write reports an error when no connection can be written to.
func (g *Graphite) Connect() error {
	// Set default values
	if g.Timeout <= 0 {
		g.Timeout = 2
	}
	if len(g.Servers) == 0 {
		g.Servers = append(g.Servers, "localhost:2003")
	}

	// Release connections from a previous Connect before replacing them.
	for _, old := range g.conns {
		// Best effort: the connection is being discarded and may already be
		// closed, so the error carries no useful information.
		_ = old.Close()
	}
	g.conns = nil

	// Get Connections
	timeout := time.Duration(g.Timeout) * time.Second
	var conns []net.Conn
	for _, server := range g.Servers {
		conn, err := net.DialTimeout("tcp", server, timeout)
		if err != nil {
			log.Printf("E! Graphite: could not connect to %s: %s", server, err)
			continue
		}
		conns = append(conns, conn)
	}
	g.conns = conns
	return nil
}
// Close closes every connection currently held by the plugin. Errors returned
// by the individual connections are not inspected and nil is always returned.
func (g *Graphite) Close() error {
	for i := range g.conns {
		g.conns[i].Close()
	}
	return nil
}
// SampleConfig returns the example configuration text for this plugin.
func (g *Graphite) SampleConfig() string {
return sampleConfig
}
// Description returns a one-line summary of what this plugin is for.
func (g *Graphite) Description() string {
return "Configuration for Graphite server to send metrics to"
}
// Write serializes metrics into one Graphite batch and sends it to a single
// server. Connections are tried in random order until one write succeeds;
// every failed attempt is logged.
//
// Metrics that fail to serialize are logged and left out of the batch. An
// error is returned when the serializer cannot be created or when no
// connection accepted the batch. In the latter case the current connections
// are closed and Connect is called again, so that a later Write starts from
// fresh connections.
func (g *Graphite) Write(metrics []telegraf.Metric) error {
	s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template)
	if err != nil {
		return err
	}

	// Prepare data
	var batch []byte
	for _, metric := range metrics {
		buf, err := s.Serialize(metric)
		if err != nil {
			log.Printf("E! Error serializing some metrics to graphite: %s", err.Error())
			// The returned buffer is not meaningful when serialization failed.
			continue
		}
		batch = append(batch, buf...)
	}

	// Send data to a random server
	for _, n := range rand.Perm(len(g.conns)) {
		conn := g.conns[n]
		if g.Timeout > 0 {
			deadline := time.Now().Add(time.Duration(g.Timeout) * time.Second)
			if e := conn.SetWriteDeadline(deadline); e != nil {
				// Without a deadline the write could block indefinitely, so
				// treat this connection as failed and try the next one.
				log.Println("E! Graphite Error: " + e.Error())
				continue
			}
		}
		if _, e := conn.Write(batch); e != nil {
			log.Println("E! Graphite Error: " + e.Error())
			// Let's try the next one
			continue
		}
		// Success
		return nil
	}

	// No connection accepted the batch (or none is open). Close the current
	// connections before redialing so their sockets are not leaked.
	if e := g.Close(); e != nil {
		log.Println("E! Graphite Error: " + e.Error())
	}
	if e := g.Connect(); e != nil {
		log.Println("E! Graphite Error: " + e.Error())
	}
	return errors.New("could not write to any Graphite server in cluster")
}
func init() {
outputs.Add("graphite", func() telegraf.Output {
return &Graphite{}
})
}