Skip to content

Commit

Permalink
Merge pull request #1 from alauda/tmp1
Browse files Browse the repository at this point in the history
Add http sink
  • Loading branch information
liubin authored Feb 8, 2018
2 parents 89ad3bb + a57d70b commit be5b132
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 24 deletions.
24 changes: 21 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,22 @@ import (
const (
sinkNameElasticSearch = "elasticsearch"
sinkNameKafka = "kafka"
sinkNameHTTPOutput = "http"
sinkNameHTTP = "http"
)

var (
resyncPeriod = flag.Duration("resync-period", 1*time.Minute, "Reflector resync period")
prometheusEndpoint = flag.String("prometheus-endpoint", ":80", "Endpoint on which to "+
"expose Prometheus http handler")
sinkName = flag.String("sink", sinkNameElasticSearch, "Sink type to save the exported events: elasticsearch/kafka/http")
elasticsearchEndpoint = flag.String("elasticsearch", "http://elasticsearch:9200/", "Elasticsearch endpoint")
elasticsearchEndpoint = flag.String("elasticsearch-server", "http://elasticsearch:9200/", "Elasticsearch endpoint")

// for http sink
httpEndpoint = flag.String("http-endpoint", "", "Http endpoint")
httpAuth = flag.String("auth", "token", "Http auth method: basic or token")
httpToken = flag.String("token", "", "Token header and value for http token auth")
httpUsername = flag.String("username", "", "Username for http basic auth")
httpPassword = flag.String("password", "", "Nassword for http basic auth")
)

func newSystemStopChannel() chan struct{} {
Expand Down Expand Up @@ -66,10 +73,21 @@ func main() {
if *sinkName == sinkNameElasticSearch {
config := sinks.DefaultElasticSearchConf()
config.Endpoint = *elasticsearchEndpoint
outSink, err = sinks.NewElasticSearchOut(config)
outSink, err = sinks.NewElasticSearchSink(config)
if err != nil {
glog.Fatalf("Failed to initialize elasticsearch output: %v", err)
}
} else if *sinkName == sinkNameHTTP {
config := sinks.DefaultHTTPConf()
config.Endpoint = httpEndpoint
config.Auth = httpAuth
config.Token = httpToken
config.Username = httpUsername
config.Password = httpPassword
outSink, err = sinks.NewHTTPSink(config)
if err != nil {
glog.Fatalf("Failed to initialize http output: %v", err)
}
} else {
fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
flag.PrintDefaults()
Expand Down
42 changes: 21 additions & 21 deletions sinks/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ import (
)

type ElasticSearchConf struct {
Endpoint string
User string
Password string
FlushDelay time.Duration
MaxBufferSize int
MaxConcurrency int
SinkCommonConf
Endpoint string
User string
Password string
}

type ElasticSearchOut struct {
type ElasticSearchSink struct {
config *ElasticSearchConf
esClient *elastic.Client
beforeFirstList bool
Expand All @@ -33,13 +31,15 @@ type ElasticSearchOut struct {

func DefaultElasticSearchConf() *ElasticSearchConf {
return &ElasticSearchConf{
FlushDelay: defaultFlushDelay,
MaxBufferSize: defaultMaxBufferSize,
MaxConcurrency: defaultMaxConcurrency,
SinkCommonConf: SinkCommonConf{
FlushDelay: defaultFlushDelay,
MaxBufferSize: defaultMaxBufferSize,
MaxConcurrency: defaultMaxConcurrency,
},
}
}

func NewElasticSearchOut(config *ElasticSearchConf) (*ElasticSearchOut, error) {
func NewElasticSearchSink(config *ElasticSearchConf) (*ElasticSearchSink, error) {
esClient, err := elastic.NewClient(elastic.SetSniff(false),
elastic.SetHealthcheckTimeoutStartup(10*time.Second), elastic.SetURL(config.Endpoint))
if err != nil {
Expand All @@ -49,7 +49,7 @@ func NewElasticSearchOut(config *ElasticSearchConf) (*ElasticSearchOut, error) {

glog.Infof("NewElasticSearchOut inited.")

return &ElasticSearchOut{
return &ElasticSearchSink{
esClient: esClient,
beforeFirstList: true,
logEntryChannel: make(chan *api_v1.Event, config.MaxBufferSize),
Expand All @@ -61,13 +61,13 @@ func NewElasticSearchOut(config *ElasticSearchConf) (*ElasticSearchOut, error) {
}, nil
}

func (es *ElasticSearchOut) OnAdd(event *api_v1.Event) {
func (es *ElasticSearchSink) OnAdd(event *api_v1.Event) {
ReceivedEntryCount.WithLabelValues(event.Source.Component).Inc()
glog.Infof("OnAdd %v", event)
es.logEntryChannel <- event
}

func (es *ElasticSearchOut) OnUpdate(oldEvent *api_v1.Event, newEvent *api_v1.Event) {
func (es *ElasticSearchSink) OnUpdate(oldEvent *api_v1.Event, newEvent *api_v1.Event) {
var oldCount int32
if oldEvent != nil {
oldCount = oldEvent.Count
Expand All @@ -89,19 +89,19 @@ func (es *ElasticSearchOut) OnUpdate(oldEvent *api_v1.Event, newEvent *api_v1.Ev
es.logEntryChannel <- newEvent
}

func (es *ElasticSearchOut) OnDelete(*api_v1.Event) {
func (es *ElasticSearchSink) OnDelete(*api_v1.Event) {
// Nothing to do here
}

func (es *ElasticSearchOut) OnList(list *api_v1.EventList) {
func (es *ElasticSearchSink) OnList(list *api_v1.EventList) {
// Nothing to do else
glog.Infof("OnList %v", list)
if es.beforeFirstList {
es.beforeFirstList = false
}
}

func (es *ElasticSearchOut) Run(stopCh <-chan struct{}) {
func (es *ElasticSearchSink) Run(stopCh <-chan struct{}) {
glog.Info("Starting Elasticsearch sink")
for {
select {
Expand All @@ -124,13 +124,13 @@ func (es *ElasticSearchOut) Run(stopCh <-chan struct{}) {
}
}

func (es *ElasticSearchOut) flushBuffer() {
func (es *ElasticSearchSink) flushBuffer() {
entries := es.currentBuffer
es.currentBuffer = nil
es.concurrencyChannel <- struct{}{}
go es.sendEntries(entries)
}
func (es *ElasticSearchOut) sendEntries(entries []*api_v1.Event) {
func (es *ElasticSearchSink) sendEntries(entries []*api_v1.Event) {
glog.V(4).Infof("Sending %d entries to Elasticsearch", len(entries))

bulkRequest := es.esClient.Bulk()
Expand All @@ -153,15 +153,15 @@ func (es *ElasticSearchOut) sendEntries(entries []*api_v1.Event) {
glog.V(4).Infof("Successfully sent %d entries to Elasticsearch", len(entries))
}

func (es *ElasticSearchOut) setTimer() {
func (es *ElasticSearchSink) setTimer() {
if es.timer == nil {
es.timer = time.NewTimer(es.config.FlushDelay)
} else {
es.timer.Reset(es.config.FlushDelay)
}
}

func (es *ElasticSearchOut) getTimerChannel() <-chan time.Time {
func (es *ElasticSearchSink) getTimerChannel() <-chan time.Time {
if es.timer == nil {
return es.fakeTimeChannel
}
Expand Down
Loading

0 comments on commit be5b132

Please sign in to comment.