Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Test to illustrate https://github.com/twitter/summingbird/issues/671 #672

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from

Conversation

pankajroark
Copy link
Contributor

To illustrate the issue with #671

@johnynek
Copy link
Collaborator

we should see if this is also a bug with scalding.

We should have common code that does the naming based only on the Producer graph (and maybe on the items inside the graph that get the names, so we can actually translate the names).

I tried to do this in the past, but it was reverted.

val bolts = stormTopo.get_bolts

// Tail should use parallelism specified for the summer node
assert(bolts("Tail").get_common.get_parallelism_hint == 20)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's assert both settings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean:
assert(bolts("Tail").get_common.get_parallelism_hint != 10)
assert(bolts("Tail").get_common.get_parallelism_hint == 20)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to write the unit test for scalding platform. I'm trying to use the Reducers option for this. For the online platform tests we could inspect the created topology to check the number of summer nodes created, how do I do the corresponding thing for the scalding platform. i.e. how do I inspect the PipeFactory created after planning to check how many reducers it's going to use for a node? I'm new to the scalding platform of summingbird, so may be missing obvious things, will appreciate any help. Thanks

@pankajroark
Copy link
Contributor Author

I added a few named option tests for scalding including for this case. This bug doesn't seem to affect scalding platform.

@pankajroark
Copy link
Contributor Author

Added a proposed fix, which is a single word change in StormPlatform's get function. The explanation is a bit complex though and follows:
The way node members are stored in SummerNode is initially Summer(IdentityKeyedProducer(FlatMappedProducer....))), IdentityKeyedProducer(FlatMappedProducer....))

but here https://github.com/twitter/summingbird/blob/develop/summingbird-online/src/main/scala/com/twitter/summingbird/planner/OnlinePlan.scala#L230 the order is reversed.

So node.members.last returns IdentityKeyedProducer(FlatMappedProducer....))

When the irrudicible analysis is done this IdentityKeyedProducer doesn't cover the Store used in the summer, and then during the reverse mapping it ends up getting mapped to both flatMapper and summer named nodes.

Thus we end up looking at the options of both flatMapper and summer named nodes and find the flatMapper setting first.

By using node.members.head we use Summer(IdentityKeyedProducer(FlatMappedProducer....))) for look up and correctly find only the summer named node and thus only look up there.

@NPraneeth
Copy link
Contributor

While this fix solves this bug, i think it creates a new one.

Lets suppose we have :
Source(_).name("dummySrc").map ( fn ). name ("dummyOpt").sumByKey(_*)
Options.set("dummySrc"->SummerParallelism(10))
Which gets converted to :
NamedProducer(OptionMappedProducer(NamedProducer(Source))) and the SourceNode will contain nodes : OptionMappedProducer, Source

When you take nodes.members.head -> you will get OptionMappedProuducer's options instead of the SourceProducer's options.
So nodes.members.head might work in the summerNode but will be failing in the SourceNode. The same problem the other way around.

@pankajroark
Copy link
Contributor Author

@NPraneeth good catch.

configCollector.config
}

def verify[T](
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is T?

@NPraneeth
Copy link
Contributor

@pankajroark As the code for the 'get' method is in StormPlatform, we can use case statements to see if it's a SourceNode/SummerNode and try to collect the SourceProducer/SummerProducer and then grab the options corresponding to the producers.( This would be better than grabbing the options for the last node in the members ). And for FlatMapNode we can leave the node.members.last as default.
For Example : SummerNode -> Summer(IdentityKey(....
Instead of getting node.member.last ( IdentityKeyProd in this case ), we can identify it to be a SummerNode and grab the options for the SummerProducer instead. Similarly for the SourceNode too.

@CLAassistant
Copy link

CLAassistant commented Jul 18, 2019

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants