-
Notifications
You must be signed in to change notification settings - Fork 20
/
writer.go
159 lines (129 loc) · 4.5 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package wire
import (
"context"
"errors"
"github.com/jeroenrinzema/psql-wire/pkg/buffer"
"github.com/jeroenrinzema/psql-wire/pkg/types"
)
// DataWriter is the interface used to send column definitions and data rows to
// the connected client over the Postgres wire protocol.
type DataWriter interface {
	// Row writes a single data row to the underlying Postgres client. Every
	// element of values represents one column value, so the slice length has to
	// match the number of defined columns. The column headers have to be
	// written before any row is sent. Nil values are encoded as NULL.
	Row(values []any) error

	// Written reports the number of rows written to the client.
	Written() uint64

	// Empty announces an empty response to the client, meaning that no data
	// rows should be expected.
	Empty() error

	// Columns returns the columns that are currently defined within the writer.
	Columns() Columns

	// Complete announces to the client that the command has finished and that
	// no further data should be expected.
	//
	// See [CommandComplete] for the description format expected for the
	// different kinds of queries.
	//
	// [CommandComplete]: https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-COMMANDCOMPLETE
	Complete(description string) error

	// CopyIn sends a [CopyInResponse] to the client in order to initiate a
	// CopyIn operation. A copy operation can be used to send large amounts of
	// data to the server in a single transaction. The returned CopyReader has
	// to be used, together with a column reader, to consume the data sent by
	// the client.
	CopyIn(format FormatCode) (*CopyReader, error)
}
// Sentinel errors reported by the data writer.
var (
	// ErrDataWritten is returned when an empty result is attempted to be sent
	// to the client while data has already been written.
	ErrDataWritten = errors.New("data has already been written")

	// ErrClosedWriter is returned when the data writer has been closed.
	ErrClosedWriter = errors.New("closed writer")
)
// NewDataWriter builds a data writer from the given context, columns, format
// codes and the reader/writer buffers of the client connection. The returned
// writer is not safe for concurrent use: sharing it between goroutines without
// proper synchronization can result in unexpected behavior and data
// corruption.
func NewDataWriter(ctx context.Context, columns Columns, formats []FormatCode, reader *buffer.Reader, writer *buffer.Writer) DataWriter {
	dw := new(dataWriter)

	dw.ctx = ctx
	dw.columns = columns
	dw.formats = formats
	dw.reader = reader
	dw.client = writer

	return dw
}
// dataWriter is a implementation of the DataWriter interface.
type dataWriter struct {
ctx context.Context
columns Columns
formats []FormatCode
client *buffer.Writer
reader *buffer.Reader
closed bool
written uint64
}
// Columns returns the columns currently held by the writer.
func (writer *dataWriter) Columns() Columns {
	return writer.columns
}
// Define replaces the columns held by the writer with the given columns and
// then delegates to Columns.Define, passing the writer's context, client
// buffer and format codes. ErrClosedWriter is returned, and the columns are
// left untouched, when the writer has already been closed.
func (writer *dataWriter) Define(columns Columns) error {
	if writer.closed {
		return ErrClosedWriter
	}

	// The new columns are stored before they are defined, so they stay in
	// place even when defining them fails.
	writer.columns = columns

	return writer.columns.Define(writer.ctx, writer.client, writer.formats)
}
// Row writes a single data row holding the given values by delegating to
// Columns.Write with the writer's context, format codes and client buffer.
//
// ErrClosedWriter is returned when the writer has already been closed. Any
// error reported while writing the row is returned as is, and in that case the
// row is not included in the count reported by Written.
func (writer *dataWriter) Row(values []any) error {
	if writer.closed {
		return ErrClosedWriter
	}

	err := writer.columns.Write(writer.ctx, writer.formats, writer.client, values)
	if err != nil {
		return err
	}

	// Only count rows that were written without an error. Counting up front
	// would make Written over-report and would make Empty fail with
	// ErrDataWritten even though no row was written successfully.
	writer.written++

	return nil
}
// CopyIn initiates a CopyIn operation by delegating to Columns.CopyIn with the
// writer's context, client buffer and the given format. On success a new
// CopyReader built from the writer's reader, client buffer and columns is
// returned. ErrClosedWriter is returned when the writer has already been
// closed, and any error from initiating the copy is returned unchanged.
func (writer *dataWriter) CopyIn(format FormatCode) (*CopyReader, error) {
	if writer.closed {
		return nil, ErrClosedWriter
	}

	if err := writer.columns.CopyIn(writer.ctx, writer.client, format); err != nil {
		return nil, err
	}

	copyReader := NewCopyReader(writer.reader, writer.client, writer.columns)
	return copyReader, nil
}
// Empty finishes the writer without any data rows. It returns ErrClosedWriter
// when the writer has already been closed and ErrDataWritten when rows have
// already been counted; otherwise the writer is marked as closed. This method
// itself does not write a message to the client buffer.
func (writer *dataWriter) Empty() error {
	switch {
	case writer.closed:
		return ErrClosedWriter
	case writer.written != 0:
		return ErrDataWritten
	}

	writer.close()
	return nil
}
// Written returns the current value of the writer's row counter, which is
// maintained by Row.
func (writer *dataWriter) Written() uint64 {
	return writer.written
}
// Complete sends a command complete message carrying the given description to
// the client and marks the writer as closed, whether or not sending succeeds.
//
// ErrClosedWriter is returned when the writer has already been closed. When
// columns are set but no rows have been counted, Empty is invoked first; an
// error returned by Empty aborts the completion and is returned unchanged.
func (writer *dataWriter) Complete(description string) error {
	if writer.closed {
		return ErrClosedWriter
	}

	noRows := writer.written == 0
	if noRows && writer.columns != nil {
		if err := writer.Empty(); err != nil {
			return err
		}
	}

	// Deferred so the writer ends up closed after the message has been handed
	// to the client buffer, regardless of the outcome.
	defer writer.close()

	return commandComplete(writer.client, description)
}
// close flags the writer as closed. Methods guarded by the closed flag return
// ErrClosedWriter from then on.
func (writer *dataWriter) close() {
	writer.closed = true
}
// commandComplete announces that the requested command has been executed
// successfully. It starts a types.ServerCommandComplete message on the given
// buffer, appends the description followed by a null terminator and ends the
// message, returning the error reported by End. The description is sent back
// to the client and can be used to pass additional meta data to the user.
func commandComplete(out *buffer.Writer, description string) error {
	out.Start(types.ServerCommandComplete)

	// Message body: the description as a null-terminated string.
	out.AddString(description)
	out.AddNullTerminate()

	return out.End()
}