From d52be42f93d6bfda5677906030e9c0ad70ab38e9 Mon Sep 17 00:00:00 2001 From: Jinzheng Zhang Date: Mon, 2 Nov 2015 16:12:44 +0800 Subject: [PATCH 1/5] Drop empty batch when outputting to ES, fix #1783, fix #1630 --- plugins/elasticsearch/elasticsearch.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/elasticsearch/elasticsearch.go b/plugins/elasticsearch/elasticsearch.go index ccc8d40fe..a0664a0c4 100644 --- a/plugins/elasticsearch/elasticsearch.go +++ b/plugins/elasticsearch/elasticsearch.go @@ -381,6 +381,10 @@ func (h *HttpBulkIndexer) Index(body []byte) (err error, retry bool) { var response_body []byte var response_body_json map[string]interface{} + if len(body) == 0 { + return nil, false + } + url := fmt.Sprintf("%s://%s%s%s", h.Protocol, h.Domain, h.Path, "/_bulk") // Creating ElasticSearch Bulk HTTP request From 7fe13585a06a52e5a9eea0311c87aa8f93ebfd3f Mon Sep 17 00:00:00 2001 From: Jinzheng Zhang Date: Tue, 17 Nov 2015 19:11:02 +0800 Subject: [PATCH 2/5] Fix append function isn't in a synchronized method, fix #1786 --- plugins/elasticsearch/elasticsearch.go | 28 +++++++++++++++----------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/plugins/elasticsearch/elasticsearch.go b/plugins/elasticsearch/elasticsearch.go index a0664a0c4..a22074ace 100644 --- a/plugins/elasticsearch/elasticsearch.go +++ b/plugins/elasticsearch/elasticsearch.go @@ -41,6 +41,11 @@ type ESBatch struct { batch []byte } +type MsgPack struct { + bytes []byte + queueCursor string +} + // Output plugin that index messages to an elasticsearch cluster. // Largely based on FileOutput plugin. type ElasticSearchOutput struct { @@ -48,6 +53,7 @@ type ElasticSearchOutput struct { dropMessageCount int64 count int64 backChan chan []byte + recvChan chan *MsgPack batchChan chan ESBatch // Chan to pass completed batches outBatch []byte queueCursor string @@ -59,7 +65,6 @@ type ElasticSearchOutput struct { reportLock sync.Mutex stopChan chan bool flushTicker *time.Ticker - flushManual chan struct{} } // ConfigStruct for ElasticSearchOutput plugin. @@ -118,7 +123,7 @@ func (o *ElasticSearchOutput) Init(config interface{}) (err error) { o.batchChan = make(chan ESBatch) o.backChan = make(chan []byte, 2) - o.flushManual = make(chan struct{}) + o.recvChan = make(chan *MsgPack, 1024) var serverUrl *url.URL if serverUrl, err = url.Parse(o.conf.Server); err == nil { @@ -185,14 +190,9 @@ func (o *ElasticSearchOutput) ProcessMessage(pack *PipelinePack) error { } if outBytes != nil { - o.outBatch = append(o.outBatch, outBytes...) - o.queueCursor = pack.QueueCursor - o.count++ - if len(o.outBatch) > 0 && o.bulkIndexer.CheckFlush(int(o.count), len(o.outBatch)) { - o.flushManual <- struct{}{} - <-o.flushManual // block until the batch is sent. - } + o.recvChan <- &MsgPack{bytes: outBytes, queueCursor: pack.QueueCursor} } + return nil } @@ -203,9 +203,13 @@ func (o *ElasticSearchOutput) batchSender() { case <-o.stopChan: ok = false continue - case <-o.flushManual: - o.sendBatch() - o.flushManual <- struct{}{} + case pack := <-o.recvChan: + o.outBatch = append(o.outBatch, pack.bytes...) + o.queueCursor = pack.queueCursor + o.count++ + if len(o.outBatch) > 0 && o.bulkIndexer.CheckFlush(int(o.count), len(o.outBatch)) { + o.sendBatch() + } case <-o.flushTicker.C: if len(o.outBatch) > 0 { o.sendBatch() From 595506020c8afa07dec5158d0ed255cbfec28916 Mon Sep 17 00:00:00 2001 From: Jinzheng Zhang Date: Mon, 2 Nov 2015 11:56:31 +0800 Subject: [PATCH 3/5] Only log error message when response code not equal 200 --- plugins/elasticsearch/elasticsearch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/elasticsearch/elasticsearch.go b/plugins/elasticsearch/elasticsearch.go index a22074ace..e0ad4d1ae 100644 --- a/plugins/elasticsearch/elasticsearch.go +++ b/plugins/elasticsearch/elasticsearch.go @@ -426,7 +426,7 @@ func (h *HttpBulkIndexer) Index(body []byte) (err error, retry bool) { response.Status, string(response_body)), true } json_errors, ok := response_body_json["errors"].(bool) - if ok && json_errors { + if ok && json_errors && response.StatusCode != 200 { return fmt.Errorf( "ElasticSearch server reported error within JSON. Status: %s. Body: %s", response.Status, string(response_body)), false From 67087f73adefa349e0d67a5ba890ea4bd6eb6b3f Mon Sep 17 00:00:00 2001 From: Jinzheng Zhang Date: Fri, 20 Nov 2015 10:42:39 +0800 Subject: [PATCH 4/5] Use struct value instead of pointer to avoid garbage collection --- plugins/elasticsearch/elasticsearch.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/elasticsearch/elasticsearch.go b/plugins/elasticsearch/elasticsearch.go index e0ad4d1ae..ade263c1e 100644 --- a/plugins/elasticsearch/elasticsearch.go +++ b/plugins/elasticsearch/elasticsearch.go @@ -53,7 +53,7 @@ type ElasticSearchOutput struct { dropMessageCount int64 count int64 backChan chan []byte - recvChan chan *MsgPack + recvChan chan MsgPack batchChan chan ESBatch // Chan to pass completed batches outBatch []byte queueCursor string @@ -123,7 +123,7 @@ func (o *ElasticSearchOutput) Init(config interface{}) (err error) { o.batchChan = make(chan ESBatch) o.backChan = make(chan []byte, 2) - o.recvChan = make(chan *MsgPack, 1024) + o.recvChan = make(chan MsgPack, 1024) var serverUrl *url.URL if serverUrl, err = url.Parse(o.conf.Server); err == nil { @@ -190,7 +190,7 @@ func (o *ElasticSearchOutput) ProcessMessage(pack *PipelinePack) error { } if outBytes != nil { - o.recvChan <- &MsgPack{bytes: outBytes, queueCursor: pack.QueueCursor} + o.recvChan <- MsgPack{bytes: outBytes, queueCursor: pack.QueueCursor} } return nil From 221bc94a11a1836143304fa7f512cde3c1941918 Mon Sep 17 00:00:00 2001 From: Rob Miller Date: Fri, 20 Nov 2015 22:01:00 -0800 Subject: [PATCH 5/5] Use less buffering on the ESOutput recvChan, and update changelog. --- CHANGES.txt | 5 +++++ plugins/elasticsearch/elasticsearch.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index d0a956481..4d7a7fbe7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -26,6 +26,11 @@ Features Bug Handling ------------ +* Fixed issue where ElasticSearchOutput was trying to send zero length requests + (#1783). + +* Fixed race condition in ElasticSearchOutput (#1786). + * AMQPInput `Run` method now returns an error when the input channel closes but `Stop` hasn't been called to successfully trigger restarts (#1757). diff --git a/plugins/elasticsearch/elasticsearch.go b/plugins/elasticsearch/elasticsearch.go index ade263c1e..75a02abdd 100644 --- a/plugins/elasticsearch/elasticsearch.go +++ b/plugins/elasticsearch/elasticsearch.go @@ -123,7 +123,7 @@ func (o *ElasticSearchOutput) Init(config interface{}) (err error) { o.batchChan = make(chan ESBatch) o.backChan = make(chan []byte, 2) - o.recvChan = make(chan MsgPack, 1024) + o.recvChan = make(chan MsgPack, 100) var serverUrl *url.URL if serverUrl, err = url.Parse(o.conf.Server); err == nil {