Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2286: Integrate MetadataStore#putAll for improved startup time. #1125

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

dnishimura
Copy link
Contributor

This PR integrates MetadataStore#putAll with TaskAssignmentManager, TaskPartitionAssignmentManager, ChangelogStreamManager and ZkJobCoordinator when writing assignments to the coordinator stream. For a Kafka-based coordinator stream, the putAll method allows multiple messages to be produced before calling flush. We have observed in preliminary benchmark tests that this provides an order of magnitude improvement when calling putAll vs. put for each entry. The impact is faster startup times.

@shanthoosh - please take a look since you have the most context.

@dnishimura dnishimura force-pushed the samza-2286-integrate-metadatastore-putall branch from f999e0a to b22b96e Compare August 2, 2019 21:24
@dnishimura
Copy link
Contributor Author

@shanthoosh @bharathkk - Please take a look when you get a chance.

@dnishimura dnishimura force-pushed the samza-2286-integrate-metadatastore-putall branch from b22b96e to 75eafe8 Compare October 2, 2019 21:44
@dnishimura
Copy link
Contributor Author

Rebased with the latest master branch ⬆️

Copy link
Contributor

@shanthoosh shanthoosh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

It would be great if we can test this patch with a sample yarn job(e.g samza-hello-samza) and quantify the reduction in the write-latency.

for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
String taskName = entry.getKey().getTaskName();
if (entry.getValue() != null) {
String changeLogPartitionId = String.valueOf(entry.getValue());
LOG.debug("TaskName: {} to Partition: {}", taskName, entry.getValue());
metadataStore.put(taskName, valueSerde.toBytes(changeLogPartitionId));
changelogEntriesToStore.put(taskName, valueSerde.toBytes(changeLogPartitionId));
} else {
LOG.debug("Deleting the TaskName: {}", taskName);
metadataStore.delete(taskName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this change. But it would be beneficial to add a deleteAll API in MetadataStore (synonymous to putAll API). deleteAll API would simplify the iterative delete API invocations. If you agree, then it would be great to create a follow-up ticket and add a todo here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants