diff --git a/go/mysql/binlog/binlog_json.go b/go/mysql/binlog/binlog_json.go index 30c80531a96..d1ecf464081 100644 --- a/go/mysql/binlog/binlog_json.go +++ b/go/mysql/binlog/binlog_json.go @@ -73,9 +73,57 @@ func ParseBinaryJSON(data []byte) (*json.Value, error) { } // ParseBinaryJSONDiff provides the parsing function from the binary MySQL -// JSON diff representation to an SQL expression. +// JSON diff representation to an SQL expression. These diffs are included +// in the AFTER image of PartialUpdateRows events which exist in MySQL 8.0 +// and later when the --binlog-row-value-options=PARTIAL_JSON is used. You +// can read more about these here: +// https://dev.mysql.com/blog-archive/efficient-json-replication-in-mysql-8-0/ +// https://dev.mysql.com/worklog/task/?id=2955 +// https://github.com/mysql/mysql-server/blob/trunk/sql-common/json_diff.h +// https://github.com/mysql/mysql-server/blob/trunk/sql-common/json_diff.cc +// +// The binary format for the partial JSON column or JSON diff is: +// +--------+--------+--------+ +--------+ +// | length | diff_1 | diff_2 | ... | diff_N | +// +--------+--------+--------+ +--------+ +// +// Each diff_i represents a single JSON diff. It has the following +// format: +// +-----------+-------------+------+ +-------------+------+ +// | operation | path_length | path | ( | data_length | data | )? +// +-----------+-------------+------+ +-------------+------+ +// +// The fields are: +// +// 1. operation: a single byte containing the JSON diff operation. +// The possible values are defined by enum_json_diff_operation: +// REPLACE=0 +// INSERT=1 +// REMOVE=2 +// +// 2. path_length: an unsigned integer in net_field_length() format. +// +// 3. path: a string of 'path_length' bytes containing the JSON path +// of the update. +// +// 4. data_length: an unsigned integer in net_field_length() format. +// +// 5. data: a string of 'data_length' bytes containing the JSON +// document that will be inserted at the position specified by +// 'path'. +// +// data_length and data are omitted if and only if operation=REMOVE. +// +// Examples of the resulting SQL expression are: +// - "" for an empty diff when the column was not updated +// - "null" for a JSON null +// - "JSON_REMOVE(%s, _utf8mb4'$.salary')" for a REMOVE operation +// - "JSON_INSERT(%s, _utf8mb4'$.role', CAST(JSON_QUOTE(_utf8mb4'manager') as JSON))" for an INSERT operation +// - "JSON_INSERT(JSON_REMOVE(JSON_REPLACE(%s, _utf8mb4'$.day', CAST(JSON_QUOTE(_utf8mb4'tuesday') as JSON)), _utf8mb4'$.favorite_color'), _utf8mb4'$.hobby', CAST(JSON_QUOTE(_utf8mb4'skiing') as JSON))" for a more complex example func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { if len(data) == 0 { + // An empty diff is used as a way to elide the column from + // the AFTER image when it was not updated in the row event. return sqltypes.MakeTrusted(sqltypes.Expression, data), nil } @@ -88,10 +136,36 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { outer := false innerStr := "" + // Create the SQL expression from the data which will consist of + // a sequence of JSON_X(col/json, path[, value]) clauses where X + // is REPLACE, INSERT, or REMOVE. The data can also be a JSON + // null, which is a special case we handle here as well. We take + // a binary representation of a vector of JSON diffs, for example: + // (REPLACE, '$.a', '7') + // (REMOVE, '$.d[0]') + // (INSERT, '$.e', '"ee"') + // (INSERT, '$.f[1]', '"ff"') + // (INSERT, '$.g', '"gg"') + // And build an SQL expression from it: + // JSON_INSERT( + // JSON_INSERT( + // JSON_INSERT( + // JSON_REMOVE( + // JSON_REPLACE(col, '$.a', 7), + // '$.d[0]'), + // '$.e', 'ee'), + // '$.f[3]', 'ff'), + // '$.g', 'gg') for pos < len(data) { opType := jsonDiffOp(data[pos]) pos++ if outer { + // We process the bytes sequentially but build the SQL + // expression from the inner most function to the outer most + // and thus need to wrap any subsequent functions around the + // previous one(s). For example: + // The inner: JSON_REPLACE(%s, '$.a', 7) + // The outer: JSON_REMOVE(, '$.b') innerStr = diff.String() diff.Reset() } @@ -112,6 +186,7 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { "invalid JSON diff operation: %d", opType) } if outer { + // Wrap this outer function around the previous inner one(s). diff.WriteString(innerStr) diff.WriteString(", ") } else { // Only the inner most function has the field name @@ -119,13 +194,16 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { } outer = true + // Read the JSON document path that we want to operate on. pathLen, readTo := readVariableLength(data, pos) pos = readTo path := data[pos : pos+pathLen] pos += pathLen - // We have to specify the unicode character set for the strings we + // We have to specify the unicode character set for the path we // use in the expression as the connection can be using a different // character set (e.g. vreplication always uses set names binary). + // The generated path will look like this: + // _utf8mb4'$.role' diff.WriteString(sqlparser.Utf8mb4Str) diff.WriteByte('\'') diff.Write(path) @@ -136,16 +214,22 @@ func ParseBinaryJSONDiff(data []byte) (sqltypes.Value, error) { } diff.WriteString(", ") + // Read the JSON document value that we want to set. valueLen, readTo := readVariableLength(data, pos) pos = readTo + // The native JSON type and value that we want to + // set (string, number, object, array, null). value, err := ParseBinaryJSON(data[pos : pos+valueLen]) if err != nil { return sqltypes.Value{}, vterrors.Wrapf(err, "cannot read JSON diff value for path %q", path) } pos += valueLen + + // Generate the SQL clause for the JSON diff's value. For example: + // "CAST(JSON_QUOTE(_utf8mb4'manager') as JSON)" diff.Write(value.MarshalSQLTo(nil)) - diff.WriteByte(')') + diff.WriteByte(')') // Close the JSON function } return sqltypes.MakeTrusted(sqltypes.Expression, diff.Bytes()), nil