Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add an Arrow-based, columnar binlog buffer #91

Merged
merged 22 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.23'
go-version: '1.22'
GaoYusong marked this conversation as resolved.
Show resolved Hide resolved

- name: Set up Python
uses: actions/setup-python@v5
Expand Down
19 changes: 13 additions & 6 deletions backend/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,30 @@ import (

"github.com/apecloud/myduckserver/catalog"
"github.com/dolthub/go-mysql-server/sql"
"github.com/marcboeker/go-duckdb"
"github.com/sirupsen/logrus"
)

type ConnectionPool struct {
*stdsql.DB
catalog string
conns sync.Map // concurrent-safe map[uint32]*stdsql.Conn
txns sync.Map // concurrent-safe map[uint32]*stdsql.Tx
connector *duckdb.Connector
catalog string
conns sync.Map // concurrent-safe map[uint32]*stdsql.Conn
txns sync.Map // concurrent-safe map[uint32]*stdsql.Tx
}

func NewConnectionPool(catalog string, db *stdsql.DB) *ConnectionPool {
func NewConnectionPool(catalog string, connector *duckdb.Connector, db *stdsql.DB) *ConnectionPool {
return &ConnectionPool{
DB: db,
catalog: catalog,
DB: db,
connector: connector,
catalog: catalog,
}
}

// Connector returns the *duckdb.Connector stored in the pool's connector field.
func (p *ConnectionPool) Connector() *duckdb.Connector {
return p.connector
}

func (p *ConnectionPool) GetConn(ctx context.Context, id uint32) (*stdsql.Conn, error) {
var conn *stdsql.Conn
entry, ok := p.conns.Load(id)
Expand Down
1 change: 1 addition & 0 deletions binlog/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The code in this directory was copied from [the Vitess project](https://github.com/vitessio/vitess) (as of 2024-09-19, https://github.com/vitessio/vitess/blob/main/go/mysql/binlog/) and then modified. The original code is licensed under the Apache License, Version 2.0, and the modifications are licensed under the same terms. The goal of the modifications is to avoid the unnecessary string conversions and memory allocations performed by the original code.
152 changes: 152 additions & 0 deletions binlog/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package binlog

// MySQL field (column) data type codes.
// The numeric values are taken from include/mysql/mysql_com.h.
const (
	// Codes 0-19.
	TypeDecimal    = 0  // MYSQL_TYPE_DECIMAL (deprecated)
	TypeTiny       = 1  // MYSQL_TYPE_TINY
	TypeShort      = 2  // MYSQL_TYPE_SHORT
	TypeLong       = 3  // MYSQL_TYPE_LONG
	TypeFloat      = 4  // MYSQL_TYPE_FLOAT
	TypeDouble     = 5  // MYSQL_TYPE_DOUBLE
	TypeNull       = 6  // MYSQL_TYPE_NULL
	TypeTimestamp  = 7  // MYSQL_TYPE_TIMESTAMP
	TypeLongLong   = 8  // MYSQL_TYPE_LONGLONG
	TypeInt24      = 9  // MYSQL_TYPE_INT24
	TypeDate       = 10 // MYSQL_TYPE_DATE
	TypeTime       = 11 // MYSQL_TYPE_TIME
	TypeDateTime   = 12 // MYSQL_TYPE_DATETIME
	TypeYear       = 13 // MYSQL_TYPE_YEAR
	TypeNewDate    = 14 // MYSQL_TYPE_NEWDATE
	TypeVarchar    = 15 // MYSQL_TYPE_VARCHAR
	TypeBit        = 16 // MYSQL_TYPE_BIT
	TypeTimestamp2 = 17 // MYSQL_TYPE_TIMESTAMP2
	TypeDateTime2  = 18 // MYSQL_TYPE_DATETIME2
	TypeTime2      = 19 // MYSQL_TYPE_TIME2

	// Codes 242-255.
	TypeVector     = 242 // MYSQL_TYPE_VECTOR
	TypeJSON       = 245 // MYSQL_TYPE_JSON
	TypeNewDecimal = 246 // MYSQL_TYPE_NEWDECIMAL
	TypeEnum       = 247 // MYSQL_TYPE_ENUM
	TypeSet        = 248 // MYSQL_TYPE_SET
	TypeTinyBlob   = 249 // MYSQL_TYPE_TINY_BLOB
	TypeMediumBlob = 250 // MYSQL_TYPE_MEDIUM_BLOB
	TypeLongBlob   = 251 // MYSQL_TYPE_LONG_BLOB
	TypeBlob       = 252 // MYSQL_TYPE_BLOB
	TypeVarString  = 253 // MYSQL_TYPE_VAR_STRING
	TypeString     = 254 // MYSQL_TYPE_STRING
	TypeGeometry   = 255 // MYSQL_TYPE_GEOMETRY
)

// TypeNames maps each MySQL field type code to its name, which is the
// corresponding MYSQL_TYPE_* identifier with the "MYSQL_TYPE_" prefix removed.
var TypeNames = map[byte]string{
	// Numeric types.
	TypeDecimal:    "DECIMAL",
	TypeNewDecimal: "NEWDECIMAL",
	TypeTiny:       "TINY",
	TypeShort:      "SHORT",
	TypeInt24:      "INT24",
	TypeLong:       "LONG",
	TypeLongLong:   "LONGLONG",
	TypeFloat:      "FLOAT",
	TypeDouble:     "DOUBLE",
	TypeBit:        "BIT",

	// Date and time types.
	TypeTimestamp:  "TIMESTAMP",
	TypeTimestamp2: "TIMESTAMP2",
	TypeDate:       "DATE",
	TypeNewDate:    "NEWDATE",
	TypeTime:       "TIME",
	TypeTime2:      "TIME2",
	TypeDateTime:   "DATETIME",
	TypeDateTime2:  "DATETIME2",
	TypeYear:       "YEAR",

	// String, binary, and enumerated types.
	TypeVarchar:    "VARCHAR",
	TypeVarString:  "VAR_STRING",
	TypeString:     "STRING",
	TypeTinyBlob:   "TINY_BLOB",
	TypeBlob:       "BLOB",
	TypeMediumBlob: "MEDIUM_BLOB",
	TypeLongBlob:   "LONG_BLOB",
	TypeEnum:       "ENUM",
	TypeSet:        "SET",

	// Everything else.
	TypeNull:     "NULL",
	TypeVector:   "VECTOR",
	TypeJSON:     "JSON",
	TypeGeometry: "GEOMETRY",
}
37 changes: 37 additions & 0 deletions binlog/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024-2025 ApeCloud, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package binlog

// RowEventType identifies the kind of row change: DELETE, UPDATE, or INSERT.
type RowEventType int8

// The numeric order of these values is significant: an UPDATE is translated
// into a DELETE followed by an INSERT, so DeleteRowEvent must have the
// smallest value and InsertRowEvent the largest.
const (
	DeleteRowEvent RowEventType = 0
	UpdateRowEvent RowEventType = 1
	InsertRowEvent RowEventType = 2
)

// String returns the upper-case SQL verb for the event type, or "UNKNOWN"
// for any value that is not one of the declared constants.
func (e RowEventType) String() string {
	labels := [...]string{
		DeleteRowEvent: "DELETE",
		UpdateRowEvent: "UPDATE",
		InsertRowEvent: "INSERT",
	}
	// RowEventType is signed, so guard both ends before indexing.
	if e < 0 || int(e) >= len(labels) {
		return "UNKNOWN"
	}
	return labels[e]
}
Loading