diff --git a/README.md b/README.md index 14af446..63f11e3 100644 --- a/README.md +++ b/README.md @@ -114,8 +114,8 @@ func Handler(ctx *replication.ListenerContext) { * [Simple](./example/simple) * [Simple File Config](./example/simple-file-config) -* [PostgreSQL to Elasticsearch](./example/elasticsearch) -* [PostgreSQL to Kafka](./example/kafka) +* [PostgreSQL to Elasticsearch](https://github.com/Trendyol/go-pq-cdc-elasticsearch/tree/main/example/simple) +* [PostgreSQL to Kafka](https://github.com/Trendyol/go-pq-cdc-kafka/tree/main/example/simple) * [PostgreSQL to PostgreSQL](./example/postgresql) ### Availability @@ -167,19 +167,19 @@ This setup ensures continuous data synchronization and minimal downtime in captu The client collects relevant metrics related to PostgreSQL change data capture (CDC) and makes them available at the `/metrics` endpoint. -| Metric Name | Description | Labels | Value Type | -|-----------------------------------------------------|-------------------------------------------------------------------------------------------------------|-----------|------------| -| go_pq_cdc_update_total | The total number of `UPDATE` operations captured on specific tables. | slot_name | Counter | -| go_pq_cdc_delete_total | The total number of `DELETE` operations captured on specific tables. | slot_name | Counter | -| go_pq_cdc_insert_total | The total number of `INSERT` operations captured on specific tables. | slot_name | Counter | -| go_pq_cdc_cdc_latency_current | The current latency in capturing data changes from PostgreSQL. | slot_name | Gauge | -| go_pq_cdc_process_latency_current | The current latency in processing the captured data changes. | slot_name | Gauge | -| go_pq_cdc_replication_slot_slot_confirmed_flush_lsn | The last confirmed flush Log Sequence Number (LSN) in the PostgreSQL replication slot. | slot_name | Gauge | -| go_pq_cdc_replication_slot_slot_current_lsn | The current Log Sequence Number (LSN) being processed in the PostgreSQL replication slot. | slot_name | Gauge | -| go_pq_cdc_replication_slot_slot_is_active | Indicates whether the PostgreSQL replication slot is currently active (1 for active, 0 for inactive). | slot_name | Gauge | -| go_pq_cdc_replication_slot_slot_lag | The replication lag measured by the difference between the current LSN and the confirmed flush LSN. | slot_name | Gauge | -| go_pq_cdc_replication_slot_slot_retained_wal_size | The size of Write-Ahead Logging (WAL) files retained for the replication slot in bytes. | slot_name | Gauge | -| runtime metrics | [Prometheus Collector](https://golang.bg/src/runtime/metrics/description.go) | N/A | N/A | +| Metric Name | Description | Labels | Value Type | +|-----------------------------------------------------|-------------------------------------------------------------------------------------------------------|----------------|------------| +| go_pq_cdc_update_total | The total number of `UPDATE` operations captured on specific tables. | slot_name, host| Counter | +| go_pq_cdc_delete_total | The total number of `DELETE` operations captured on specific tables. | slot_name, host| Counter | +| go_pq_cdc_insert_total | The total number of `INSERT` operations captured on specific tables. | slot_name, host| Counter | +| go_pq_cdc_cdc_latency_current | The current latency in capturing data changes from PostgreSQL. | slot_name, host| Gauge | +| go_pq_cdc_process_latency_current | The current latency in processing the captured data changes. | slot_name, host| Gauge | +| go_pq_cdc_replication_slot_slot_confirmed_flush_lsn | The last confirmed flush Log Sequence Number (LSN) in the PostgreSQL replication slot. | slot_name, host| Gauge | +| go_pq_cdc_replication_slot_slot_current_lsn | The current Log Sequence Number (LSN) being processed in the PostgreSQL replication slot. | slot_name, host| Gauge | +| go_pq_cdc_replication_slot_slot_is_active | Indicates whether the PostgreSQL replication slot is currently active (1 for active, 0 for inactive). | slot_name, host| Gauge | +| go_pq_cdc_replication_slot_slot_lag | The replication lag measured by the difference between the current LSN and the confirmed flush LSN. | slot_name, host| Gauge | +| go_pq_cdc_replication_slot_slot_retained_wal_size | The size of Write-Ahead Logging (WAL) files retained for the replication slot in bytes. | slot_name, host| Gauge | +| runtime metrics | [Prometheus Collector](https://golang.bg/src/runtime/metrics/description.go) | N/A | N/A | ### Grafana Dashboard diff --git a/example/elasticsearch/docker-compose.yml b/example/elasticsearch/docker-compose.yml deleted file mode 100644 index d746aac..0000000 --- a/example/elasticsearch/docker-compose.yml +++ /dev/null @@ -1,80 +0,0 @@ -version: "3" -services: - postgres: - image: postgres:16.2 - restart: always - command: ["-c", "wal_level=logical", "-c", "max_wal_senders=10", "-c", "max_replication_slots=10"] - environment: - POSTGRES_USER: "cdc_user" - POSTGRES_PASSWORD: "cdc_pass" - POSTGRES_DB: "cdc_db" - POSTGRES_HOST_AUTH_METHOD: trust - network_mode: "host" - es01: - image: docker.elastic.co/elasticsearch/elasticsearch:7.17.11 - labels: - co.elastic.logs/module: elasticsearch - volumes: - - es01_data:/usr/share/elasticsearch/data - networks: - - cdc_net - ports: - - "9200:9200" - environment: - - node.name=es01 - - cluster.name=cdc - - discovery.type=single-node - - ELASTIC_PASSWORD=cdc_pass - - bootstrap.memory_lock=true - - xpack.security.enabled=false - - xpack.security.http.ssl.enabled=false - - xpack.security.transport.ssl.enabled=false - ulimits: - memlock: - soft: -1 - hard: -1 - healthcheck: - test: - [ - "CMD-SHELL", - "curl -I -o /dev/null http://localhost:9200 -w '%{http_code}' | grep -q '200'", - ] - interval: 10s - timeout: 10s - retries: 120 - - kibana: - depends_on: - es01: - condition: service_healthy - image: docker.elastic.co/kibana/kibana:7.17.11 - networks: - - cdc_net - labels: - co.elastic.logs/module: kibana - volumes: - - kibana1_data:/usr/share/kibana/data - ports: - - "5601:5601" - environment: - - SERVERNAME=kibana - - ELASTICSEARCH_HOSTS=http://es01:9200 - - ELASTICSEARCH_USERNAME=kibana_system - - ELASTICSEARCH_PASSWORD=cdc_pass - healthcheck: - test: - [ - "CMD-SHELL", - "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'", - ] - interval: 10s - timeout: 10s - retries: 120 - -volumes: - postgres_data: null - kibana1_data: null - es01_data: null -networks: - cdc_net: - driver: bridge \ No newline at end of file diff --git a/example/elasticsearch/go.mod b/example/elasticsearch/go.mod deleted file mode 100644 index e27285f..0000000 --- a/example/elasticsearch/go.mod +++ /dev/null @@ -1,30 +0,0 @@ -module github.com/Trendyol/go-pq-cdc/example/elasticsearch - -go 1.22.4 - -replace github.com/Trendyol/go-pq-cdc => ../.. - -require ( - github.com/Trendyol/go-pq-cdc v0.0.0-00010101000000-000000000000 - github.com/elastic/go-elasticsearch/v7 v7.17.10 -) - -require ( - github.com/avast/retry-go/v4 v4.6.0 // indirect - github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/go-playground/errors v3.3.0+incompatible // indirect - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect - github.com/jackc/pgx/v5 v5.6.0 // indirect - github.com/lib/pq v1.10.9 // indirect - github.com/prometheus/client_golang v1.19.1 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.48.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect - golang.org/x/crypto v0.22.0 // indirect - golang.org/x/sys v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect -) diff --git a/example/elasticsearch/go.sum b/example/elasticsearch/go.sum deleted file mode 100644 index 141401b..0000000 --- a/example/elasticsearch/go.sum +++ /dev/null @@ -1,64 +0,0 @@ -github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= -github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= -github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= -github.com/go-playground/errors v3.3.0+incompatible h1:w7qP6bdFXNmI86aV8VEfhXrGxoQWYHc/OX4Muw4FgW0= -github.com/go-playground/errors v3.3.0+incompatible/go.mod h1:n+RcthKmtLxDczVHKkhqiUSOGtTjvRl+HB4Gga0vWSI= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= -github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= -github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= -github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= -github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= -github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= -github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= -github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= -github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= -github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= -github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= -github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= -golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/example/elasticsearch/main.go b/example/elasticsearch/main.go deleted file mode 100644 index 4a4ebc5..0000000 --- a/example/elasticsearch/main.go +++ /dev/null @@ -1,218 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - cdc "github.com/Trendyol/go-pq-cdc" - "github.com/Trendyol/go-pq-cdc/config" - "github.com/Trendyol/go-pq-cdc/pq/message/format" - "github.com/Trendyol/go-pq-cdc/pq/publication" - "github.com/Trendyol/go-pq-cdc/pq/replication" - "github.com/Trendyol/go-pq-cdc/pq/slot" - "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esutil" - "log/slog" - "math" - "os" - "runtime" - "strconv" - "time" -) - -/* - psql "postgres://cdc_user:cdc_pass@127.0.0.1/cdc_db?replication=database" - - CREATE TABLE users ( - id serial PRIMARY KEY, - name text NOT NULL, - created_on timestamptz - ); - - INSERT INTO users (name) - SELECT - 'Oyleli' || i - FROM generate_series(1, 100) AS i; -*/ - -type Message struct { - Message esutil.BulkIndexerItem - Ack func() error -} - -func main() { - ctx := context.Background() - - esCfg := elasticsearch.Config{ - MaxRetries: math.MaxInt, - Addresses: []string{"http://localhost:9200"}, - CompressRequestBody: false, - DiscoverNodesOnStart: true, - DiscoverNodesInterval: 5 * time.Minute, - } - - w, err := NewElasticsearchBulkIndexer(esCfg, "cdc_index") - if err != nil { - slog.Error("new elasticsearch bulk indexer", "error", err) - } - - defer func() { - err = w.Close(ctx) - if err != nil { - slog.Error("elasticsearch bulk indexer close", "error", err) - } - }() - - messages := make(chan Message, 10000) - go Produce(ctx, w, messages) - - cfg := config.Config{ - Host: "127.0.0.1", - Username: "cdc_user", - Password: "cdc_pass", - Database: "cdc_db", - Publication: publication.Config{ - CreateIfNotExists: true, - Name: "cdc_publication", - Operations: publication.Operations{ - publication.OperationInsert, - publication.OperationDelete, - publication.OperationTruncate, - publication.OperationUpdate, - }, - Tables: publication.Tables{publication.Table{ - Name: "users", - ReplicaIdentity: publication.ReplicaIdentityFull, - }}, - }, - Slot: slot.Config{ - CreateIfNotExists: true, - Name: "cdc_slot", - SlotActivityCheckerInterval: 3000, - }, - Metric: config.MetricConfig{ - Port: 8081, - }, - Logger: config.LoggerConfig{ - LogLevel: slog.LevelInfo, - }, - } - - connector, err := cdc.NewConnector(ctx, cfg, FilteredMapper(messages)) - if err != nil { - slog.Error("new connector", "error", err) - os.Exit(1) - } - - connector.Start(ctx) -} - -func NewElasticsearchBulkIndexer(cfg elasticsearch.Config, indexName string) (esutil.BulkIndexer, error) { - esClient, err := elasticsearch.NewClient(cfg) - if err != nil { - return nil, err - } - - res, err := esClient.Indices.Create(indexName) - if err != nil { - return nil, err - } - defer res.Body.Close() - - bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Index: indexName, - Client: esClient, - NumWorkers: runtime.NumCPU(), - FlushBytes: int(5e+6), - FlushInterval: 100 * time.Millisecond, - }) - if err != nil { - return nil, err - } - - return bi, nil -} - -func FilteredMapper(messages chan Message) replication.ListenerFunc { - return func(ctx *replication.ListenerContext) { - switch msg := ctx.Message.(type) { - case *format.Insert: - encoded, _ := json.Marshal(msg.Decoded) - messages <- Message{ - Message: esutil.BulkIndexerItem{ - Action: "index", - DocumentID: strconv.Itoa(int(msg.Decoded["id"].(int32))), - Body: bytes.NewReader(encoded), - OnSuccess: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { - slog.Info("es insert doc success", "id", item.DocumentID) - if err := ctx.Ack(); err != nil { - slog.Error("ack", "error", err) - } - }, - OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { - if err != nil { - slog.Error("elasticsearch create document", "error", err) - } else { - slog.Error("elasticsearch create document", "type", res.Error.Type, "error", err) - } - }, - }, - Ack: ctx.Ack, - } - case *format.Delete: - messages <- Message{ - Message: esutil.BulkIndexerItem{ - Action: "delete", - DocumentID: strconv.Itoa(int(msg.OldDecoded["id"].(int32))), - OnSuccess: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { - slog.Info("es delete doc success", "id", item.DocumentID) - if err := ctx.Ack(); err != nil { - slog.Error("ack", "error", err) - } - }, - OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { - if err != nil { - slog.Error("elasticsearch delete document", "error", err) - } else { - slog.Error("elasticsearch delete document", "type", res.Error.Type, "error", err) - } - }, - }, - Ack: ctx.Ack, - } - case *format.Update: - encoded, _ := json.Marshal(msg.NewDecoded) - messages <- Message{ - Message: esutil.BulkIndexerItem{ - Action: "update", - DocumentID: strconv.Itoa(int(msg.NewDecoded["id"].(int32))), - Body: bytes.NewReader(encoded), - OnSuccess: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { - slog.Info("es update doc success", "id", item.DocumentID) - if err := ctx.Ack(); err != nil { - slog.Error("ack", "error", err) - } - }, OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { - if err != nil { - slog.Error("elasticsearch update document", "error", err) - } else { - slog.Error("elasticsearch update document", "type", res.Error.Type, "error", err) - } - }, - }, - Ack: ctx.Ack, - } - } - } -} - -func Produce(ctx context.Context, w esutil.BulkIndexer, messages <-chan Message) { - var err error - for { - event := <-messages - err = w.Add(ctx, event.Message) - if err != nil { - slog.Error("elasticsearch bulk indexer item add", "error", err) - } - } -} diff --git a/example/kafka/docker-compose.yml b/example/kafka/docker-compose.yml deleted file mode 100644 index 80d2e7d..0000000 --- a/example/kafka/docker-compose.yml +++ /dev/null @@ -1,113 +0,0 @@ -version: "3" -services: - postgres: - image: postgres:16.2 - restart: always - command: ["-c", "wal_level=logical", "-c", "max_wal_senders=10", "-c", "max_replication_slots=10"] - environment: - POSTGRES_USER: "cdc_user" - POSTGRES_PASSWORD: "cdc_pass" - POSTGRES_DB: "cdc_db" - POSTGRES_HOST_AUTH_METHOD: trust - network_mode: "host" - - ##### KAFKA ###### - redpanda: - image: docker.redpanda.com/redpandadata/redpanda:v23.1.12 - command: - - redpanda start - - --smp 1 - - --overprovisioned - - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092 - - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092 - - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082 - - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082 - - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081 - - --rpc-addr redpanda:33145 - - --advertise-rpc-addr redpanda:33145 - ports: - - 18081:18081 - - 18082:18082 - - 19092:19092 - - 19644:9644 - volumes: - - redpanda_data:/var/lib/redpanda/data - - ~/pandapost_integration:/tmp/pandapost_integration - networks: - - cdc_net - healthcheck: - test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"] - interval: 15s - timeout: 3s - retries: 5 - - console: - image: docker.redpanda.com/redpandadata/console:v2.2.4 - entrypoint: /bin/sh - command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console" - environment: - CONFIG_FILEPATH: /tmp/config.yml - CONSOLE_CONFIG_FILE: | - kafka: - brokers: ["redpanda:9092"] - schemaRegistry: - enabled: true - urls: ["http://redpanda:8081"] - redpanda: - adminApi: - enabled: true - urls: ["http://redpanda:9644"] - connect: - enabled: true - clusters: - - name: local-connect-cluster - url: http://connect:8083 - ports: - - 8085:8080 - networks: - - cdc_net - depends_on: - - redpanda - - connect: - image: docker.redpanda.com/redpandadata/connectors:latest - hostname: connect - container_name: connect - networks: - - cdc_net - # platform: 'linux/amd64' - depends_on: - - redpanda - ports: - - "8083:8083" - environment: - CONNECT_CONFIGURATION: | - key.converter=org.apache.kafka.connect.converters.ByteArrayConverter - value.converter=org.apache.kafka.connect.converters.ByteArrayConverter - group.id=connectors-cluster - offset.storage.topic=_internal_connectors_offsets - config.storage.topic=_internal_connectors_configs - status.storage.topic=_internal_connectors_status - config.storage.replication.factor=-1 - offset.storage.replication.factor=-1 - status.storage.replication.factor=-1 - offset.flush.interval.ms=1000 - producer.linger.ms=50 - producer.batch.size=131072 - CONNECT_BOOTSTRAP_SERVERS: redpanda:9092 - CONNECT_GC_LOG_ENABLED: "false" - CONNECT_HEAP_OPTS: -Xms512M -Xmx512M - CONNECT_LOG_LEVEL: info - -volumes: - couchbase_data: null - postgres_data: null - zookeeper_data: null - zookeeper_log: null - kafka_data: null - redpanda_data: null - kibana1_data: null - es01_data: null -networks: - cdc_net: - driver: bridge diff --git a/example/kafka/go.mod b/example/kafka/go.mod deleted file mode 100644 index 4dbeb1a..0000000 --- a/example/kafka/go.mod +++ /dev/null @@ -1,33 +0,0 @@ -module github.com/Trendyol/go-pq-cdc/example/kafka - -go 1.22.4 - -replace github.com/Trendyol/go-pq-cdc => ../.. - -require ( - github.com/Trendyol/go-pq-cdc v0.0.0-00010101000000-000000000000 - github.com/google/uuid v1.6.0 - github.com/segmentio/kafka-go v0.4.47 -) - -require ( - github.com/avast/retry-go/v4 v4.6.0 // indirect - github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/go-playground/errors v3.3.0+incompatible // indirect - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect - github.com/jackc/pgx/v5 v5.6.0 // indirect - github.com/klauspost/compress v1.17.8 // indirect - github.com/lib/pq v1.10.9 // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect - github.com/prometheus/client_golang v1.19.1 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.48.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect - golang.org/x/crypto v0.22.0 // indirect - golang.org/x/sys v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect -) diff --git a/example/kafka/go.sum b/example/kafka/go.sum deleted file mode 100644 index f7741b7..0000000 --- a/example/kafka/go.sum +++ /dev/null @@ -1,123 +0,0 @@ -github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= -github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-playground/errors v3.3.0+incompatible h1:w7qP6bdFXNmI86aV8VEfhXrGxoQWYHc/OX4Muw4FgW0= -github.com/go-playground/errors v3.3.0+incompatible/go.mod h1:n+RcthKmtLxDczVHKkhqiUSOGtTjvRl+HB4Gga0vWSI= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= -github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= -github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= -github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= -github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= -github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= -github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= -github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= -github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= -github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= -github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= -github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= -github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= -github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= -github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= -github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= -github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= -github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= -golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/example/kafka/main.go b/example/kafka/main.go deleted file mode 100644 index e8e124e..0000000 --- a/example/kafka/main.go +++ /dev/null @@ -1,163 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - cdc "github.com/Trendyol/go-pq-cdc" - "github.com/Trendyol/go-pq-cdc/config" - "github.com/Trendyol/go-pq-cdc/pq/message/format" - "github.com/Trendyol/go-pq-cdc/pq/publication" - "github.com/Trendyol/go-pq-cdc/pq/replication" - "github.com/Trendyol/go-pq-cdc/pq/slot" - "github.com/google/uuid" - "github.com/segmentio/kafka-go" - "log/slog" - "os" - "time" -) - -/* - psql "postgres://cdc_user:cdc_pass@127.0.0.1/cdc_db?replication=database" - - CREATE TABLE users ( - id serial PRIMARY KEY, - name text NOT NULL, - created_on timestamptz - ); - - INSERT INTO users (name) - SELECT - 'Oyleli' || i - FROM generate_series(1, 100) AS i; -*/ - -type Message struct { - Message kafka.Message - Ack func() error -} - -func main() { - ctx := context.Background() - - w := &kafka.Writer{ - Addr: kafka.TCP("redpanda:9092"), - Topic: "cdc.test.produce", - Balancer: &kafka.LeastBytes{}, - BatchSize: 10000, - AllowAutoTopicCreation: true, - } - - defer func() { - err := w.Close() - if err != nil { - slog.Error("kafka writer close", "error", err) - } - }() - - messages := make(chan Message, 10000) - go Produce(ctx, w, messages) - - cfg := config.Config{ - Host: "localhost:5432", - Username: "cdc_user", - Password: "cdc_pass", - Database: "cdc_db", - DebugMode: false, - Publication: publication.Config{ - CreateIfNotExists: true, - Name: "cdc_publication", - Operations: publication.Operations{ - publication.OperationInsert, - publication.OperationDelete, - publication.OperationTruncate, - publication.OperationUpdate, - }, - Tables: publication.Tables{publication.Table{ - Name: "users", - ReplicaIdentity: publication.ReplicaIdentityFull, - }}, - }, - Slot: slot.Config{ - CreateIfNotExists: true, - Name: "cdc_slot", - SlotActivityCheckerInterval: 3000, - }, - Metric: config.MetricConfig{ - Port: 8081, - }, - Logger: config.LoggerConfig{ - LogLevel: slog.LevelInfo, - }, - } - - connector, err := cdc.NewConnector(ctx, cfg, FilteredMapper(messages)) - if err != nil { - slog.Error("new connector", "error", err) - os.Exit(1) - } - - connector.Start(ctx) -} - -func FilteredMapper(messages chan Message) replication.ListenerFunc { - return func(ctx *replication.ListenerContext) { - switch msg := ctx.Message.(type) { - case *format.Insert: - encoded, _ := json.Marshal(msg.Decoded) - messages <- Message{ - Message: kafka.Message{ - Key: []byte(uuid.NewString()), - Value: encoded, - Time: time.Now(), - }, - Ack: ctx.Ack, - } - case *format.Delete: - slog.Info("delete message received", "old", msg.OldDecoded) - case *format.Update: - slog.Info("update message received", "new", msg.NewDecoded, "old", msg.OldDecoded) - } - } -} - -func Produce(ctx context.Context, w *kafka.Writer, messages <-chan Message) { - var err error - var lastAck func() error - message := make([]kafka.Message, 100000) - counter := 0 - - for { - select { - case event := <-messages: - message[counter] = event.Message - lastAck = event.Ack - counter++ - - if counter == 100000 { - err = w.WriteMessages(ctx, message...) - if err != nil { - slog.Error("kafka produce", "error", err) - continue - } - slog.Info("kafka produce", "count", counter) - counter = 0 - if err = event.Ack(); err != nil { - slog.Error("ack", "error", err) - } - } - case <-time.After(100 * time.Millisecond): - if counter > 0 { - err = w.WriteMessages(ctx, message[:counter]...) - if err != nil { - slog.Error("kafka produce", "error", err) - continue - } - slog.Info("kafka produce time", "count", counter) - counter = 0 - if err = lastAck(); err != nil { - slog.Error("ack", "error", err) - } - } - } - } -} diff --git a/internal/metric/metric.go b/internal/metric/metric.go index 75bc0e9..9694c24 100644 --- a/internal/metric/metric.go +++ b/internal/metric/metric.go @@ -1,6 +1,8 @@ package metric import ( + "os" + "github.com/prometheus/client_golang/prometheus" ) @@ -40,6 +42,7 @@ type metric struct { //nolint:funlen func NewMetric(slotName string) Metric { + hostname, _ := os.Hostname() return &metric{ totalInsert: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: cdcNamespace, @@ -48,6 +51,7 @@ func NewMetric(slotName string) Metric { Help: "total number of insert operation message in cdc", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), totalUpdate: prometheus.NewCounter(prometheus.CounterOpts{ @@ -57,6 +61,7 @@ func NewMetric(slotName string) Metric { Help: "total number of update operation message in cdc", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), totalDelete: prometheus.NewCounter(prometheus.CounterOpts{ @@ -66,6 +71,7 @@ func NewMetric(slotName string) Metric { Help: "total number of delete operation message in cdc", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), cdcLatency: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -75,6 +81,7 @@ func NewMetric(slotName string) Metric { Help: "latest consumed cdc message latency ms", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), processLatency: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -84,6 +91,7 @@ func NewMetric(slotName string) Metric { Help: "latest cdc process latency", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), slotActivity: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -93,6 +101,7 @@ func NewMetric(slotName string) Metric { Help: "whether the replication slot is active or not", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), slotConfirmedFlushLSN: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -102,6 +111,7 @@ func NewMetric(slotName string) Metric { Help: "last lsn confirmed flushed to the replication slot", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), slotCurrentLSN: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -111,6 +121,7 @@ func NewMetric(slotName string) Metric { Help: "current lsn", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), slotRetainedWALSize: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -120,6 +131,7 @@ func NewMetric(slotName string) Metric { Help: "current lsn - restart lsn", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), slotLag: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -129,6 +141,7 @@ func NewMetric(slotName string) Metric { Help: "current lsn - confirmed flush lsn", ConstLabels: prometheus.Labels{ "slot_name": slotName, + "host": hostname, }, }), }