Skip to content

Commit

Permalink
Add UDP as a sink protocol (#95)
Browse files Browse the repository at this point in the history
* sink: add UDP support

This commit adds support for UDP statsd sinks with the STATSD_PROTOCOL
environment variable and the WithStatsdProtocol() SinkOption.

* tests: don't run TestHTTPHandler_WrapResponse in parallel

This test is incredibly fast so doesn't need to run in parallel and I'm
trying to reduce the number of goroutines we have in flight at any time
during out tests.
  • Loading branch information
charlievieth authored Apr 16, 2020
1 parent 079323d commit 02891db
Show file tree
Hide file tree
Showing 6 changed files with 628 additions and 381 deletions.
123 changes: 78 additions & 45 deletions tcp_sink.go → net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,85 +20,117 @@ const (

flushInterval = time.Second
logOnEveryNDroppedBytes = 1 << 15 // Log once per 32kb of dropped stats
defaultBufferSize = 1 << 16
approxMaxMemBytes = 1 << 22
chanSize = approxMaxMemBytes / defaultBufferSize
defaultBufferSizeTCP = 1 << 16

// 1432 bytes is optimal for regular networks with an MTU of 1500 and
// is to prevent fragmenting UDP datagrams
defaultBufferSizeUDP = 1432

approxMaxMemBytes = 1 << 22
chanSize = approxMaxMemBytes / defaultBufferSizeTCP
)

// An SinkOption configures a Sink.
type SinkOption interface {
apply(*tcpStatsdSink)
apply(*netSink)
}

// sinkOptionFunc wraps a func so it satisfies the Option interface.
type sinkOptionFunc func(*tcpStatsdSink)
type sinkOptionFunc func(*netSink)

func (f sinkOptionFunc) apply(sink *tcpStatsdSink) {
func (f sinkOptionFunc) apply(sink *netSink) {
f(sink)
}

// WithStatsdHost sets the host of the statsd sink otherwise the host is
// read from the environment variable "STATSD_HOST".
func WithStatsdHost(host string) SinkOption {
return sinkOptionFunc(func(sink *tcpStatsdSink) {
sink.statsdHost = host
return sinkOptionFunc(func(sink *netSink) {
sink.conf.StatsdHost = host
})
}

// WithStatsdProtocol sets the network protocol ("udp" or "tcp") of the statsd
// sink otherwise the protocol is read from the environment variable
// "STATSD_PROTOCOL".
func WithStatsdProtocol(protocol string) SinkOption {
return sinkOptionFunc(func(sink *netSink) {
sink.conf.StatsdProtocol = protocol
})
}

// WithStatsdPort sets the port of the statsd sink otherwise the port is
// read from the environment variable "STATSD_PORT".
func WithStatsdPort(port int) SinkOption {
return sinkOptionFunc(func(sink *tcpStatsdSink) {
sink.statsdPort = port
return sinkOptionFunc(func(sink *netSink) {
sink.conf.StatsdPort = port
})
}

// WithLogger configures the sink to use the provided logger otherwise
// the standard logrus logger is used.
func WithLogger(log *logger.Logger) SinkOption {
// TODO (CEV): use the zap.Logger
return sinkOptionFunc(func(sink *tcpStatsdSink) {
return sinkOptionFunc(func(sink *netSink) {
sink.log = log
})
}

// NewTCPStatsdSink returns a FlushableSink that is backed by a buffered writer
// and a separate goroutine that flushes those buffers to a statsd connection.
// NewTCPStatsdSink returns a new NetStink. This function name exists for
// backwards compatibility.
func NewTCPStatsdSink(opts ...SinkOption) FlushableSink {
outc := make(chan *bytes.Buffer, chanSize) // TODO(btc): parameterize
writer := sinkWriter{
outc: outc,
}
// TODO (CEV): this auto loading from the env is bad and should be removed.
conf := GetSettings()
s := &tcpStatsdSink{
outc: outc,
// TODO(btc): parameterize size
bufWriter: bufio.NewWriterSize(&writer, defaultBufferSize),
return NewNetSink(opts...)
}

// NewNetSink returns a FlushableSink that writes to a statsd sink over the
// network. By default settings are taken from the enviornment, but can be
// overridden via SinkOptions.
func NewNetSink(opts ...SinkOption) FlushableSink {
s := &netSink{
// arbitrarily buffered
doFlush: make(chan chan struct{}, 8),

// CEV: default to the standard logger to match the legacy implementation.
log: logger.StandardLogger(),
statsdHost: conf.StatsdHost,
statsdPort: conf.StatsdPort,
log: logger.StandardLogger(),

// TODO (CEV): auto loading from the env is bad and should be removed.
conf: GetSettings(),
}
for _, opt := range opts {
opt.apply(s)
}

// Calculate buffer size based on protocol, for UDP we want to pick a
// buffer size that will prevent datagram fragmentation.
var bufSize int
switch s.conf.StatsdProtocol {
case "udp", "udp4", "udp6":
bufSize = defaultBufferSizeUDP
default:
bufSize = defaultBufferSizeTCP
}

outc := make(chan *bytes.Buffer, approxMaxMemBytes/bufSize)
writer := &sinkWriter{
outc: outc,
}
s.outc = outc

s.bufWriter = bufio.NewWriterSize(writer, bufSize)

go s.run()
return s
}

type tcpStatsdSink struct {
type netSink struct {
conn net.Conn
outc chan *bytes.Buffer
mu sync.Mutex
bufWriter *bufio.Writer
doFlush chan chan struct{}
droppedBytes uint64
log *logger.Logger
statsdHost string
statsdPort int
conf Settings
}

type sinkWriter struct {
Expand All @@ -117,7 +149,7 @@ func (w *sinkWriter) Write(p []byte) (int, error) {
}
}

func (s *tcpStatsdSink) Flush() {
func (s *netSink) Flush() {
if s.flush() != nil {
return // nothing we can do
}
Expand All @@ -126,7 +158,7 @@ func (s *tcpStatsdSink) Flush() {
<-ch
}

func (s *tcpStatsdSink) flush() error {
func (s *netSink) flush() error {
s.mu.Lock()
err := s.bufWriter.Flush()
if err != nil {
Expand All @@ -136,7 +168,7 @@ func (s *tcpStatsdSink) flush() error {
return err
}

func (s *tcpStatsdSink) drainFlushQueue() {
func (s *netSink) drainFlushQueue() {
// Limit the number of items we'll flush to prevent this from possibly
// hanging when the flush channel is saturated with sends.
doFlush := s.doFlush
Expand All @@ -152,7 +184,7 @@ func (s *tcpStatsdSink) drainFlushQueue() {
}

// s.mu should be held
func (s *tcpStatsdSink) handleFlushErrorSize(err error, dropped int) {
func (s *netSink) handleFlushErrorSize(err error, dropped int) {
d := uint64(dropped)
if (s.droppedBytes+d)%logOnEveryNDroppedBytes > s.droppedBytes%logOnEveryNDroppedBytes {
s.log.WithField("total_dropped_bytes", s.droppedBytes+d).
Expand All @@ -167,11 +199,11 @@ func (s *tcpStatsdSink) handleFlushErrorSize(err error, dropped int) {
}

// s.mu should be held
func (s *tcpStatsdSink) handleFlushError(err error) {
func (s *netSink) handleFlushError(err error) {
s.handleFlushErrorSize(err, s.bufWriter.Buffered())
}

func (s *tcpStatsdSink) writeBuffer(b *buffer) {
func (s *netSink) writeBuffer(b *buffer) {
s.mu.Lock()
if s.bufWriter.Available() < b.Len() {
if err := s.bufWriter.Flush(); err != nil {
Expand All @@ -186,7 +218,7 @@ func (s *tcpStatsdSink) writeBuffer(b *buffer) {
s.mu.Unlock()
}

func (s *tcpStatsdSink) flushUint64(name, suffix string, u uint64) {
func (s *netSink) flushUint64(name, suffix string, u uint64) {
b := pbFree.Get().(*buffer)

b.WriteString(name)
Expand All @@ -200,7 +232,7 @@ func (s *tcpStatsdSink) flushUint64(name, suffix string, u uint64) {
pbFree.Put(b)
}

func (s *tcpStatsdSink) flushFloat64(name, suffix string, f float64) {
func (s *netSink) flushFloat64(name, suffix string, f float64) {
b := pbFree.Get().(*buffer)

b.WriteString(name)
Expand All @@ -214,15 +246,15 @@ func (s *tcpStatsdSink) flushFloat64(name, suffix string, f float64) {
pbFree.Put(b)
}

func (s *tcpStatsdSink) FlushCounter(name string, value uint64) {
func (s *netSink) FlushCounter(name string, value uint64) {
s.flushUint64(name, "|c\n", value)
}

func (s *tcpStatsdSink) FlushGauge(name string, value uint64) {
func (s *netSink) FlushGauge(name string, value uint64) {
s.flushUint64(name, "|g\n", value)
}

func (s *tcpStatsdSink) FlushTimer(name string, value float64) {
func (s *netSink) FlushTimer(name string, value float64) {
// Since we mistakenly use floating point values to represent time
// durations this method is often passed an integer encoded as a
// float. Formatting integers is much faster (>2x) than formatting
Expand All @@ -235,8 +267,8 @@ func (s *tcpStatsdSink) FlushTimer(name string, value float64) {
}
}

func (s *tcpStatsdSink) run() {
addr := net.JoinHostPort(s.statsdHost, strconv.Itoa(s.statsdPort))
func (s *netSink) run() {
addr := net.JoinHostPort(s.conf.StatsdHost, strconv.Itoa(s.conf.StatsdPort))

var reconnectFailed bool // true if last reconnect failed

Expand Down Expand Up @@ -287,7 +319,8 @@ func (s *tcpStatsdSink) run() {

// writeToConn writes the buffer to the underlying conn. May only be called
// from run().
func (s *tcpStatsdSink) writeToConn(buf *bytes.Buffer) {
func (s *netSink) writeToConn(buf *bytes.Buffer) {

len := buf.Len()

// TODO (CEV): parameterize timeout
Expand All @@ -304,9 +337,9 @@ func (s *tcpStatsdSink) writeToConn(buf *bytes.Buffer) {
}
}

func (s *tcpStatsdSink) connect(address string) error {
func (s *netSink) connect(address string) error {
// TODO (CEV): parameterize timeout
conn, err := net.DialTimeout("tcp", address, defaultDialTimeout)
conn, err := net.DialTimeout(s.conf.StatsdProtocol, address, defaultDialTimeout)
if err == nil {
s.conn = conn
}
Expand Down
Loading

0 comments on commit 02891db

Please sign in to comment.