Use-case: Computing PageRank on PySpark #144

Closed
daveaitel opened this issue Nov 2, 2021 · 18 comments

Comments

@daveaitel

Ran a fairly big PageRank job across my dataset in PySpark... worked super well. Thanks for the library!

@EnricoMi
Collaborator

EnricoMi commented Nov 3, 2021

Thanks for the feedback, this sounds awesome. Can you give some more details on your dataset, Dgraph and Spark cluster?

  • How many triples, nodes, types and predicates do you read in to compute the PageRank?
  • How many Dgraph alpha nodes and Spark nodes do you use?
  • How many CPU cores do your Dgraph alpha and Spark worker nodes have?
  • How long does the reading phase (excluding the PageRank computation) take?

Did you try different configuration values for dgraph.chunkSize or dgraph.partitioner.uidRange.uidsPerPartition?
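
For reference, a minimal sketch of how those options can be passed when reading (the option names are the ones mentioned above; the values are only illustrative placeholders to tune for your own setup):

from gresearch.spark.dgraph.connector import *

# Illustrative values only; tune chunk size and uids per partition for your cluster.
edges = (spark.read
    .option("dgraph.chunkSize", 10000)
    .option("dgraph.partitioner.uidRange.uidsPerPartition", 1000000)
    .dgraph.edges("localhost:9080"))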

@daveaitel
Author

I mean, I read in my whole DB! This is my basic script:

# Launch the pyspark shell with the Dgraph connector and GraphFrames packages:
pyspark --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.7.0-3.1,graphframes:graphframes:0.8.1-spark3.0-s_2.12

from pyspark.sql import DataFrame
from gresearch.spark.dgraph.connector import *

#triples: DataFrame = spark.read.dgraph.triples("localhost:9080")
edges: DataFrame = spark.read.option("dgraph.chunksize", 300).dgraph.edges("localhost:9080")
nodes: DataFrame = spark.read.option("dgraph.chunksize", 300).dgraph.nodes("localhost:9080")

from graphframes import *
nodes2 = nodes.withColumnRenamed("subject", "id")
edges2 = edges.withColumnRenamed("subject", "src").withColumnRenamed("objectUid","dst")
g = GraphFrame(nodes2, edges2)
sc.setCheckpointDir("/tmp")

g.outDegrees.orderBy("outDegree", ascending=False).limit(10).show()
g.inDegrees.orderBy("inDegree", ascending=False).limit(10).show()

g.triangleCount().orderBy("count", ascending=False).limit(10).show()
pr = g.pageRank(resetProbability=0.15, tol=0.01)

#pr.vertices.orderBy("pagerank", ascending=False).limit(100).show(100)
pr.vertices.select("id","pagerank").dropDuplicates().orderBy("pagerank", ascending=False).limit(100).show(100)

pr.vertices.select("id","pagerank").dropDuplicates().orderBy("pagerank", ascending=False).write.json("pagerank.json")

result = g.labelPropagation(maxIter=10)  # FAILS TO COMPLETE

Right now I think I am using the "all in one" Docker image (i.e. one Zero/Alpha). But performance is fine, really... I have 8 cores and 64 GB of RAM.

g.vertices.count()
21418467

g.edges.count()
13125761

I can't remember how long PageRank took, but counting the vertices took a few minutes, if that's any judge. :) I can get better data for you at some point!

Will this version work on the new version of Dgraph that is about to come out?

Thanks!

@daveaitel
Author

edges2 = edges.withColumnRenamed("subject", "src").withColumnRenamed("objectUid","dst") <--- btw, I can't remember if you have this in the documentation, but I think you need to rename the columns to make it work? I could be misremembering.
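
For context (this is GraphFrames' own requirement rather than anything connector-specific): GraphFrames expects the vertex DataFrame to have an id column and the edge DataFrame to have src and dst columns, so a rename along these lines is indeed needed:

# GraphFrames convention: vertices need "id", edges need "src" and "dst".
nodes2 = nodes.withColumnRenamed("subject", "id")
edges2 = (edges
    .withColumnRenamed("subject", "src")
    .withColumnRenamed("objectUid", "dst"))
g = GraphFrame(nodes2, edges2)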

@EnricoMi
Collaborator

EnricoMi commented Nov 4, 2021

Thanks for the insights.

The time that your counts take is pretty much what is needed to transfer the graph from Dgraph over to Spark; there is not much more overhead involved. This is a good way of measuring read speed.

Looks like you are running Dgraph and the Spark app on the same machine, so there are 8 concurrent Spark tasks reading from your single alpha node. From my experience I would expect your CPU to be 100% utilized by the Dgraph alpha in this setup, so you could improve speed by putting Dgraph on a separate machine with at least twice as many CPUs as your Spark job has. But given that the graph reads in minutes, there is not much need to improve read speed.

You are setting dgraph.chunksize to 300, which is very low. Is the default not working for you? I would expect larger values like 10000 to be faster (assuming the Dgraph alpha is not saturating the CPU).

I will look into adding support for GraphFrame in PySpark so that you can reduce this

edges: DataFrame = spark.read.option("dgraph.chunksize", 300).dgraph.edges("localhost:9080")
nodes: DataFrame = spark.read.option("dgraph.chunksize", 300).dgraph.nodes("localhost:9080")

nodes2 = nodes.withColumnRenamed("subject", "id")
edges2 = edges.withColumnRenamed("subject", "src").withColumnRenamed("objectUid","dst")
g = GraphFrame(nodes2, edges2)

to

g = spark.read.option("dgraph.chunksize", 300).dgraph.graphframes("localhost:9080")

@daveaitel
Author

daveaitel commented Nov 4, 2021 via email

@EnricoMi
Collaborator

EnricoMi commented Nov 4, 2021

An excellent use case for #74. Thanks for the valuable insights!

@EnricoMi EnricoMi closed this as completed Nov 4, 2021
@EnricoMi
Collaborator

EnricoMi commented Nov 4, 2021

Will this version work on the new version of DGraph that is about to come out?

I forgot to answer this bit: We will see once it comes out. Earlier releases introduced breaking changes. Watch this space.

@EnricoMi
Collaborator

EnricoMi commented Nov 4, 2021

I can confirm that v21.09.0 will not work with ≤ 0.7.0 releases of the connector as there is a breaking change in their /state endpoint. Anyway, there will be a new release of the connector once this is sorted out.

@daveaitel
Author

daveaitel commented Nov 4, 2021 via email

@EnricoMi EnricoMi changed the title from "FWIW" to "Use-case: Computing PageRank on PySpark" Nov 5, 2021
@EnricoMi EnricoMi pinned this issue Nov 5, 2021
@daveaitel
Author

daveaitel commented Nov 5, 2021 via email

@EnricoMi
Collaborator

This is not implemented but should be doable for a subset of DQL queries. Ingesting a partitioning will be interesting, though.

There are quite a few filter pushdowns implemented, so maybe you can use those to filter down to a subgraph on the Dgraph side.
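
As a sketch of that idea (assuming filters on the predicate column of the triples source are among the implemented pushdowns; the predicate name below is just a placeholder):

from gresearch.spark.dgraph.connector import *

# Keep only triples with a given predicate; such filters may be pushed down to Dgraph,
# so only the matching subgraph is transferred to Spark. "follows" is a placeholder.
triples = spark.read.dgraph.triples("localhost:9080")
subgraph = triples.where(triples.predicate == "follows")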

@EnricoMi
Collaborator

EnricoMi commented Dec 2, 2021

Hey @daveaitel, Dgraph v21.12.0 has just been released. I have prepared a SNAPSHOT release of the connector for you to try against that Dgraph: 0.8.0-3.1-20211202.210227-1. You may need to add this URL to your list of repositories: https://oss.sonatype.org/content/repositories/snapshots.
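
For example, launching the shell against that snapshot could look roughly like this (--repositories is the standard spark-submit/pyspark way to add an extra resolver; the coordinates are the ones given above):

pyspark \
  --repositories https://oss.sonatype.org/content/repositories/snapshots \
  --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.8.0-3.1-20211202.210227-1,graphframes:graphframes:0.8.1-spark3.0-s_2.12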

@daveaitel
Author

daveaitel commented Dec 3, 2021 via email

@daveaitel
Author

daveaitel commented Dec 7, 2021 via email

@daveaitel
Author

daveaitel commented Dec 7, 2021 via email

@EnricoMi
Collaborator

EnricoMi commented Dec 7, 2021

Yes, spark-dgraph-connector_2.12:0.8.0-3.1-SNAPSHOT should pick up the right snapshot.

I presume that PageRank ran against the latest v21.12.0 Dgraph. So that works then. Great!

@EnricoMi
Collaborator

@daveaitel I have released the Dgraph v21.12.0 support in 0.8.0.
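
Assuming the released artifact follows the same naming pattern as the 0.7.0-3.1 coordinate used earlier in this thread, the --packages line would presumably become:

pyspark --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.8.0-3.1,graphframes:graphframes:0.8.1-spark3.0-s_2.12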

@daveaitel
Author

daveaitel commented Jan 22, 2022 via email
