Updating documentation:
 - Added Oracle Database requirements and instructions
 - Added an example configuration file.
Thorsten Hake committed Feb 10, 2020
1 parent 233e8cd commit cbe3c79
Showing 3 changed files with 65 additions and 35 deletions.
70 changes: 49 additions & 21 deletions Readme.md
@@ -16,7 +16,6 @@ Planned features:
- Reading schema changes from the Archive-Log. Currently the online catalog is used. See
https://docs.oracle.com/cd/B19306_01/server.102/b14215/logminer.htm#i1014687 for more information.


## Initial Import
If the start SCN is not set or set to 0, Logminer Kafka Connect will query
the configured tables for an initial import. During the initial import, no
@@ -95,14 +94,18 @@ The following source fields will be provided:


## Configuration
You can find an example configuration under [example-configuration.properties](example-configuration.properties).

The following configuration parameters are available:
- `db.hostname`
Database hostname

- Type: string
- Importance: high

- `db.name`
- Database SID
+ Logical name of the database. This name will be used as a prefix for
+ the topic. You can choose this name as you like.

- Type: string
- Importance: high
@@ -113,6 +116,12 @@ The following source fields will be provided:
- Type: int
- Importance: high

- `db.sid`
Database SID

- Type: string
- Importance: high

- `db.user`
Database user

@@ -132,12 +141,12 @@ The following source fields will be provided:
- Default: 1000
- Importance: high

- - `record.prefix`
- Prefix of the subject record. If you're using an Avro converter,
- this will be the namespace.
+ - `start.scn`
+ Start SCN, if set to 0 an initial intake from the tables will be
+ performed.

- - Type: string
- - Default: ""
+ - Type: long
+ - Default: 0
- Importance: high

- `table.whitelist`
@@ -150,29 +159,48 @@ The following source fields will be provided:
- Default: ""
- Importance: high

- `topic.prefix`
Prefix for the topic. Each monitored table will be written to a
separate topic. If you want to change this behaviour, you can add a
RegexRouter transform (see the sketch after this parameter list).

- Type: string
- Default: ""
- Importance: medium

- `db.fetch.size`
JDBC result set prefetch size. If not set, it defaults to
batch.size. The fetch size should not be smaller than the batch size.

- Type: int
- Default: null
- Importance: medium

- `db.attempts`
Maximum number of attempts to retrieve a valid JDBC connection.

- Type: int
- Default: 3
- Importance: low

- - `start.scn`
- Start SCN, if set to 0 an initial intake from the tables will be
- performed.
+ - `db.backoff.ms`
+ Backoff time in milliseconds between connection attempts.

- Type: long
- - Default: 0
+ - Default: 10000
- Importance: low


- `poll.interval.ms`
Positive integer value that specifies the number of milliseconds the
connector should wait after a polling attempt that didn't retrieve
any results.

- Type: long
- Default: 2000
- Importance: low
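
A minimal sketch of the RegexRouter transform mentioned under `topic.prefix`, routing every monitored table into a single topic. The transform alias `route` and the target topic `all-tables` are placeholder choices, not defaults of this connector:

```properties
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
# Collapse all generated topic names into one placeholder topic
transforms.route.regex=.*
transforms.route.replacement=all-tables
```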

## Oracle Database Configuration Requirements
In order for Logminer Kafka Connect to work, the database needs to be in ARCHIVELOG mode and supplemental logging needs to be
enabled with all columns. The following commands need to be executed in SQL*Plus:
```sql
prompt Shutting down database to activate archivelog mode;
shutdown immediate;
startup mount;
alter database archivelog;
prompt Archive log activated.;
alter database add supplemental log data (all) columns;
prompt Activated supplemental logging with all columns.;
prompt Starting up database;
alter database open;
```
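
To verify that both settings took effect, you can query `v$database`:

```sql
-- Check that the database runs in archivelog mode (expected: ARCHIVELOG)
SELECT log_mode FROM v$database;
-- Check that supplemental logging covers all columns (expected: YES)
SELECT supplemental_log_data_all FROM v$database;
```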
13 changes: 13 additions & 0 deletions example-configuration.properties
@@ -0,0 +1,13 @@
name=logminer-kafka-connect
connector.class=com.github.thake.logminer.kafka.connect.LogminerSourceConnector
start.scn=0
db.name=chosen-alias-name
db.sid=XE
db.hostname=127.0.0.1
db.port=1521
db.user=dbUser
db.user.password=dbPassword
db.fetch.size=10000
batch.size=1000
poll.interval.ms=500
table.whitelist=MYSCHEMA.TABLE_NAME, OTHERSCHEMA
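
Assuming a standard Kafka installation with the connector on the Connect worker's plugin path, the example file can be tried out with the standalone worker; the worker config path below is a placeholder:

```
bin/connect-standalone.sh config/connect-standalone.properties example-configuration.properties
```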
@@ -59,9 +59,6 @@ class SourceConnectorConfig(
get() = getLong(START_SCN) ?: 0


- val recordPrefix: String
- get() = getString(RECORD_PREFIX)

val pollInterval: Duration
get() = Duration.ofMillis(getLong(POLL_INTERVAL_MS))

@@ -83,7 +80,6 @@ class SourceConnectorConfig(
const val MONITORED_TABLES = "table.whitelist"
const val DB_FETCH_SIZE = "db.fetch.size"
const val START_SCN = "start.scn"
- const val RECORD_PREFIX = "record.prefix"
const val BATCH_SIZE = "batch.size"
const val POLL_INTERVAL_MS = "poll.interval.ms"

@@ -144,23 +140,16 @@ class SourceConnectorConfig(
DB_FETCH_SIZE,
ConfigDef.Type.INT,
null,
- Importance.LOW,
+ Importance.MEDIUM,
"JDBC result set prefetch size. If not set, it defaults to batch.size. The fetch size" +
" should not be smaller than the batch size."
)
.define(
START_SCN,
ConfigDef.Type.LONG,
0L,
Importance.LOW,
"Start SCN, if set to 0 an initial intake from the tables will be performed."
)
.define(
RECORD_PREFIX,
ConfigDef.Type.STRING,
"",
Importance.HIGH,
"Prefix of the subject record. If you're using an Avro converter, this will be the namespace."
"Start SCN, if set to 0 an initial intake from the tables will be performed."
)
.define(
DB_ATTEMPTS,
@@ -179,7 +168,7 @@ class SourceConnectorConfig(
.define(
POLL_INTERVAL_MS,
ConfigDef.Type.LONG,
- 500L,
+ 2000L,
Importance.LOW,
"Positive integer value that specifies the number of milliseconds the connector should wait after a polling attempt didn't retrieve any results."
)
