From b8feb06a8b0f5073e825f0c90bee647dc50a91cc Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Wed, 26 Jun 2024 14:48:05 +0200 Subject: [PATCH] Upgrade go-cassandra-native-protocol library --- go.mod | 2 +- go.sum | 51 ++++++++++++++++ integration-tests/customhandler_test_utils.go | 4 +- integration-tests/functioncalls_test.go | 23 ++++---- integration-tests/prepared_statements_test.go | 58 +++++++++---------- proxy/pkg/config/config.go | 2 +- proxy/pkg/zdmproxy/clientconn.go | 2 +- proxy/pkg/zdmproxy/clienthandler.go | 18 +++--- proxy/pkg/zdmproxy/controlconn.go | 5 +- proxy/pkg/zdmproxy/cqlconn.go | 9 ++- proxy/pkg/zdmproxy/cqlparser.go | 11 ++-- .../cqlparser_adv_workloads_utils_test.go | 6 +- proxy/pkg/zdmproxy/cqlparser_test.go | 10 +++- proxy/pkg/zdmproxy/frameprocessor.go | 4 +- proxy/pkg/zdmproxy/host.go | 32 +++++++--- proxy/pkg/zdmproxy/nativeprotocol.go | 12 ++-- proxy/pkg/zdmproxy/parametermodifier_test.go | 6 +- proxy/pkg/zdmproxy/querymodifier.go | 8 +-- proxy/pkg/zdmproxy/querymodifier_test.go | 10 ++-- 19 files changed, 177 insertions(+), 96 deletions(-) diff --git a/go.mod b/go.mod index cd3ed9dc..ad8753b8 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211106181442-e4c1a74c66bd - github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8 + github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e github.com/google/uuid v1.1.1 github.com/jpillora/backoff v1.0.0 diff --git a/go.sum b/go.sum index e28fd660..8d3e4fdf 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211106181442-e4c1a74c66bd h1:fjJY1LimH0wVCvOHLX35SCX/MbWomAglET1H2kvz7xc= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211106181442-e4c1a74c66bd/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= +github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves= +github.com/antlr/antlr4/runtime/Go/antlr v1.4.10/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -14,9 +16,14 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8 h1:NKLtNzC76ssf68VOenDAzMyQGg+QkxuD2QCubX+GvLk= github.com/datastax/go-cassandra-native-protocol v0.0.0-20220525125956-6158d9e218b8/go.mod h1:yFD0OKoVV9d1QW7Es58c1Gv6ijrqTGPcxgHv27wdC4Q= +github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d h1:UnPtAA8Ux3GvHLazSSUydERFuoQRyxHrB8puzXyjXIE= +github.com/datastax/go-cassandra-native-protocol v0.0.0-20240626123646-2abea740da8d/go.mod h1:6FzirJfdffakAVqmHjwVfFkpru/gNbIazUOK5rIhndc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -27,20 +34,31 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e h1:SroDcndcOU9BVAduPf/PXihXoR2ZYTQYLXbupbqxAyQ= github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= +github.com/gocql/gocql v1.6.0 h1:IdFdOTbnpbd0pDhl4REKQDM+Q0SzKXQ1Yh+YZZ8T/qU= +github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= @@ -57,9 +75,17 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -69,32 +95,46 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/pierrec/lz4/v4 v4.0.3 h1:vNQKSVZNYUEAvRY9FaUXAF1XPbSOHJtDTiP41kzDz2E= github.com/pierrec/lz4/v4 v4.0.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.1.0 h1:ElTg5tNp4DqfV7UQjDqv2+RJlNzsDtvNAWccbItceIE= github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs= github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= @@ -102,9 +142,12 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -119,12 +162,20 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= diff --git a/integration-tests/customhandler_test_utils.go b/integration-tests/customhandler_test_utils.go index 7e440d6f..97fdc7ae 100644 --- a/integration-tests/customhandler_test_utils.go +++ b/integration-tests/customhandler_test_utils.go @@ -57,7 +57,7 @@ var ( releaseVersionColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "release_version", Type: datatype.Varchar} rpcAddressColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "rpc_address", Type: datatype.Inet} schemaVersionColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "schema_version", Type: datatype.Uuid} - tokensColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)} + tokensColumn = &message.ColumnMetadata{Keyspace: "system", Table: "local", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)} ) // These columns are a subset of the total columns returned by OSS C* 3.11.2, and contain all the information that @@ -86,7 +86,7 @@ var ( releaseVersionPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "release_version", Type: datatype.Varchar} rpcAddressPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "rpc_address", Type: datatype.Inet} schemaVersionPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "schema_version", Type: datatype.Uuid} - tokensPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)} + tokensPeersColumn = &message.ColumnMetadata{Keyspace: "system", Table: "peers", Name: "tokens", Type: datatype.NewSet(datatype.Varchar)} ) // These columns are a subset of the total columns returned by OSS C* 3.11.2, and contain all the information that diff --git a/integration-tests/functioncalls_test.go b/integration-tests/functioncalls_test.go index 7cf04d76..96fbbfeb 100644 --- a/integration-tests/functioncalls_test.go +++ b/integration-tests/functioncalls_test.go @@ -854,7 +854,7 @@ func TestNowFunctionReplacementPreparedStatement(t *testing.T) { isReplacedNow: false, value: []int{11, 22, 33}, valueSimulacron: []int{11, 22, 33}, - dataType: datatype.NewListType(datatype.Int), + dataType: datatype.NewList(datatype.Int), simulacronType: "list", }, { @@ -880,7 +880,7 @@ func TestNowFunctionReplacementPreparedStatement(t *testing.T) { {1, 2, 3}, {2, 3, 4}, }, - dataType: datatype.NewListType(datatype.NewTupleType(datatype.Int, datatype.Int, datatype.Int)), + dataType: datatype.NewList(datatype.NewTuple(datatype.Int, datatype.Int, datatype.Int)), simulacronType: "list>", }, { @@ -2261,7 +2261,7 @@ func TestNowFunctionReplacementBatchStatement(t *testing.T) { } expectedBatchChildQueries = append(expectedBatchChildQueries, expectedBatchChildQuery) - var queryOrId interface{} + var batchChild *message.BatchChild if childStatement.prepared { when := simulacron.NewWhenQueryOptions() for _, p := range expectedChildQueryParams { @@ -2285,18 +2285,21 @@ func TestNowFunctionReplacementBatchStatement(t *testing.T) { require.Nil(t, err) prepared, ok := resp.Body.Message.(*message.PreparedResult) require.True(t, ok) - queryOrId = prepared.PreparedQueryId + batchChild = &message.BatchChild{ + Id: prepared.PreparedQueryId, + Values: positionalValues, + } validateForwardedPrepare(simulacronSetup.Origin, childStatement) validateForwardedPrepare(simulacronSetup.Target, childStatement) } else { - queryOrId = childStatement.originalQuery + batchChild = &message.BatchChild{ + Query: childStatement.originalQuery, + Values: positionalValues, + } } - batchChildStatements = append(batchChildStatements, &message.BatchChild{ - QueryOrId: queryOrId, - Values: positionalValues, - }) + batchChildStatements = append(batchChildStatements, batchChild) } batchMsg := &message.Batch{ @@ -2325,7 +2328,7 @@ func TestNowFunctionReplacementBatchStatement(t *testing.T) { actualStmt := matching[0].QueriesOrIds[idx] actualParams := matching[0].Values[idx] if childStatement.prepared { - b64ExpectedValue := base64.StdEncoding.EncodeToString(batchChildStatements[idx].QueryOrId.([]byte)) + b64ExpectedValue := base64.StdEncoding.EncodeToString(batchChildStatements[idx].Id) require.Equal(t, b64ExpectedValue, actualStmt, idx) } else { if enableNowReplacement { diff --git a/integration-tests/prepared_statements_test.go b/integration-tests/prepared_statements_test.go index cf8edc11..6e62a5e6 100644 --- a/integration-tests/prepared_statements_test.go +++ b/integration-tests/prepared_statements_test.go @@ -353,9 +353,9 @@ func TestPreparedIdReplacement(t *testing.T) { var batchPrepareMsg *message.Prepare var expectedBatchPrepareMsg *message.Prepare if test.batchQuery != "" { - batchPrepareMsg = prepareMsg.Clone().(*message.Prepare) + batchPrepareMsg = prepareMsg.DeepCopy() batchPrepareMsg.Query = test.batchQuery - expectedBatchPrepareMsg = batchPrepareMsg.Clone().(*message.Prepare) + expectedBatchPrepareMsg = batchPrepareMsg.DeepCopy() expectedBatchPrepareMsg.Query = test.expectedBatchQuery prepareResp, err = testSetup.Client.CqlConnection.SendAndReceive( frame.NewFrame(primitive.ProtocolVersion4, 10, batchPrepareMsg)) @@ -391,15 +391,15 @@ func TestPreparedIdReplacement(t *testing.T) { Type: primitive.BatchTypeLogged, Children: []*message.BatchChild{ { - QueryOrId: test.query, + Query: test.query, // the decoder uses empty slices instead of nil so this has to be initialized this way // so that the equality assertions work later in this test Values: make([]*primitive.Value, 0), }, { - QueryOrId: originBatchPreparedId, - Values: make([]*primitive.Value, 0), + Id: originBatchPreparedId, + Values: make([]*primitive.Value, 0), }, }, Consistency: primitive.ConsistencyLevelLocalQuorum, @@ -482,7 +482,7 @@ func TestPreparedIdReplacement(t *testing.T) { require.Equal(t, originPreparedId, originExecuteMessages[0].QueryId) if expectedOriginBatches > 0 { require.Equal(t, 2, len(originBatchMessages[0].Children)) - require.Equal(t, originBatchPreparedId, originBatchMessages[0].Children[1].QueryOrId) + require.Equal(t, originBatchPreparedId, originBatchMessages[0].Children[1].Id) } for _, targetExecute := range targetExecuteMessages { @@ -491,7 +491,7 @@ func TestPreparedIdReplacement(t *testing.T) { } if expectedTargetBatches > 0 { require.Equal(t, 2, len(targetBatchMessages[0].Children)) - require.Equal(t, targetBatchPreparedId, targetBatchMessages[0].Children[1].QueryOrId) + require.Equal(t, targetBatchPreparedId, targetBatchMessages[0].Children[1].Id) require.NotEqual(t, batchMsg, targetBatchMessages[0]) } @@ -508,8 +508,8 @@ func TestPreparedIdReplacement(t *testing.T) { require.NotEqual(t, len(executeMsg.Options.PositionalValues), len(originExecuteMessages[0].Options.PositionalValues)) // check if only the positional values are different, we test the parameter replacement in depth on other tests - modifiedOriginExecuteMsg := originExecuteMessages[0].Clone() - modifiedOriginExecuteMsg.(*message.Execute).Options.PositionalValues = executeMsg.Options.PositionalValues + modifiedOriginExecuteMsg := originExecuteMessages[0].DeepCopy() + modifiedOriginExecuteMsg.Options.PositionalValues = executeMsg.Options.PositionalValues require.Equal(t, executeMsg, modifiedOriginExecuteMsg) require.Equal(t, originExecuteMessages[0].Options, targetExecuteMessages[0].Options) } else { @@ -524,19 +524,19 @@ func TestPreparedIdReplacement(t *testing.T) { require.Equal(t, expectedBatchPrepareMsg, originPrepareMessages[1]) if test.expectedBatchPreparedStmtVariables != nil { - require.NotEqual(t, batchMsg.Children[0].QueryOrId, originBatchMessages[0].Children[0].QueryOrId) - require.NotEqual(t, batchMsg.Children[0].QueryOrId, targetBatchMessages[0].Children[0].QueryOrId) - require.Equal(t, originBatchMessages[0].Children[0].QueryOrId, targetBatchMessages[0].Children[0].QueryOrId) + require.NotEqual(t, batchMsg.Children[0].Query, originBatchMessages[0].Children[0].Query) + require.NotEqual(t, batchMsg.Children[0].Query, targetBatchMessages[0].Children[0].Query) + require.Equal(t, originBatchMessages[0].Children[0].Query, targetBatchMessages[0].Children[0].Query) require.Equal(t, 0, len(targetBatchMessages[0].Children[0].Values)) require.Equal(t, 0, len(originBatchMessages[0].Children[0].Values)) require.Equal(t, 0, len(batchMsg.Children[0].Values)) - require.Equal(t, batchMsg.Children[1].QueryOrId, originBatchMessages[0].Children[1].QueryOrId) - require.NotEqual(t, batchMsg.Children[1].QueryOrId, targetBatchMessages[0].Children[1].QueryOrId) - require.NotEqual(t, originBatchMessages[0].Children[1].QueryOrId, targetBatchMessages[0].Children[1].QueryOrId) - require.Equal(t, targetBatchPreparedId, targetBatchMessages[0].Children[1].QueryOrId) - require.Equal(t, originBatchPreparedId, originBatchMessages[0].Children[1].QueryOrId) - require.Equal(t, originBatchPreparedId, batchMsg.Children[1].QueryOrId) + require.Equal(t, batchMsg.Children[1].Query, originBatchMessages[0].Children[1].Query) + require.NotEqual(t, batchMsg.Children[1].Id, targetBatchMessages[0].Children[1].Id) + require.NotEqual(t, originBatchMessages[0].Children[1].Id, targetBatchMessages[0].Children[1].Id) + require.Equal(t, targetBatchPreparedId, targetBatchMessages[0].Children[1].Id) + require.Equal(t, originBatchPreparedId, originBatchMessages[0].Children[1].Id) + require.Equal(t, originBatchPreparedId, batchMsg.Children[1].Id) require.Equal(t, len(test.expectedBatchPreparedStmtVariables.Columns), len(targetBatchMessages[0].Children[1].Values)) require.Equal(t, len(test.expectedBatchPreparedStmtVariables.Columns), len(originBatchMessages[0].Children[1].Values)) require.Equal(t, 0, len(batchMsg.Children[1].Values)) @@ -546,8 +546,8 @@ func TestPreparedIdReplacement(t *testing.T) { } else { require.Equal(t, batchMsg, originBatchMessages[0]) require.NotEqual(t, batchMsg, targetBatchMessages[0]) - clonedBatchMsg := targetBatchMessages[0].Clone().(*message.Batch) - clonedBatchMsg.Children[1].QueryOrId = originBatchPreparedId + clonedBatchMsg := targetBatchMessages[0].DeepCopy() + clonedBatchMsg.Children[1].Id = originBatchPreparedId require.Equal(t, batchMsg, clonedBatchMsg) } } @@ -706,7 +706,7 @@ func TestUnpreparedIdReplacement(t *testing.T) { var batchMsg *message.Batch var batchPrepareMsg *message.Prepare if test.batchQuery != "" { - batchPrepareMsg = prepareMsg.Clone().(*message.Prepare) + batchPrepareMsg = prepareMsg.DeepCopy() batchPrepareMsg.Query = test.batchQuery prepareResp, err = testSetup.Client.CqlConnection.SendAndReceive( frame.NewFrame(primitive.ProtocolVersion4, 10, batchPrepareMsg)) @@ -721,14 +721,14 @@ func TestUnpreparedIdReplacement(t *testing.T) { Type: primitive.BatchTypeLogged, Children: []*message.BatchChild{ { - QueryOrId: test.query, + Query: test.query, // the decoder uses empty slices instead of nil so this has to be initialized this way // so that the equality assertions work later in this test Values: make([]*primitive.Value, 0), }, { - QueryOrId: originBatchPreparedId, - Values: make([]*primitive.Value, 0), + Id: originBatchPreparedId, + Values: make([]*primitive.Value, 0), }, }, Consistency: primitive.ConsistencyLevelLocalQuorum, @@ -843,7 +843,7 @@ func TestUnpreparedIdReplacement(t *testing.T) { if expectedTargetBatches > 0 { for _, batch := range targetBatchMessages { require.Equal(t, 2, len(batch.Children)) - require.Equal(t, targetBatchPreparedId, batch.Children[1].QueryOrId) + require.Equal(t, targetBatchPreparedId, batch.Children[1].Id) require.NotEqual(t, batchMsg, batch) } } @@ -1117,13 +1117,11 @@ func NewPreparedTestHandler( func checkIfPreparedIdMatches(batchMsg *message.Batch, preparedId []byte) (bool, []byte) { var batchPreparedId []byte for _, child := range batchMsg.Children { - switch queryOrId := child.QueryOrId.(type) { - case []byte: - batchPreparedId = queryOrId - if !bytes.Equal(queryOrId, preparedId) { + if child.Id != nil { + batchPreparedId = child.Id + if !bytes.Equal(child.Id, preparedId) { return false, batchPreparedId } - default: } } diff --git a/proxy/pkg/config/config.go b/proxy/pkg/config/config.go index 4df23560..a249cbf8 100644 --- a/proxy/pkg/config/config.go +++ b/proxy/pkg/config/config.go @@ -20,7 +20,7 @@ type Config struct { ReadMode string `default:"PRIMARY_ONLY" split_words:"true"` ReplaceCqlFunctions bool `default:"false" split_words:"true"` AsyncHandshakeTimeoutMs int `default:"4000" split_words:"true"` - LogLevel string `default:"INFO" split_words:"true"` + LogLevel string `default:"DEBUG" split_words:"true"` ControlConnMaxProtocolVersion string `default:"3" split_words:"true"` // Numeric Cassandra OSS protocol version or Dse1 / Dse2 // Proxy Topology (also known as system.peers "virtualization") bucket diff --git a/proxy/pkg/zdmproxy/clientconn.go b/proxy/pkg/zdmproxy/clientconn.go index b6bafe65..ff8e2100 100644 --- a/proxy/pkg/zdmproxy/clientconn.go +++ b/proxy/pkg/zdmproxy/clientconn.go @@ -187,7 +187,7 @@ func (cc *ClientConnector) listenForRequests() { cc.sendResponseToClient(protocolErrResponseFrame) continue } else if alreadySentProtocolErr != nil { - clonedProtocolErr := alreadySentProtocolErr.Clone() + clonedProtocolErr := alreadySentProtocolErr.DeepCopy() clonedProtocolErr.Header.StreamId = f.Header.StreamId cc.sendResponseToClient(clonedProtocolErr) continue diff --git a/proxy/pkg/zdmproxy/clienthandler.go b/proxy/pkg/zdmproxy/clienthandler.go index 1eb5e1dd..cedf7c83 100644 --- a/proxy/pkg/zdmproxy/clienthandler.go +++ b/proxy/pkg/zdmproxy/clienthandler.go @@ -891,7 +891,7 @@ func (ch *ClientHandler) processClientResponse( return nil, fmt.Errorf("invalid cluster type: %v", responseClusterType) } - newFrame = decodedFrame.Clone() + newFrame = decodedFrame.DeepCopy() newUnprepared := &message.Unprepared{ ErrorMessage: fmt.Sprintf("Prepared query with ID %s not found (either the query was not prepared "+ "on this host (maybe the host has been restarted?) or you have prepared too many queries and it has "+ @@ -945,7 +945,7 @@ func (ch *ClientHandler) processPreparedResponse( return nil, fmt.Errorf("replaced terms in the prepared statement but prepared result doesn't have variables metadata: %v", bodyMsg) } - newResponse = response.Clone() + newResponse = response.DeepCopy() newPreparedBody, ok := newResponse.Body.Message.(*message.PreparedResult) if !ok { return nil, fmt.Errorf("could not modify prepared result to remove generated parameters because "+ @@ -1655,7 +1655,7 @@ func (ch *ClientHandler) handleExecuteRequest( } replacementTimeUuids = ch.parameterModifier.generateTimeUuids(prepareRequestInfo) - newOriginRequest := clientRequest.Clone() + newOriginRequest := clientRequest.DeepCopy() _, err = ch.parameterModifier.AddValuesToExecuteFrame( newOriginRequest, prepareRequestInfo, preparedData.GetOriginVariablesMetadata(), replacementTimeUuids) if err != nil { @@ -1677,7 +1677,7 @@ func (ch *ClientHandler) handleExecuteRequest( return nil, nil, nil, fmt.Errorf("could not decode execute raw frame: %w", err) } - newTargetRequest := clientRequest.Clone() + newTargetRequest := clientRequest.DeepCopy() var newTargetExecuteMsg *message.Execute if len(replacedTerms) > 0 { if replacementTimeUuids == nil { @@ -1726,7 +1726,7 @@ func (ch *ClientHandler) handleBatchRequest( var newOriginRequest *frame.Frame var newOriginBatchMsg *message.Batch - newTargetRequest := decodedFrame.Clone() + newTargetRequest := decodedFrame.DeepCopy() newTargetBatchMsg, ok := newTargetRequest.Body.Message.(*message.Batch) if !ok { return nil, nil, fmt.Errorf("expected Batch but got %v instead", newTargetRequest.Body.Message.GetOpCode()) @@ -1736,7 +1736,7 @@ func (ch *ClientHandler) handleBatchRequest( prepareRequestInfo := preparedData.GetPrepareRequestInfo() if len(prepareRequestInfo.GetReplacedTerms()) > 0 { if newOriginRequest == nil { - newOriginRequest = decodedFrame.Clone() + newOriginRequest = decodedFrame.DeepCopy() newOriginBatchMsg, ok = newOriginRequest.Body.Message.(*message.Batch) if !ok { return nil, nil, fmt.Errorf("expected Batch but got %v instead", newOriginRequest.Body.Message.GetOpCode()) @@ -1754,8 +1754,8 @@ func (ch *ClientHandler) handleBatchRequest( } } - originalQueryId := newTargetBatchMsg.Children[stmtIdx].QueryOrId.([]byte) - newTargetBatchMsg.Children[stmtIdx].QueryOrId = preparedData.GetTargetPreparedId() + originalQueryId := newTargetBatchMsg.Children[stmtIdx].Id + newTargetBatchMsg.Children[stmtIdx].Id = preparedData.GetTargetPreparedId() log.Tracef("Replacing prepared ID %s within a BATCH with %s for target cluster.", hex.EncodeToString(originalQueryId), hex.EncodeToString(preparedData.GetTargetPreparedId())) } @@ -1803,7 +1803,7 @@ func (ch *ClientHandler) sendToAsyncConnector( } if sendAlsoToAsync { - asyncRequest = asyncRequest.Clone() // forwardToAsyncOnly requests don't need to be cloned because they are only sent to 1 connector + asyncRequest = asyncRequest.DeepCopy() // forwardToAsyncOnly requests don't need to be cloned because they are only sent to 1 connector } if isFireAndForget { diff --git a/proxy/pkg/zdmproxy/controlconn.go b/proxy/pkg/zdmproxy/controlconn.go index 394c8ae7..47fb09ce 100644 --- a/proxy/pkg/zdmproxy/controlconn.go +++ b/proxy/pkg/zdmproxy/controlconn.go @@ -378,7 +378,7 @@ func (cc *ControlConn) connAndNegotiateProtoVer(endpoint Endpoint, initialProtoV cc.connConfig.GetClusterType(), endpoint.GetEndpointIdentifier(), err) return nil, err } - newConn := NewCqlConnection(tcpConn, cc.username, cc.password, ccReadTimeout, ccWriteTimeout, cc.conf, protoVer) + newConn := NewCqlConnection(endpoint, tcpConn, cc.username, cc.password, ccReadTimeout, ccWriteTimeout, cc.conf, protoVer) err = newConn.InitializeContext(protoVer, ctx) var respErr *ResponseError if err != nil && errors.As(err, &respErr) && respErr.IsProtocolError() && strings.Contains(err.Error(), "Invalid or unsupported protocol version") { @@ -435,8 +435,7 @@ func (cc *ControlConn) RefreshHosts(conn CqlConnection, ctx context.Context) ([] return nil, fmt.Errorf("could not fetch information from system.local table: %w", err) } - localInfo, localHost, err := ParseSystemLocalResult(localQueryResult, cc.defaultPort) - // localHost may be nil, if we did not find the address in system.local table + localInfo, localHost, err := ParseSystemLocalResult(localQueryResult, conn.GetEndpoint(), cc.defaultPort) if err != nil { return nil, err } diff --git a/proxy/pkg/zdmproxy/cqlconn.go b/proxy/pkg/zdmproxy/cqlconn.go index c413cb50..929ec46d 100644 --- a/proxy/pkg/zdmproxy/cqlconn.go +++ b/proxy/pkg/zdmproxy/cqlconn.go @@ -31,6 +31,7 @@ const ( ) type CqlConnection interface { + GetEndpoint() Endpoint IsInitialized() bool InitializeContext(version primitive.ProtocolVersion, ctx context.Context) error SendAndReceive(request *frame.Frame, ctx context.Context) (*frame.Frame, error) @@ -48,6 +49,7 @@ type CqlConnection interface { type cqlConn struct { readTimeout time.Duration writeTimeout time.Duration + endpoint Endpoint conn net.Conn credentials *AuthCredentials initialized bool @@ -71,12 +73,16 @@ var ( StreamIdMismatchErr = errors.New("stream id of the response is different from the stream id of the request") ) +func (c *cqlConn) GetEndpoint() Endpoint { + return c.endpoint +} + func (c *cqlConn) String() string { return fmt.Sprintf("cqlConn{conn: %v}", c.conn.RemoteAddr().String()) } func NewCqlConnection( - conn net.Conn, + endpoint Endpoint, conn net.Conn, username string, password string, readTimeout time.Duration, writeTimeout time.Duration, conf *config.Config, protoVer primitive.ProtocolVersion) CqlConnection { @@ -84,6 +90,7 @@ func NewCqlConnection( cqlConn := &cqlConn{ readTimeout: readTimeout, writeTimeout: writeTimeout, + endpoint: endpoint, conn: conn, credentials: &AuthCredentials{ Username: username, diff --git a/proxy/pkg/zdmproxy/cqlparser.go b/proxy/pkg/zdmproxy/cqlparser.go index 56446ba9..937afe73 100644 --- a/proxy/pkg/zdmproxy/cqlparser.go +++ b/proxy/pkg/zdmproxy/cqlparser.go @@ -115,15 +115,13 @@ func buildRequestInfo( } preparedDataByStmtIdxMap := make(map[int]PreparedData) for childIdx, child := range batchMsg.Children { - switch queryOrId := child.QueryOrId.(type) { - case []byte: - preparedData, err := getPreparedData(psCache, mh, queryOrId, primitive.OpCodeBatch, decodedFrame) + if child.Id != nil { + preparedData, err := getPreparedData(psCache, mh, child.Id, primitive.OpCodeBatch, decodedFrame) if err != nil { return nil, err } else { preparedDataByStmtIdxMap[childIdx] = preparedData } - default: } } return NewBatchRequestInfo(preparedDataByStmtIdxMap), nil @@ -352,11 +350,10 @@ func (recv *frameDecodeContext) inspectStatements(currentKeyspace string, timeUu currentKeyspace = typedMsg.Keyspace } for idx, childStmt := range typedMsg.Children { - switch typedQueryOrId := childStmt.QueryOrId.(type) { - case string: + if len(childStmt.Query) > 0 { statementsQueryData = append( statementsQueryData, &statementQueryData{ - statementIndex: idx, queryData: inspectCqlQuery(typedQueryOrId, currentKeyspace, timeUuidGenerator)}) + statementIndex: idx, queryData: inspectCqlQuery(childStmt.Query, currentKeyspace, timeUuidGenerator)}) } } default: diff --git a/proxy/pkg/zdmproxy/cqlparser_adv_workloads_utils_test.go b/proxy/pkg/zdmproxy/cqlparser_adv_workloads_utils_test.go index 4ae50b02..46cefec9 100644 --- a/proxy/pkg/zdmproxy/cqlparser_adv_workloads_utils_test.go +++ b/proxy/pkg/zdmproxy/cqlparser_adv_workloads_utils_test.go @@ -39,6 +39,8 @@ func getGeneralParamsForTests(t *testing.T) params { } func buildQueryMessageForTests(queryString string) *message.Query { + var defaultTimestamp int64 = 1647023221311969 + var serialConsistency = primitive.ConsistencyLevelLocalSerial return &message.Query{ Query: queryString, Options: &message.QueryOptions{ @@ -49,8 +51,8 @@ func buildQueryMessageForTests(queryString string) *message.Query { PageSize: 5000, PageSizeInBytes: false, PagingState: nil, - SerialConsistency: &primitive.NillableConsistencyLevel{Value: primitive.ConsistencyLevelLocalSerial}, - DefaultTimestamp: &primitive.NillableInt64{Value: 1647023221311969}, + SerialConsistency: &serialConsistency, + DefaultTimestamp: &defaultTimestamp, Keyspace: "", NowInSeconds: nil, ContinuousPagingOptions: &message.ContinuousPagingOptions{ diff --git a/proxy/pkg/zdmproxy/cqlparser_test.go b/proxy/pkg/zdmproxy/cqlparser_test.go index a211abba..198cf822 100644 --- a/proxy/pkg/zdmproxy/cqlparser_test.go +++ b/proxy/pkg/zdmproxy/cqlparser_test.go @@ -182,7 +182,15 @@ func mockExecuteFrame(t *testing.T, preparedId string) *frame.RawFrame { } func mockBatch(t *testing.T, query interface{}) *frame.RawFrame { - batchMsg := &message.Batch{Children: []*message.BatchChild{{QueryOrId: query}}} + var child message.BatchChild + switch query.(type) { + case []byte: + child = message.BatchChild{Id: query.([]byte)} + default: + child = message.BatchChild{Query: query.(string)} + + } + batchMsg := &message.Batch{Children: []*message.BatchChild{&child}} return mockFrame(t, batchMsg, primitive.ProtocolVersion4) } diff --git a/proxy/pkg/zdmproxy/frameprocessor.go b/proxy/pkg/zdmproxy/frameprocessor.go index ee392a61..2beb62b1 100644 --- a/proxy/pkg/zdmproxy/frameprocessor.go +++ b/proxy/pkg/zdmproxy/frameprocessor.go @@ -84,7 +84,7 @@ func setRawFrameStreamId(f *frame.RawFrame, id int16) *frame.RawFrame { if f.Header.StreamId == id { return f } - newHeader := f.Header.Clone() + newHeader := f.Header.DeepCopy() newHeader.StreamId = id return &frame.RawFrame{ Header: newHeader, @@ -98,7 +98,7 @@ func setFrameStreamId(f *frame.Frame, id int16) *frame.Frame { if f.Header.StreamId == id { return f } - newHeader := f.Header.Clone() + newHeader := f.Header.DeepCopy() newHeader.StreamId = id return &frame.Frame{ Header: newHeader, diff --git a/proxy/pkg/zdmproxy/host.go b/proxy/pkg/zdmproxy/host.go index b37e4753..bc2168ca 100644 --- a/proxy/pkg/zdmproxy/host.go +++ b/proxy/pkg/zdmproxy/host.go @@ -7,6 +7,8 @@ import ( "github.com/google/uuid" log "github.com/sirupsen/logrus" "net" + "strconv" + "strings" ) type Host struct { @@ -48,7 +50,7 @@ func (recv *Host) String() string { hex.EncodeToString(recv.HostId[:])) } -func ParseSystemLocalResult(rs *ParsedRowSet, defaultPort int) (map[string]*optionalColumn, *Host, error) { +func ParseSystemLocalResult(rs *ParsedRowSet, ccEndpoint Endpoint, defaultPort int) (map[string]*optionalColumn, *Host, error) { if len(rs.Rows) < 1 { return nil, nil, fmt.Errorf("could not parse system local query result: query returned %d rows", len(rs.Rows)) } @@ -60,17 +62,17 @@ func ParseSystemLocalResult(rs *ParsedRowSet, defaultPort int) (map[string]*opti row := rs.Rows[0] addr, port, err := ParseRpcAddress(false, row, defaultPort) + if addr == nil { + // could not resolve address from system.local table (e.g. not present in C* 2.0.0) + addr, port, err = ParseEndpoint(ccEndpoint) + } if err != nil { return nil, nil, err } - var host *Host - if addr != nil { - // could not resolve address from system.local table (e.g. not present in C* 2.0.0) - host, err = parseHost(addr, port, row) - if err != nil { - return nil, nil, err - } + host, err := parseHost(addr, port, row) + if err != nil { + return nil, nil, err } sysLocalCols := map[string]*optionalColumn{ @@ -222,6 +224,20 @@ func ParseRpcAddress(isPeersV2 bool, row *ParsedRow, defaultPort int) (net.IP, i return addr, rpcPort, nil } +func ParseEndpoint(endpoint Endpoint) (net.IP, int, error) { + socketEndpoint := endpoint.GetSocketEndpoint() + parts := strings.Split(socketEndpoint, ":") + if len(parts) != 2 { + return nil, -1, fmt.Errorf("invalid endpoint: %s", socketEndpoint) + } + addr := parts[0] + port, err := strconv.Atoi(parts[1]) + if err != nil { + return nil, -1, fmt.Errorf("invalid endpoint: %s", socketEndpoint) + } + return net.ParseIP(addr), port, nil +} + func parseRpcPortPeersV2(row *ParsedRow) (int, bool) { val, ok := row.GetByColumn("native_port") if ok && val != nil { diff --git a/proxy/pkg/zdmproxy/nativeprotocol.go b/proxy/pkg/zdmproxy/nativeprotocol.go index 8f3515ed..98b0dfe1 100644 --- a/proxy/pkg/zdmproxy/nativeprotocol.go +++ b/proxy/pkg/zdmproxy/nativeprotocol.go @@ -262,10 +262,10 @@ var ( storagePortColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "storage_port", Type: datatype.Int} storagePortSslColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "storage_port_ssl", Type: datatype.Int} thriftVersionColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "thrift_version", Type: datatype.Varchar} - tokensColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)} - truncatedAtColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "truncated_at", Type: datatype.NewMapType(datatype.Uuid, datatype.Blob)} + tokensColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "tokens", Type: datatype.NewSet(datatype.Varchar)} + truncatedAtColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "truncated_at", Type: datatype.NewMap(datatype.Uuid, datatype.Blob)} workloadColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "workload", Type: datatype.Varchar} - workloadsColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "workloads", Type: datatype.NewSetType(datatype.Varchar)} + workloadsColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemLocalTableName, Name: "workloads", Type: datatype.NewSet(datatype.Varchar)} ) var systemLocalColumns = []*message.ColumnMetadata{ @@ -367,7 +367,7 @@ func columnFromSelector( } // we are assuming here that resultColumn always refers to an unaliased column because the cql grammar doesn't support alias recursion - aliasedColumn := resultColumn.Clone() + aliasedColumn := resultColumn.DeepCopy() aliasedColumn.Name = s.alias return aliasedColumn, isCountSelector, nil default: @@ -605,9 +605,9 @@ var ( serverIdPeersColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemPeersTableName, Name: "server_id", Type: datatype.Varchar} storagePortPeersColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemPeersTableName, Name: "storage_port", Type: datatype.Int} storagePortSslPeersColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemPeersTableName, Name: "storage_port_ssl", Type: datatype.Int} - tokensPeersColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemPeersTableName, Name: "tokens", Type: datatype.NewSetType(datatype.Varchar)} + tokensPeersColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemPeersTableName, Name: "tokens", Type: datatype.NewSet(datatype.Varchar)} workloadPeersColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemPeersTableName, Name: "workload", Type: datatype.Varchar} - workloadsPeersColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemPeersTableName, Name: "workloads", Type: datatype.NewSetType(datatype.Varchar)} + workloadsPeersColumn = &message.ColumnMetadata{Keyspace: systemKeyspaceName, Table: systemPeersTableName, Name: "workloads", Type: datatype.NewSet(datatype.Varchar)} ) var systemPeersColumns = []*message.ColumnMetadata{ diff --git a/proxy/pkg/zdmproxy/parametermodifier_test.go b/proxy/pkg/zdmproxy/parametermodifier_test.go index dc0f99cf..0cb85ade 100644 --- a/proxy/pkg/zdmproxy/parametermodifier_test.go +++ b/proxy/pkg/zdmproxy/parametermodifier_test.go @@ -25,7 +25,7 @@ func TestAddValuesToExecuteFrame_NoReplacedTerms(t *testing.T) { PkIndices: nil, Columns: nil, } - fClone := f.Clone() + fClone := f.DeepCopy() replacementTimeUuids := parameterModifier.generateTimeUuids(prepareRequestInfo) newMsg, err := parameterModifier.AddValuesToExecuteFrame(fClone, prepareRequestInfo, variablesMetadata, replacementTimeUuids) require.Same(t, fClone.Body.Message, newMsg) @@ -198,7 +198,7 @@ func TestAddValuesToExecuteFrame_PositionalValues(t *testing.T) { require.Nil(t, err) parameterModifier := NewParameterModifier(generator) queryOpts := &message.QueryOptions{PositionalValues: requestPosVals} - clonedQueryOpts := queryOpts.Clone() // we use this so that we keep the "original" request options + clonedQueryOpts := queryOpts.DeepCopy() // we use this so that we keep the "original" request options f := frame.NewFrame(primitive.ProtocolVersion4, 1, &message.Execute{ QueryId: nil, ResultMetadataId: nil, @@ -344,7 +344,7 @@ func TestAddValuesToExecuteFrame_NamedValues(t *testing.T) { require.Nil(t, err) parameterModifier := NewParameterModifier(generator) queryOpts := &message.QueryOptions{NamedValues: requestNamedVals} - clonedQueryOpts := queryOpts.Clone() // we use this so that we keep the "original" request options + clonedQueryOpts := queryOpts.DeepCopy() // we use this so that we keep the "original" request options f := frame.NewFrame(primitive.ProtocolVersion4, 1, &message.Execute{ QueryId: nil, ResultMetadataId: nil, diff --git a/proxy/pkg/zdmproxy/querymodifier.go b/proxy/pkg/zdmproxy/querymodifier.go index 91eb9c05..d2a0dd25 100644 --- a/proxy/pkg/zdmproxy/querymodifier.go +++ b/proxy/pkg/zdmproxy/querymodifier.go @@ -89,7 +89,7 @@ func (recv *QueryModifier) replaceQueryInBatchMessage( return decodedFrame, []*statementReplacedTerms{}, statementsQueryData, nil } - newFrame := decodedFrame.Clone() + newFrame := decodedFrame.DeepCopy() newBatchMsg, ok := newFrame.Body.Message.(*message.Batch) if !ok { return nil, nil, nil, fmt.Errorf("expected Batch in cloned frame but got %v instead", newFrame.Body.Message.GetOpCode()) @@ -100,7 +100,7 @@ func (recv *QueryModifier) replaceQueryInBatchMessage( return nil, nil, nil, fmt.Errorf("new query data statement index (%v) is greater or equal than "+ "number of batch child statements (%v)", newStmtQueryData.statementIndex, len(newBatchMsg.Children)) } - newBatchMsg.Children[newStmtQueryData.statementIndex].QueryOrId = newStmtQueryData.queryData.getQuery() + newBatchMsg.Children[newStmtQueryData.statementIndex].Query = newStmtQueryData.queryData.getQuery() } return newFrame, statementsReplacedTerms, newStatementsQueryData, nil @@ -117,7 +117,7 @@ func (recv *QueryModifier) replaceQueryInQueryMessage( return decodedFrame, []*statementReplacedTerms{}, statementsQueryData, nil } newQueryData, replacedTerms := stmtQueryData.queryData.replaceNowFunctionCallsWithLiteral() - newFrame := decodedFrame.Clone() + newFrame := decodedFrame.DeepCopy() newQueryMsg, ok := newFrame.Body.Message.(*message.Query) if !ok { return nil, nil, nil, fmt.Errorf("expected Query in cloned frame but got %v instead", newFrame.Body.Message.GetOpCode()) @@ -143,7 +143,7 @@ func (recv *QueryModifier) replaceQueryInPrepareMessage( } else { newQueryData, replacedTerms = stmtQueryData.queryData.replaceNowFunctionCallsWithPositionalBindMarkers() } - newFrame := decodedFrame.Clone() + newFrame := decodedFrame.DeepCopy() newPrepareMsg, ok := newFrame.Body.Message.(*message.Prepare) if !ok { return nil, nil, nil, fmt.Errorf("expected Prepare in cloned frame but got %v instead", newFrame.Body.Message.GetOpCode()) diff --git a/proxy/pkg/zdmproxy/querymodifier_test.go b/proxy/pkg/zdmproxy/querymodifier_test.go index d7634f84..da315367 100644 --- a/proxy/pkg/zdmproxy/querymodifier_test.go +++ b/proxy/pkg/zdmproxy/querymodifier_test.go @@ -123,7 +123,7 @@ func TestReplaceQueryString(t *testing.T) { {"OpCodeBatch Mixed Prepared and Simple", mockBatchWithChildren(t, []*message.BatchChild{ { - QueryOrId: "UPDATE blah SET a = ?, b = 123 " + + Query: "UPDATE blah SET a = ?, b = 123 " + "WHERE f[now()] = ? IF " + "g[123] IN (2, 3, ?, now(), ?, now()) AND " + "d IN ? AND " + @@ -132,12 +132,12 @@ func TestReplaceQueryString(t *testing.T) { Values: []*primitive.Value{}, // not used by the SUT (system under test) }, { - QueryOrId: []byte{0}, - Values: []*primitive.Value{}, // not used by the SUT + Id: []byte{0}, + Values: []*primitive.Value{}, // not used by the SUT }, { - QueryOrId: "DELETE FROM blah WHERE b = 123 AND a = now()", - Values: []*primitive.Value{}, // not used by the SUT + Query: "DELETE FROM blah WHERE b = 123 AND a = now()", + Values: []*primitive.Value{}, // not used by the SUT }}), []*statementReplacedTerms{ {statementIndex: 0, replacedTerms: []*term{