some-logging-improvements (#262)
* some-logging-improvements

- log during table creation failures
- additional logging at critical points between the coordinator and workers during the commit phase

* comments
tabmatfournier authored Jun 6, 2024
1 parent e942a59 commit 066bb79
Showing 4 changed files with 54 additions and 45 deletions.
@@ -106,8 +106,9 @@ public boolean isCommitTimedOut() {
       return false;
     }
 
-    if (System.currentTimeMillis() - startTime > config.commitTimeoutMs()) {
-      LOG.info("Commit timeout reached");
+    long currentTime = System.currentTimeMillis();
+    if (currentTime - startTime > config.commitTimeoutMs()) {
+      LOG.info("Commit timeout reached. Now: {}, start: {}, timeout: {}", currentTime, startTime, config.commitTimeoutMs());
       return true;
     }
     return false;
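For orientation, a minimal self-contained sketch of the elapsed-time idiom the new code uses (the class and field names here are illustrative stand-ins, not the connector's API). Reading the clock once means the branch condition and the logged values describe the same instant:

import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the coordinator's timeout bookkeeping.
class TimeoutSketch {
  private final long startTime = System.currentTimeMillis();
  private final long timeoutMs = TimeUnit.MINUTES.toMillis(5); // assumed value

  boolean isTimedOut() {
    long currentTime = System.currentTimeMillis(); // single clock read
    if (currentTime - startTime > timeoutMs) {
      // Log the exact values used in the comparison so the line is verifiable.
      System.out.printf(
          "Timeout reached. Now: %d, start: %d, timeout: %d%n",
          currentTime, startTime, timeoutMs);
      return true;
    }
    return false;
  }
}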
@@ -125,14 +126,14 @@ public boolean isCommitReady(int expectedPartitionCount) {
             .sum();
 
     if (receivedPartitionCount >= expectedPartitionCount) {
-      LOG.debug(
+      LOG.info(
           "Commit {} ready, received responses for all {} partitions",
           currentCommitId,
           receivedPartitionCount);
       return true;
     }
 
-    LOG.debug(
+    LOG.info(
         "Commit {} not ready, received responses for {} of {} partitions, waiting for more",
         currentCommitId,
         receivedPartitionCount,
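The hunk above is truncated mid-statement by the diff view; for orientation, a hedged sketch of the readiness tally it belongs to (the collection and parameter names are assumed, not copied from the connector):

import java.util.List;

// Hypothetical shape of the check: sum the per-worker response counts and
// compare against the number of partitions the commit must cover.
class ReadinessSketch {
  static boolean isCommitReady(List<Integer> responsesPerWorker, int expectedPartitionCount) {
    int receivedPartitionCount =
        responsesPerWorker.stream().mapToInt(Integer::intValue).sum();
    return receivedPartitionCount >= expectedPartitionCount;
  }
}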
@@ -97,10 +97,12 @@ public void process() {
     if (commitState.isCommitIntervalReached()) {
       // send out begin commit
       commitState.startNewCommit();
+      LOG.info("Started new commit with commit-id={}", commitState.currentCommitId().toString());
       Event event =
           new Event(config.controlGroupId(), new StartCommit(commitState.currentCommitId()));
       send(event);
-      LOG.debug("Started new commit with commit-id={}", commitState.currentCommitId().toString());
+      LOG.info("Sent workers commit trigger with commit-id={}", commitState.currentCommitId().toString());
+
     }
 
     consumeAvailable(POLL_DURATION, this::receive);
@@ -127,6 +129,7 @@ private boolean receive(Envelope envelope) {
 
   private void commit(boolean partialCommit) {
     try {
+      LOG.info("Processing commit after responses for {}, isPartialCommit {}", commitState.currentCommitId(), partialCommit);
       doCommit(partialCommit);
     } catch (Exception e) {
       LOG.warn("Commit failed, will try again next cycle", e);
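Taken together, the coordinator-side additions trace the commit handshake end to end. A hedged sketch of the sequence the new log lines bracket (the class and step comments are illustrative; the real exchange happens via events on the connector's control topic):

import java.util.UUID;

// Hypothetical walk-through of the begin-commit sequence.
class CommitHandshakeSketch {
  public static void main(String[] args) {
    UUID commitId = UUID.randomUUID();
    // 1. Coordinator opens a new commit window.
    System.out.println("Started new commit with commit-id=" + commitId);
    // 2. Coordinator broadcasts a StartCommit event; each worker flushes its
    //    open files and replies with per-partition results.
    System.out.println("Sent workers commit trigger with commit-id=" + commitId);
    // 3. Once responses cover all expected partitions (or the commit times
    //    out), the coordinator processes the commit and writes the
    //    accumulated data files to the Iceberg table.
    System.out.println("Processing commit after responses for " + commitId
        + ", isPartialCommit false");
  }
}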
@@ -134,7 +134,7 @@ private void routeRecordStatically(SinkRecord record) {
 
   private void routeRecordDynamically(SinkRecord record) {
     String routeField = config.tablesRouteField();
-    Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
+    Preconditions.checkNotNull(routeField, String.format("Route field cannot be null with dynamic routing at topic: %s, partition: %d, offset: %d", record.topic(), record.kafkaPartition(), record.kafkaOffset()));
 
     String routeValue = extractRouteValue(record.value(), routeField);
     if (routeValue != null) {
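One design note on this change: the String.format call now runs for every dynamically routed record, even when routeField is present. If that ever matters, Guava's Preconditions.checkNotNull also has a template overload (%s placeholders only) that formats the message solely when the check fails; an equivalent call under that alternative would look like:

Preconditions.checkNotNull(
    routeField,
    "Route field cannot be null with dynamic routing at topic: %s, partition: %s, offset: %s",
    record.topic(), record.kafkaPartition(), record.kafkaOffset());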
@@ -47,7 +47,7 @@ public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
   }
 
   public RecordWriter createWriter(
-      String tableName, SinkRecord sample, boolean ignoreMissingTable) {
+      String tableName, SinkRecord sample, boolean ignoreMissingTable) {
     TableIdentifier identifier = TableIdentifier.parse(tableName);
     Table table;
     try {
@@ -67,47 +67,52 @@ public RecordWriter createWriter(
 
   @VisibleForTesting
   Table autoCreateTable(String tableName, SinkRecord sample) {
-    StructType structType;
-    if (sample.valueSchema() == null) {
-      structType =
-          SchemaUtils.inferIcebergType(sample.value(), config)
-              .orElseThrow(() -> new DataException("Unable to create table from empty object"))
-              .asStructType();
-    } else {
-      structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
-    }
+    try {
+      StructType structType;
+      if (sample.valueSchema() == null) {
+        structType =
+            SchemaUtils.inferIcebergType(sample.value(), config)
+                .orElseThrow(() -> new DataException("Unable to create table from empty object"))
+                .asStructType();
+      } else {
+        structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
+      }
 
-    org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
-    TableIdentifier identifier = TableIdentifier.parse(tableName);
+      org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+      TableIdentifier identifier = TableIdentifier.parse(tableName);
 
-    List<String> partitionBy = config.tableConfig(tableName).partitionBy();
-    PartitionSpec spec;
-    try {
-      spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
-    } catch (Exception e) {
-      LOG.error(
-          "Unable to create partition spec {}, table {} will be unpartitioned",
-          partitionBy,
-          identifier,
-          e);
-      spec = PartitionSpec.unpartitioned();
-    }
+      List<String> partitionBy = config.tableConfig(tableName).partitionBy();
+      PartitionSpec spec;
+      try {
+        spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
+      } catch (Exception e) {
+        LOG.error(
+            "Unable to create partition spec {}, table {} will be unpartitioned",
+            partitionBy,
+            identifier,
+            e);
+        spec = PartitionSpec.unpartitioned();
+      }
 
-    PartitionSpec partitionSpec = spec;
-    AtomicReference<Table> result = new AtomicReference<>();
-    Tasks.range(1)
-        .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
-        .run(
-            notUsed -> {
-              try {
-                result.set(catalog.loadTable(identifier));
-              } catch (NoSuchTableException e) {
-                result.set(
-                    catalog.createTable(
-                        identifier, schema, partitionSpec, config.autoCreateProps()));
-              }
-            });
-    return result.get();
+      PartitionSpec partitionSpec = spec;
+      AtomicReference<Table> result = new AtomicReference<>();
+      Tasks.range(1)
+          .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
+          .run(
+              notUsed -> {
+                try {
+                  result.set(catalog.loadTable(identifier));
+                } catch (NoSuchTableException e) {
+                  result.set(
+                      catalog.createTable(
+                          identifier, schema, partitionSpec, config.autoCreateProps()));
+                  LOG.info("Created new table {} from record at topic: {}, partition: {}, offset: {}", identifier, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
+                }
+              });
+      return result.get();
+    } catch (Exception e) {
+      LOG.error("Error creating new table {} from record at topic: {}, partition: {}, offset: {}", tableName, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
+      throw e;
+    }
   }
 }
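A small observation on the new outer catch: SLF4J prints a stack trace only when the exception is passed as the final logging argument, so as written the error line records the offending record's coordinates while the rethrow preserves the trace for the caller. If the trace were wanted at this site as well, appending the exception would suffice:

LOG.error(
    "Error creating new table {} from record at topic: {}, partition: {}, offset: {}",
    tableName, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset(), e);
throw e;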
