Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added message cleaner configuration. Added database schema. Added micronaut details for DomainEventDispatcher. Fixed rabbitmq docs. #42

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
222 changes: 220 additions & 2 deletions src/docs/asciidoc/cdc-configuration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ If polling fails, reader will try again using the specified interval.
| Pipeline property name

| eventuate.database.schema
| Schema which is listened by the CDC service
| Schema which is listened by the CDC service (use `NONE` for default from jdbc connection)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think eventuate.database.schema is used to configure the CDC.
It's used to configure the application/service.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used. Each reader has datasource. But I added info for application.

| eventuate
| eventuateDatabaseSchema

Expand Down Expand Up @@ -580,7 +580,7 @@ The Eventuate Tram CDC supports `eventuate-tram` and `eventuate-local`
| -

| eventuateDatabaseSchema
| The schema of the transaction outbox table
| The schema of the transaction outbox table (use `NONE` for default from jdbc connection).
| `eventuate`

| sourceTableName
Expand All @@ -595,6 +595,95 @@ The Eventuate Tram CDC supports `eventuate-tram` and `eventuate-local`

|===

==== Configuring a cleaner

Cdc can automatically remove old messages from message table and received_message table.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should explain that you configure one or more named cleaners.
Each clean can clean a message table and/or a received_message table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will add that

To enable cleaning is necessary to define one or more named cleaners.
A cleaner can clean message and/or received_message tables.
Example configuration:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please describe what this configuration does.
Also, please have properties for configuring received_message table too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


----
EVENTUATE_CDC_CLEANER_CLEANER1_PIPELINE: PIPELINE1
EVENTUATE_CDC_CLEANER_CLEANER1_MESSAGE_CLEANING_ENABLED: "true"
EVENTUATE_CDC_CLEANER_CLEANER1_MESSAGES_MAX_AGE_IN_SECONDS: 1
EVENTUATE_CDC_CLEANER_CLEANER1_RECEIVED_MESSAGE_CLEANING_ENABLED: "true"
EVENTUATE_CDC_CLEANER_CLEANER1_RECEIVED_MESSAGES_MAX_AGE_IN_SECONDS: 1
EVENTUATE_CDC_CLEANER_CLEANER1_INTERVAL_IN_SECONDS: 1
...
----

Here is specified message cleaner with name CLEANER1 (EVENTUATE_CDC_CLEANER is root property name).
It used database connection from PIPELINE1.
Cleaning enabled for message and received_message table.
Max message and received message age is one second.
Cleaning will be started on each second.


If pipeline is specified, pipeline database configuration is used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would the user NOT want to specify the pipeline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know. But probably we discussed this option when I was working on implementation.

If not, please specify additional database parameters explicitly:

----
EVENTUATE_CDC_CLEANER_CLEANER1_DATASOURCE_URL: jdbc:postgresql://postgreswalpipeline/eventuate
EVENTUATE_CDC_CLEANER_CLEANER1_DATASOURCE_USERNAME: eventuate
EVENTUATE_CDC_CLEANER_CLEANER1_DATASOURCE_PASSWORD: eventuate
EVENTUATE_CDC_CLEANER_CLEANER1_DATASOURCE_DRIVER_CLASSNAME: org.postgresql.Driver
...
----

Full list of configuration properties
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something seems to be missing here that explains what EVENTUATE_CDC_CLEANER_C3 is, e.g. eventuate.cdc.cleaner is some root property name, c3 is the name of a specific cleaner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will add


[cols=3, options="header"]
|===
| Name
| Description
| Default Value

| dataSourceUrl
| jdbc connection url
| -

| dataSourceUserName
| jdbc username
| -

| dataSourcePassword
| jdbc password
| -

| dataSourceDriverClassName
| jdbc driver class name
| -

| eventuateSchema
| database schema
| depends on RDBMS

| pipeline
| pipeline name (used instead of jdbc configuration)
| -

| messageCleaningEnabled
| enables message table cleaning
| false

| messagesMaxAgeInSeconds
| max age of message to remove
| 2 days

| receivedMessageCleaningEnabled
| enables received_message table cleaning
| false

| receivedMessagesMaxAgeInSeconds
| max age of received_message to remove
| 2 days

| intervalInSeconds
| how often cdc starts cleaning
| every minute

|===

=== Configuring the publisher

The publisher is invoked by the pipeline to publish a message/event to the message broker.
Expand Down Expand Up @@ -807,16 +896,145 @@ The CDC service requires various infrastructure services including:
The Eventuate CDC service requires several tables.
The Eventuate MySQL and Postgres images define these tables.


In default eventuate images, tables are located in 'eventuate' schema.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typos and capitalization: 'In the default Eventuate images'

If you created necessary tables (see further) in some custom schema, please specify it in `eventuate.database.schema` property or use value `NONE` for it.
See <<Single pipeline - Pipeline Properties>> and <<Configuring a pipeline>>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a hyperlink? If not, it should be

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


==== Mysql schema example for eventuate local
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalization:

  • eventuate => Eventuate
  • Mysql => MySQL


----
create table events (
id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
event_id VARCHAR(255),
event_type LONGTEXT,
event_data LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
entity_type VARCHAR(255) NOT NULL,
entity_id VARCHAR(255) NOT NULL,
triggering_event LONGTEXT,
metadata LONGTEXT,
published TINYINT DEFAULT 0
);

create table entities (
entity_type VARCHAR(255),
entity_id VARCHAR(255),
entity_version LONGTEXT NOT NULL,
PRIMARY KEY(entity_type, entity_id)
);

create table snapshots (
entity_type VARCHAR(255),
entity_id VARCHAR(255),
entity_version VARCHAR(255),
snapshot_type LONGTEXT NOT NULL,
snapshot_json LONGTEXT NOT NULL,
triggering_events LONGTEXT,
PRIMARY KEY(entity_type, entity_id, entity_version)
);
----

==== Mysql schema example for eventuate tram

----
CREATE TABLE message (
dbid BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
id LONGTEXT,
destination LONGTEXT NOT NULL,
headers LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
payload LONGTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
published SMALLINT DEFAULT 0,
creation_time BIGINT
);

CREATE TABLE received_messages (
consumer_id VARCHAR(255),
message_id VARCHAR(255),
creation_time BIGINT,
published SMALLINT DEFAULT 0,
PRIMARY KEY(consumer_id, message_id)
);
----

==== Mysql schema example for eventuate tram sagas

----
CREATE TABLE saga_instance_participants (
saga_type VARCHAR(255) NOT NULL,
saga_id VARCHAR(100) NOT NULL,
destination VARCHAR(100) NOT NULL,
resource VARCHAR(100) NOT NULL,
PRIMARY KEY(saga_type, saga_id, destination, resource)
);

CREATE TABLE saga_instance(
saga_type VARCHAR(255) NOT NULL,
saga_id VARCHAR(100) NOT NULL,
state_name VARCHAR(100) NOT NULL,
last_request_id VARCHAR(100),
end_state INT(1),
compensating INT(1),
saga_data_type VARCHAR(1000) NOT NULL,
saga_data_json VARCHAR(1000) NOT NULL,
PRIMARY KEY(saga_type, saga_id)
);

create table saga_lock_table(
target VARCHAR(100) PRIMARY KEY,
saga_type VARCHAR(255) NOT NULL,
saga_Id VARCHAR(100) NOT NULL
);

create table saga_stash_table(
message_id VARCHAR(100) PRIMARY KEY,
target VARCHAR(100) NOT NULL,
saga_type VARCHAR(255) NOT NULL,
saga_id VARCHAR(100) NOT NULL,
message_headers VARCHAR(1000) NOT NULL,
message_payload VARCHAR(1000) NOT NULL
);
----

==== `CDC_MONITORING` table

The CDC service uses the `CDC_MONITORING` table to implement a 'heart beat' mechanism.
Each reader that uses transaction log tailing (MySQL binlog/Postgres WAL) periodically updates a row in this table and measures the delay in receiving the update from the transaction log.

----
create table cdc_monitoring (
reader_id VARCHAR(255) PRIMARY KEY,
last_time BIGINT
);
----

==== `OFFSET_STORE` table

When publishing messages to Apache ActiveMQ, RabbitMQ, Redis, the MySql binlog reader records the current binlog position in this table.

----
CREATE TABLE eventuate.offset_store(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it always eventuate.?

client_name VARCHAR(255) NOT NULL PRIMARY KEY,
serialized_offset LONGTEXT
);
----

==== Full Database Schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move these links into the respective sections.

Please use proper links:

* [MySQL schema for ...](...)


For full schema info, please see:

https://github.com/eventuate-foundation/eventuate-common/tree/master/mysql

https://github.com/eventuate-foundation/eventuate-common/tree/master/postgres

https://github.com/eventuate-foundation/eventuate-common/tree/master/mssql


https://github.com/eventuate-tram/eventuate-tram-sagas/tree/master/mysql

https://github.com/eventuate-tram/eventuate-tram-sagas/tree/master/postgres

https://github.com/eventuate-tram/eventuate-tram-sagas/tree/master/mssql

=== Apache Kafka broker configuration

The MySQL binlog reader records the current binlog position in the `offsetStorageTopicName` topic.
Expand Down
4 changes: 4 additions & 0 deletions src/docs/asciidoc/getting-started-eventuate-tram.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,10 @@ public class AbstractTramEventTestConfiguration {

See this example of https://github.com/eventuate-tram-examples/eventuate-tram-core-quarkus-examples-basic/blob/main/eventuate-tram-examples-common/src/main/java/io/eventuate/tram/examples/basic/events/AbstractTramEventTestConfiguration.java[transaction events].

Please note: if you want to have several dispatchers, you will need to use @Named annotation (https://docs.oracle.com/javaee/6/api/javax/inject/Named.html)
on DomainEventDispatcher bean definitions. To initialize DomainEventDispatcher objects, eventuate framework iterates over all beans
of that type and those beans should have unique names, otherwise only one of them will be retrieved from IoC container (micronaut specific behavior)
without any explicit error.

=== Transactional commands

Expand Down