diff --git a/CHANGES.txt b/CHANGES.txt index d0a956481..4d7a7fbe7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -26,6 +26,11 @@ Features Bug Handling ------------ +* Fixed issue where ElasticSearchOutput was trying to send zero length requests + (#1783). + +* Fixed race condition in ElasticSearchOutput (#1786). + * AMQPInput `Run` method now returns an error when the input channel closes but `Stop` hasn't been called to successfully trigger restarts (#1757). diff --git a/plugins/elasticsearch/elasticsearch.go b/plugins/elasticsearch/elasticsearch.go index ccc8d40fe..75a02abdd 100644 --- a/plugins/elasticsearch/elasticsearch.go +++ b/plugins/elasticsearch/elasticsearch.go @@ -41,6 +41,11 @@ type ESBatch struct { batch []byte } +type MsgPack struct { + bytes []byte + queueCursor string +} + // Output plugin that index messages to an elasticsearch cluster. // Largely based on FileOutput plugin. type ElasticSearchOutput struct { @@ -48,6 +53,7 @@ type ElasticSearchOutput struct { dropMessageCount int64 count int64 backChan chan []byte + recvChan chan MsgPack batchChan chan ESBatch // Chan to pass completed batches outBatch []byte queueCursor string @@ -59,7 +65,6 @@ type ElasticSearchOutput struct { reportLock sync.Mutex stopChan chan bool flushTicker *time.Ticker - flushManual chan struct{} } // ConfigStruct for ElasticSearchOutput plugin. @@ -118,7 +123,7 @@ func (o *ElasticSearchOutput) Init(config interface{}) (err error) { o.batchChan = make(chan ESBatch) o.backChan = make(chan []byte, 2) - o.flushManual = make(chan struct{}) + o.recvChan = make(chan MsgPack, 100) var serverUrl *url.URL if serverUrl, err = url.Parse(o.conf.Server); err == nil { @@ -185,14 +190,9 @@ func (o *ElasticSearchOutput) ProcessMessage(pack *PipelinePack) error { } if outBytes != nil { - o.outBatch = append(o.outBatch, outBytes...) - o.queueCursor = pack.QueueCursor - o.count++ - if len(o.outBatch) > 0 && o.bulkIndexer.CheckFlush(int(o.count), len(o.outBatch)) { - o.flushManual <- struct{}{} - <-o.flushManual // block until the batch is sent. - } + o.recvChan <- MsgPack{bytes: outBytes, queueCursor: pack.QueueCursor} } + return nil } @@ -203,9 +203,13 @@ func (o *ElasticSearchOutput) batchSender() { case <-o.stopChan: ok = false continue - case <-o.flushManual: - o.sendBatch() - o.flushManual <- struct{}{} + case pack := <-o.recvChan: + o.outBatch = append(o.outBatch, pack.bytes...) + o.queueCursor = pack.queueCursor + o.count++ + if len(o.outBatch) > 0 && o.bulkIndexer.CheckFlush(int(o.count), len(o.outBatch)) { + o.sendBatch() + } case <-o.flushTicker.C: if len(o.outBatch) > 0 { o.sendBatch() @@ -381,6 +385,10 @@ func (h *HttpBulkIndexer) Index(body []byte) (err error, retry bool) { var response_body []byte var response_body_json map[string]interface{} + if len(body) == 0 { + return nil, false + } + url := fmt.Sprintf("%s://%s%s%s", h.Protocol, h.Domain, h.Path, "/_bulk") // Creating ElasticSearch Bulk HTTP request @@ -418,7 +426,7 @@ func (h *HttpBulkIndexer) Index(body []byte) (err error, retry bool) { response.Status, string(response_body)), true } json_errors, ok := response_body_json["errors"].(bool) - if ok && json_errors { + if ok && json_errors && response.StatusCode != 200 { return fmt.Errorf( "ElasticSearch server reported error within JSON. Status: %s. Body: %s", response.Status, string(response_body)), false