Skip to content

Commit

Permalink
Place the errors field first in bulk response (#98303)
Browse files Browse the repository at this point in the history
The bulk response is a map of fields that is inherently unordered.
However, we have some control over the order in which fields are
serialized. This commit places the errors field first to allow clients
who want to incrementally parse the response until the errors field is
encountered potentially encounter the field earlier.

There is also a test added to inform us if a future version of jackson
were to change the order of serialization.
  • Loading branch information
Tim-Brooks authored Aug 9, 2023
1 parent 5206587 commit 6a1bdb0
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ public RestStatus status() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ERRORS, hasFailures());
builder.field(TOOK, tookInMillis);
if (ingestTookInMillis != BulkResponse.NO_INGEST_TOOK) {
builder.field(INGEST_TOOK, ingestTookInMillis);
}
builder.field(ERRORS, hasFailures());
builder.startArray(ITEMS);
for (BulkItemResponse item : this) {
item.toXContent(builder, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.elasticsearch.action.bulk.BulkResponse.NO_INGEST_TOOK;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.Matchers.equalTo;

public class BulkResponseTests extends ESTestCase {

Expand All @@ -47,17 +48,7 @@ public void testToAndFromXContent() throws IOException {
DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values());

if (frequently()) {
Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> randomDocWriteResponses = null;
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
randomDocWriteResponses = IndexResponseTests.randomIndexResponse();
} else if (opType == DocWriteRequest.OpType.DELETE) {
randomDocWriteResponses = DeleteResponseTests.randomDeleteResponse();
} else if (opType == DocWriteRequest.OpType.UPDATE) {
randomDocWriteResponses = UpdateResponseTests.randomUpdateResponse(xContentType);
} else {
fail("Test does not support opType [" + opType + "]");
}

Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> randomDocWriteResponses = success(opType, xContentType);
bulkItems[i] = BulkItemResponse.success(i, opType, randomDocWriteResponses.v1());
expectedBulkItems[i] = BulkItemResponse.success(i, opType, randomDocWriteResponses.v2());
} else {
Expand Down Expand Up @@ -97,4 +88,64 @@ public void testToAndFromXContent() throws IOException {
BytesReference expectedFinalBytes = toXContent(parsedBulkResponse, xContentType, humanReadable);
assertToXContentEquivalent(expectedFinalBytes, finalBytes, xContentType);
}

public void testToXContentPlacesErrorsFirst() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
boolean humanReadable = randomBoolean();

boolean errors = false;
long took = randomFrom(randomNonNegativeLong(), -1L);
long ingestTook = randomFrom(randomNonNegativeLong(), NO_INGEST_TOOK);
int nbBulkItems = randomIntBetween(1, 10);

BulkItemResponse[] bulkItems = new BulkItemResponse[nbBulkItems];

for (int i = 0; i < nbBulkItems; i++) {
DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values());
if (frequently()) {
Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> randomDocWriteResponses = success(opType, xContentType);

BulkItemResponse success = BulkItemResponse.success(i, opType, randomDocWriteResponses.v1());
bulkItems[i] = success;
} else {
errors = true;
String index = randomAlphaOfLength(5);
String id = randomAlphaOfLength(5);

Tuple<Throwable, ElasticsearchException> failures = randomExceptions();

Exception bulkItemCause = (Exception) failures.v1();
bulkItems[i] = BulkItemResponse.failure(i, opType, new BulkItemResponse.Failure(index, id, bulkItemCause));
}
}

BulkResponse bulkResponse = new BulkResponse(bulkItems, took, ingestTook);
BytesReference originalBytes = toXContent(bulkResponse, xContentType, ToXContent.EMPTY_PARAMS, humanReadable);

try (XContentParser parser = createParser(xContentType.xContent(), originalBytes)) {
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
XContentParser.Token firstField = parser.nextToken();
assertThat(firstField, equalTo(XContentParser.Token.FIELD_NAME));
assertThat(parser.currentName(), equalTo("errors"));
assertThat(parser.nextToken(), equalTo(XContentParser.Token.VALUE_BOOLEAN));
assertThat(parser.booleanValue(), equalTo(errors));
}
}

private static Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> success(
DocWriteRequest.OpType opType,
XContentType xContentType
) {
Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> randomDocWriteResponses = null;
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
randomDocWriteResponses = IndexResponseTests.randomIndexResponse();
} else if (opType == DocWriteRequest.OpType.DELETE) {
randomDocWriteResponses = DeleteResponseTests.randomDeleteResponse();
} else if (opType == DocWriteRequest.OpType.UPDATE) {
randomDocWriteResponses = UpdateResponseTests.randomUpdateResponse(xContentType);
} else {
fail("Test does not support opType [" + opType + "]");
}
return randomDocWriteResponses;
}
}

0 comments on commit 6a1bdb0

Please sign in to comment.