diff --git a/pkg/kv/kvpb/batch_generated.go b/pkg/kv/kvpb/batch_generated.go index b721790a728f..15a57c7516af 100644 --- a/pkg/kv/kvpb/batch_generated.go +++ b/pkg/kv/kvpb/batch_generated.go @@ -830,12 +830,46 @@ type linkExternalSSTableResponseAlloc struct { resp LinkExternalSSTableResponse } +func allocBatchResponse(nResps int) *BatchResponse { + if nResps <= 1 { + alloc := new(struct { + br BatchResponse + resps [1]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 2 { + alloc := new(struct { + br BatchResponse + resps [2]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 4 { + alloc := new(struct { + br BatchResponse + resps [4]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 8 { + alloc := new(struct { + br BatchResponse + resps [8]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } + br := &BatchResponse{} + br.Responses = make([]ResponseUnion, nResps) + return br +} + // CreateReply creates replies for each of the contained requests, wrapped in a // BatchResponse. The response objects are batch allocated to minimize // allocation overhead. func (ba *BatchRequest) CreateReply() *BatchResponse { - br := &BatchResponse{} - br.Responses = make([]ResponseUnion, len(ba.Requests)) + br := allocBatchResponse(len(ba.Requests)) counts := ba.getReqCounts() diff --git a/pkg/kv/kvpb/gen/main.go b/pkg/kv/kvpb/gen/main.go index aac45035210f..a24ef985a038 100644 --- a/pkg/kv/kvpb/gen/main.go +++ b/pkg/kv/kvpb/gen/main.go @@ -366,13 +366,49 @@ func (ba *BatchRequest) WriteSummary(b *strings.Builder) { allocTypes[resV.variantName] = allocName } + fmt.Fprint(f, ` +func allocBatchResponse(nResps int) *BatchResponse { + if nResps <= 1 { + alloc := new(struct { + br BatchResponse + resps [1]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 2 { + alloc := new(struct { + br BatchResponse + resps [2]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 4 { + alloc := new(struct { + br BatchResponse + resps [4]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 8 { + alloc := new(struct { + br BatchResponse + resps [8]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } + br := &BatchResponse{} + br.Responses = make([]ResponseUnion, nResps) + return br +} +`) + fmt.Fprint(f, ` // CreateReply creates replies for each of the contained requests, wrapped in a // BatchResponse. The response objects are batch allocated to minimize // allocation overhead. func (ba *BatchRequest) CreateReply() *BatchResponse { - br := &BatchResponse{} - br.Responses = make([]ResponseUnion, len(ba.Requests)) + br := allocBatchResponse(len(ba.Requests)) counts := ba.getReqCounts() diff --git a/pkg/server/node.go b/pkg/server/node.go index 5f5a969990a7..0d1b8e422020 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1877,7 +1877,14 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR func (n *Node) BatchStream(stream kvpb.Internal_BatchStreamServer) error { ctx := stream.Context() for { - args, err := stream.Recv() + argsAlloc := new(struct { + args kvpb.BatchRequest + reqs [1]kvpb.RequestUnion + }) + args := &argsAlloc.args + args.Requests = argsAlloc.reqs[:0] + + err := stream.RecvMsg(args) if err != nil { // From grpc.ServerStream.Recv: // > It returns io.EOF when the client has performed a CloseSend.