Skip to content

Commit

Permalink
[ISSUE #4458] Support MySQL Sink Connector feature (#4771)
Browse files Browse the repository at this point in the history
* [ISSUE #4458] Support mysql Sink Connector feature

* remove pg jdbc import

* update dependencies
  • Loading branch information
mxsm authored Feb 18, 2024
1 parent 36888b8 commit d6393ab
Show file tree
Hide file tree
Showing 133 changed files with 5,805 additions and 812 deletions.
3 changes: 3 additions & 0 deletions eventmesh-connectors/eventmesh-connector-jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ packageSources {
dependencies {
antlr("org.antlr:antlr4:4.13.0")
implementation 'org.antlr:antlr4-runtime:4.13.0'
implementation 'com.alibaba:druid:1.2.20'
implementation 'org.hibernate:hibernate-core:5.6.15.Final'
implementation project(":eventmesh-common")
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-spi")
implementation 'com.zendesk:mysql-binlog-connector-java:0.28.0'
implementation 'mysql:mysql-connector-java:8.0.32'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import java.util.List;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
/**
* Represents changes in a catalog, such as schema or table modifications.
*/
@Data
@NoArgsConstructor
public class CatalogChanges {

/**
Expand All @@ -52,10 +54,10 @@ public class CatalogChanges {
// The table associated with the changes
private Table table;
// The list of columns affected by the changes
private List<? extends Column> columns;
private List<? extends Column<?>> columns;

private CatalogChanges(String type, String operationType, CatalogSchema catalog, Table table,
List<? extends Column> columns) {
List<? extends Column<?>> columns) {
this.type = type;
this.operationType = operationType;
this.catalog = catalog;
Expand All @@ -81,7 +83,7 @@ public static class Builder {
private String operationType;
private CatalogSchema catalog;
private Table table;
private List<? extends Column> columns;
private List<? extends Column<?>> columns;

/**
* Sets the operation type for the change.
Expand Down Expand Up @@ -123,7 +125,7 @@ public Builder table(Table table) {
* @param columns The list of Column instances.
* @return The Builder instance.
*/
public Builder columns(List<? extends Column> columns) {
public Builder columns(List<? extends Column<?>> columns) {
this.columns = columns;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,107 @@
package org.apache.eventmesh.connector.jdbc;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
/**
* DataChanges class representing changes in data associated with a JDBC connection.
*/
public class DataChanges {

private Object after;

private Object before;

/**
* The type of change.
* <pr>
* {@link org.apache.eventmesh.connector.jdbc.event.DataChangeEventType}
* </pr>
*/
private String type;

/**
* Constructs a DataChanges instance with 'after' and 'before' data.
*
* @param after The data after the change.
* @param before The data before the change.
*/
public DataChanges(Object after, Object before) {
this.after = after;
this.before = before;
}

/**
* Constructs a DataChanges instance with 'after', 'before' data, and a change type.
*
* @param after The data after the change.
* @param before The data before the change.
* @param type The type of change.
*/
public DataChanges(Object after, Object before, String type) {
this.after = after;
this.before = before;
this.type = type;
}

/**
* Creates a new DataChanges builder.
*
* @return The DataChanges builder.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Builder class for constructing DataChanges instances.
*/
public static class Builder {

private String type;
private Object after;
private Object before;

/**
* Sets the change type in the builder.
*
* @param type The type of change.
* @return The DataChanges builder.
*/
public Builder withType(String type) {
this.type = type;
return this;
}

/**
* Sets the 'after' data in the builder.
*
* @param after The data after the change.
* @return The DataChanges builder.
*/
public Builder withAfter(Object after) {
this.after = after;
return this;
}

/**
* Sets the 'before' data in the builder.
*
* @param before The data before the change.
* @return The DataChanges builder.
*/
public Builder withBefore(Object before) {
this.before = before;
return this;
}

/**
* Builds the DataChanges instance.
*
* @return The constructed DataChanges.
*/
public DataChanges build() {
return new DataChanges(after, before, type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.eventmesh.connector.jdbc;

import org.apache.eventmesh.connector.jdbc.table.catalog.Column;

import java.util.List;

import lombok.AllArgsConstructor;
Expand All @@ -28,25 +30,25 @@
@AllArgsConstructor
public class Field {

private String type;

private boolean required;

private String field;

private String name;

private Column<?> column;

private List<Field> fields;

public Field(String type, boolean required, String field, String name) {
this.type = type;
public Field(Column<?> column, boolean required, String field, String name) {
this.column = column;
this.required = required;
this.field = field;
this.name = name;
}

public Field withType(String type) {
this.type = type;
public Field withColumn(Column<?> column) {
this.column = column;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,26 @@

package org.apache.eventmesh.connector.jdbc;

/**
* Represents data associated with a JDBC connector.
*/
public final class JdbcConnectData {

/**
* Constant representing data changes in the JDBC connector.
*/
public static final byte DATA_CHANGES = 1;

/**
* Constant representing schema changes in the JDBC connector.
*/
public static final byte SCHEMA_CHANGES = 1 << 1;

private Payload payload = new Payload();

private Schema schema;

private byte type;
private byte type = 0;

public JdbcConnectData() {
}
Expand Down Expand Up @@ -67,4 +76,12 @@ public void markDataChanges() {
public void markSchemaChanges() {
this.type |= SCHEMA_CHANGES;
}

public boolean isDataChanges() {
return (this.type & DATA_CHANGES) != 0;
}

public boolean isSchemaChanges() {
return (this.type & SCHEMA_CHANGES) != 0;
}
}
Loading

0 comments on commit d6393ab

Please sign in to comment.