SAMZA-2352: Use min.compaction.lag.ms to avoid compacting the Kafka changelog topic (#1190)
xinyuiscool authored and prateekm committed Oct 17, 2019
1 parent e2928e1 commit 842f970
Showing 4 changed files with 55 additions and 2 deletions.
Changed file: StorageConfig (Java)
@@ -31,6 +31,8 @@
import org.apache.samza.execution.StreamManager;
import org.apache.samza.util.StreamUtil;

import static com.google.common.base.Preconditions.*;


/**
* Config helper methods related to storage.
@@ -45,13 +47,18 @@ public class StorageConfig extends MapConfig {
public static final String MSG_SERDE = STORE_PREFIX + "%s.msg.serde";
public static final String CHANGELOG_STREAM = STORE_PREFIX + "%s" + CHANGELOG_SUFFIX;
public static final String ACCESSLOG_STREAM_SUFFIX = "access-log";
// TODO: setting replication.factor here does not seem to work the way it does in KafkaConfig.
public static final String CHANGELOG_REPLICATION_FACTOR = STORE_PREFIX + "%s.changelog.replication.factor";
public static final String CHANGELOG_MAX_MSG_SIZE_BYTES = STORE_PREFIX + "%s.changelog.max.message.size.bytes";
public static final int DEFAULT_CHANGELOG_MAX_MSG_SIZE_BYTES = 1048576;
public static final String DISALLOW_LARGE_MESSAGES = STORE_PREFIX + "%s.disallow.large.messages";
public static final boolean DEFAULT_DISALLOW_LARGE_MESSAGES = false;
public static final String DROP_LARGE_MESSAGES = STORE_PREFIX + "%s.drop.large.messages";
public static final boolean DEFAULT_DROP_LARGE_MESSAGES = false;
// The log compaction lag time for the transactional state changelog.
public static final String MIN_COMPACTION_LAG_MS = "min.compaction.lag.ms";
public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX + "%s.changelog." + MIN_COMPACTION_LAG_MS;
public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS = TimeUnit.HOURS.toMillis(4);

static final String CHANGELOG_SYSTEM = "job.changelog.system";
static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
@@ -207,6 +214,15 @@ public boolean getDropLargeMessages(String storeName) {
return getBoolean(String.format(DROP_LARGE_MESSAGES, storeName), DEFAULT_DROP_LARGE_MESSAGES);
}

public long getChangelogMinCompactionLagMs(String storeName) {
String minCompactLagConfigName = String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, storeName);
// Disallow overriding via stores.<storeName>.changelog.kafka.min.compaction.lag.ms, which would be inconsistent with this config.
checkArgument(get("stores." + storeName + ".changelog.kafka." + MIN_COMPACTION_LAG_MS) == null,
"Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property.");

return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
}

/**
* Helper method to check if a system has a changelog attached to it.
*/
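Note: the new getter falls back to a four-hour default and rejects the kafka-prefixed spelling of the same property. Below is a minimal sketch of that behavior; the classes, constants, and getter are from this diff, while the store name "my-store" and the main-method wrapper are only illustrative.

import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;

public class MinCompactionLagExample {
  public static void main(String[] args) {
    // No override configured: the 4-hour default (14400000 ms) is returned.
    StorageConfig defaults = new StorageConfig(new MapConfig());
    System.out.println(defaults.getChangelogMinCompactionLagMs("my-store"));

    // Per-store override via stores.my-store.changelog.min.compaction.lag.ms.
    StorageConfig overridden = new StorageConfig(new MapConfig(ImmutableMap.of(
        String.format(StorageConfig.CHANGELOG_MIN_COMPACTION_LAG_MS, "my-store"),
        String.valueOf(TimeUnit.HOURS.toMillis(6)))));
    System.out.println(overridden.getChangelogMinCompactionLagMs("my-store")); // 21600000

    // The kafka-prefixed spelling is rejected so the two keys cannot diverge.
    StorageConfig conflicting = new StorageConfig(new MapConfig(ImmutableMap.of(
        "stores.my-store.changelog.kafka." + StorageConfig.MIN_COMPACTION_LAG_MS, "1000")));
    try {
      conflicting.getChangelogMinCompactionLagMs("my-store");
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    }
  }
}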
Changed file: StorageConfig unit tests (Java)
@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaException;
import org.junit.Test;

@@ -296,4 +297,17 @@ public void testGetDropLargeMessages() {
new MapConfig(ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, STORE_NAME0), "true")));
assertEquals(true, storageConfig.getDropLargeMessages(STORE_NAME0));
}

@Test
public void testGetChangelogMinCompactionLagMs() {
// empty config, return default lag ms
assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS,
new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0));

long lagOverride = TimeUnit.HOURS.toMillis(6);
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0),
String.valueOf(lagOverride))));
assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0));
}
}
Changed file: KafkaConfig (Scala)
@@ -336,8 +336,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
}

val storageConfig = new StorageConfig(config)
kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(storageConfig.getChangeLogDeleteRetentionInMs(name)))

// To enable transactional state, we need to prevent the head of the changelog
// (the messages written after the last checkpoint) from being log-compacted, so that
// the uncommitted updates can still be trimmed during restore.
// We use min.compaction.lag.ms to control when compaction may start.
if (new TaskConfig(this).getTransactionalStateRestoreEnabled) {
kafkaChangeLogProperties.setProperty(StorageConfig.MIN_COMPACTION_LAG_MS,
String.valueOf(storageConfig.getChangelogMinCompactionLagMs(name)))
}

filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
kafkaChangeLogProperties
}
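Note: the compaction lag is only applied when transactional state restore is enabled, since that is the case where the changelog head (the updates after the last checkpoint) must survive compaction long enough to be trimmed. A rough Java sketch of the same decision follows, assuming TaskConfig exposes getTransactionalStateRestoreEnabled() to Java callers the way the Scala code above uses it; the helper itself is hypothetical and not part of this commit.

import java.util.Properties;

import org.apache.samza.config.Config;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;

final class ChangelogCompactionLag {
  // Hypothetical helper mirroring the Scala block above.
  static void applyMinCompactionLag(Config config, String storeName, Properties changelogProps) {
    // Only delay compaction when transactional state restore needs the changelog head intact.
    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
      changelogProps.setProperty(StorageConfig.MIN_COMPACTION_LAG_MS,
          String.valueOf(new StorageConfig(config).getChangelogMinCompactionLagMs(storeName)));
    }
  }
}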
Changed file: TestKafkaConfig (Scala)
@@ -20,6 +20,7 @@
package org.apache.samza.config

import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.samza.config.factories.PropertiesConfigFactory
import org.junit.Assert._
@@ -82,11 +83,11 @@ class TestKafkaConfig {

@Test
def testChangeLogProperties() {
props.setProperty("job.changelog.system", SYSTEM_NAME)
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
props.setProperty("stores.test2.changelog", "kafka.mychangelog2")
props.setProperty("stores.test2.changelog.max.message.bytes", "1024000")
props.setProperty("job.changelog.system", "kafka")
props.setProperty("stores.test3.changelog", "otherstream")
props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
props.setProperty("stores.test4.rocksdb.ttl.ms", "3600")
@@ -107,6 +108,7 @@
assertEquals("otherstream", storeToChangelog.getOrDefault("test3", ""))
assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"))
assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"))
assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))

props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()
@@ -138,6 +140,17 @@
String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("max.message.bytes"),
KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)

// test compaction config for transactional state
val lagOverride = String.valueOf(TimeUnit.HOURS.toMillis(6))
props.setProperty(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true")
props.setProperty("stores.test2.changelog.min.compaction.lag.ms", lagOverride)
val tsMapConfig = new MapConfig(props.asScala.asJava)
val tsKafkaConfig = new KafkaConfig(tsMapConfig)
assertEquals(String.valueOf(StorageConfig.DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS),
tsKafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
assertEquals(lagOverride,
tsKafkaConfig.getChangelogKafkaProperties("test2").getProperty("min.compaction.lag.ms"))
}

@Test
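Note: end to end, the lag shows up on the changelog topic properties that KafkaConfig produces. Below is a rough Java equivalent of the new test flow above; the property keys and the minimal setup are copied from TestKafkaConfig, and anything beyond that (the "kafka" system name, store name, and main-method wrapper) is illustrative.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;

public class ChangelogLagEndToEndExample {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    // Minimal setup mirrored from TestKafkaConfig above.
    props.put("job.changelog.system", "kafka");
    props.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
    props.put("stores.test2.changelog", "kafka.mychangelog2");
    props.put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
    props.put(String.format(StorageConfig.CHANGELOG_MIN_COMPACTION_LAG_MS, "test2"),
        String.valueOf(TimeUnit.HOURS.toMillis(6)));

    KafkaConfig kafkaConfig = new KafkaConfig(new MapConfig(props));
    // Expect 21600000 (the 6-hour override) rather than the 4-hour default.
    System.out.println(kafkaConfig.getChangelogKafkaProperties("test2")
        .getProperty(StorageConfig.MIN_COMPACTION_LAG_MS));
  }
}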
