
Spark zipnumcluster job (draft) #8

Open: wants to merge 16 commits into main

Conversation

@jt55401 commented Nov 11, 2024

This is a cc-pyspark version of the zipnum clustering job (without the use of the mrjob framework).

  • This may be problematic, but I am not sorting the results quite as strictly as the existing job - my reading of the zipnum documentation makes it sound like as long as the index file points to the right place, we should be fine. I'm not sure this will really work until I test it, though.
  • My test plan was to run the index server locally and confirm that I can look up URLs that are in the index, and also URLs that are not in the index (i.e. in between chunks).

@jt55401 self-assigned this Nov 11, 2024

@jt55401 (author) commented Nov 11, 2024

Also note, @sebastian-nagel: I do have a "more perfect sorting" version in my local history which uses reservoir sampling - if we end up needing it, it's already done.

@sebastian-nagel

not sorting the results quite as strictly as the existing job - my reading of the zipnum documentation makes it sound like as long as the index file points to the right place, we should be fine.

After thinking longer about it: some types of queries will still work, but others won't. The problem starts when the software reading the CDX index assumes that it is totally sorted. This especially applies to any kind of range query.

For example, the query

https://index.commoncrawl.org/CC-MAIN-2024-42-index?url=*.com&showNumPages=true&output=json

returns {"pages": 84874, "pageSize": 5, "blocks": 424369}.

So, for the .com top-level domain there are at most 424369 * 3000 = 1,273,107,000 (about 1.27 billion) captures in CC-MAIN-2024-42.

Basically this is counting the number of lines in the cluster.idx which start with com,:

$> grep -c '^com,' collections/CC-MAIN-2024-42/indexes/cluster.idx 
424368

Because there might also be results in the zipnum block before the first matching one, 1 is added to the number of lines.

If the matching zipnum blocks are non-contiguous, we'd need to add 1 for every contiguous range of blocks. Naturally, the result becomes less precise.
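
For illustration, such an upper-bound estimate could be computed from cluster.idx roughly like this (a sketch, not the index server's actual code; the path and the block size of 3000 lines are taken from the example above):

def estimate_max_captures(cluster_idx_path, prefix, lines_per_block=3000):
    """Upper-bound estimate of captures matching a SURT prefix, based only on
    the zipnum secondary index: count the matching cluster.idx lines and add
    one extra block per contiguous run, because the block just before the
    first match of a run may also contain matching records."""
    blocks = 0
    runs = 0
    in_run = False
    with open(cluster_idx_path, "rt", encoding="utf-8") as f:
        for line in f:
            if line.startswith(prefix):
                blocks += 1
                if not in_run:
                    runs += 1
                    in_run = True
            else:
                in_run = False
    return (blocks + runs) * lines_per_block

# With a totally sorted index there is a single contiguous run, which matches
# the example above: (424368 + 1) * 3000 = 1,273,107,000 captures at most.
print(estimate_max_captures("collections/CC-MAIN-2024-42/indexes/cluster.idx", "com,"))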

In addition, there's more work to do for larger range queries. That's what the statement in the pywb documentation refers to: "Generally, this overhead [of the zipnum index] is negligible when looking up large indexes, and non-existent when doing a range query across many CDX lines." (https://pywb.readthedocs.io/en/latest/manual/indexing.html#zipnum)

On the other hand, queries for single URLs should work the same, and with the same performance, independent of the partitioning scheme.

a "more perfect sorting" version

What does that mean? Total order sorting?

My test plan was to run the index server locally and confirm that I can look up URLs that are in the index, and also URLs that are not in the index (i.e. in between chunks)

All kinds of range queries also need to be tested:

  • using a wildcard: *.com, *.org.uk, youtu.be/a*
  • matchType={prefix,host,domain}
  • showNumPages
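
For example, such a test pass could be scripted against a locally running index server (a sketch; the endpoint URL and the example domains are placeholders):

import requests

CDX_API = "http://localhost:8080/CC-MAIN-2024-42-index"   # placeholder for the local index server

test_queries = [
    {"url": "*.com", "showNumPages": "true"},   # wildcard domain query + page count
    {"url": "*.org.uk"},                        # multi-label wildcard
    {"url": "youtu.be/a*"},                     # path prefix wildcard
    {"url": "commoncrawl.org/", "matchType": "prefix"},
    {"url": "commoncrawl.org", "matchType": "host"},
    {"url": "commoncrawl.org", "matchType": "domain"},
]

for params in test_queries:
    r = requests.get(CDX_API, params={**params, "output": "json"})
    print(params, r.status_code, len(r.text.splitlines()))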

Of course, even then we'd need to document the new CDX index sorting for our users and spread this information. We do not know which assumptions are made in third-party software and whether it relies on total order sorting. This alone might make it less work to just implement the total order sorting.

if len(current_chunk) >= chunk_size:
    # Compress and write chunk
    chunk_data = ''.join(current_chunk).encode('utf-8')
    compressed = z.compress(chunk_data)

Whenever a new chunk is started, the zlib compressobj needs to be reset. It also needs to be flushed. Should be:

z = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
compressed = z.compress(chunk_data) + z.flush()
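
In context, the per-block pattern would be roughly the following (a sketch with hypothetical names, not the PR's actual code; each zipnum block becomes an independent gzip member that can later be decompressed on its own, given its offset and length):

import zlib

def compress_block(lines):
    """Compress one zipnum block of CDX lines as a standalone gzip member."""
    chunk_data = ''.join(lines).encode('utf-8')
    z = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)   # fresh compressobj per block
    return z.compress(chunk_data) + z.flush()                     # flush finalizes the gzip member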

# Handle final chunk
if current_chunk:
    chunk_data = ''.join(current_chunk).encode('utf-8')
    compressed = z.compress(chunk_data) + z.flush()

Here as well: start a new compressobj.


# Create single index entry per record
for sk, ts in chunk_records:
    index_entries.append((sk, ts, partition_id, current_offset, chunk_length))

The secondary index (cluster.idx) only contains the offset of the first record of every zipnum block, not of all of them.

@jt55401 (author) replied:

Ah - this (and your explanation above) makes more sense to me now, thank you.
(I am still not seeing any of that in the docs I was able to find, but I trust you and will adjust.)

f.write(compressed)

for sk, ts in chunk_records:
    index_entries.append((sk, ts, partition_id, current_offset, chunk_length))

Also: only the first record.
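
Putting both review points together, the bookkeeping for one zipnum block could look roughly like this (a sketch with hypothetical names, not the PR's actual code):

def write_block(f, compressed, chunk_records, partition_id, current_offset, index_entries):
    """Write one compressed zipnum block and append a single secondary-index
    entry: the SURT key and timestamp of the block's first record, plus the
    block's offset and length. Returns the offset of the next block."""
    f.write(compressed)
    chunk_length = len(compressed)
    first_sk, first_ts = chunk_records[0]   # only the first record of the block
    index_entries.append((first_sk, first_ts, partition_id, current_offset, chunk_length))
    return current_offset + chunk_length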

@jt55401 (author) commented Nov 11, 2024

What does that mean? Total order sorting?

I mentioned it in the next sentence fragment - I used the same technique as the Hadoop version: reservoir sampling to produce the ranges, then another pass using those ranges to do the shards.

I will find that version in my local history and check it when I work on this next.

@sebastian-nagel

reservoir sampling to produce the ranges, then another pass using those ranges to do the shards.

Maybe it's not necessary to do the sampling step - Spark has a sortBy (or sortByKey) method which does a total order sort with N partitions. We use it to sort the vertices of the host-level webgraph before enumerating them. As with reservoir sampling, the partitions are not perfectly balanced, but the balance is acceptable.

Note: Spark also has methods that only sort the data within partitions; they are usually named with "WithinPartitions", see for example repartitionAndSortWithinPartitions.
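
For illustration, both variants might look roughly like this (a sketch, not the job's actual code; the RDD contents and the number of partitions are made up):

from pyspark import SparkContext
from pyspark.rdd import portable_hash

sc = SparkContext(appName="zipnum-sort-sketch")
num_partitions = 4   # number of output CDX shards (placeholder value)

# (SURT key, CDX JSON) pairs; placeholder data
records = sc.parallelize([
    ("com,example)/ 20240101000000", "{...}"),
    ("org,example)/ 20240101000000", "{...}"),
])

# Total order sort: sortByKey samples the keys to build range partitions and
# sorts within each partition, so all keys of partition i sort before those
# of partition i+1.
total_order = records.sortByKey(numPartitions=num_partitions)

# Sort only within partitions: with a hash partitioner the shards are well
# balanced, but their key ranges can overlap, so there is no global order.
within_partitions = records.repartitionAndSortWithinPartitions(
    numPartitions=num_partitions, partitionFunc=portable_hash)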

@jt55401 (author) commented Nov 15, 2024

reservoir sampling to produce the ranges, then another pass using those ranges to do the shards.

Maybe it's not necessary to do the sampling step - Spark has a sortBy (or sortByKey) method which does a total order sort with N partitions. We use it to sort the vertices of the host-level webgraph before enumerating them. As with reservoir sampling, the partitions are not perfectly balanced, but the balance is acceptable.

Note: Spark also has methods that only sort the data within partitions; they are usually named with "WithinPartitions", see for example repartitionAndSortWithinPartitions.

Indeed - I was aware of some of these, but not all, and only some of the nuance. I've done some deep reading, and this is my best assessment:

My informal definition of a "perfect sort" is that the last record of one partition will be less than the first record of the next partition (so, if I go through the partitions in order, I will never get records out of order).

  • sort() (and sortByKey) - will work, and will sort perfectly. As you say, it does require a shuffle, so it's "expensive" by comparison.
  • repartitionAndSortWithinPartitions seems like the best option.
    • using hash to partition, like I do now - will be faster, as it requires fewer passes through the data; it will produce more uneven partitions, but they should be "perfectly" sorted.
    • using reservoir sampling - requires a sampled pass through the data to generate the ranges, and will generate more even partitions, with an "almost" perfect sort - but not 100% due to sampling, I think...
    • using dynamic reservoir sampling - will be about as fast as hash, but the initial distribution to each partition will be relatively "messy" (I do have a version of this as well)

I'm leaning towards repartitionAndSortWithinPartitions using a hash of the URL - but I may change my mind after running a few jobs and seeing how uneven the partitions are... IMHO, 5-10% variance seems fine; if it's much more than that, it doesn't feel as good (though that's why I want to read the zipnum code, as I state below - it may not be a practical issue... so it could be fine).
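
To quantify how uneven the partitions are, a quick check along these lines could be used (a sketch; partitioned stands for the RDD after repartitioning):

# count records per partition to gauge the balance
sizes = partitioned.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print(min(sizes), max(sizes), max(sizes) / (sum(sizes) / len(sizes)))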

Since I'm waiting on/monitoring other jobs anyway, I'm going to take another block of time tomorrow to do a similar deep read of the zipnum code, just so I have a much better understanding of that as well. (Specifically, I'm going to read the index server code which USES zipnum, as that's the part that is still murky to me.)

Thanks again for the input @sebastian-nagel , much appreciated.

@wumpus (member) commented Nov 15, 2024

Everyone's expectation is that the SURT values of the CDX index shards and Parquet shards should not overlap.

For CDX, that's expected by pywb.

For Parquet, it's important for optimization. We do have a few Parquet indexes for which that isn't true, and it's a problem we will fix someday.

@jt55401 (author) commented Nov 17, 2024

Everyone's expectation is that the SURT values of the CDX index shards and Parquet shards should not overlap.

Got it - I think with the hash and reservoir-sampled approaches I outlined, they should not overlap at the shard level (as the former would not allow it, and the latter would match what we already do today pretty closely). There MAY be gzip chunks that overlap (by very small amounts with reservoir sampling, and potentially rather larger amounts with hash) - but as long as the secondary index reflects those properly, I don't think it'll be an issue, based on my read of the index server side of things.

@jt55401 (author) commented Nov 17, 2024

I will get back to this task Monday, so there's plenty of time to discuss if I'm wrong there... I will bring it up on the eng. call, and if we need to talk, we can do it then.

@sebastian-nagel commented Nov 18, 2024

Everyone's expectation is that the SURT values of the CDX index shards and Parquet shards should not overlap.

Got it - I think with the hash and reservoir-sampled approaches I outlined, they should not overlap at the shard level

What Greg means is that there must be zero overlap between the ranges defined by the first and the last SURT of each zipnum block. It's important because the secondary index (cluster.idx) only stores the first SURT, not the last one. With strict sorting, the last SURT of a block is guaranteed to be lower than (sort before) the first SURT of the next zipnum block.

For Parquet, zero overlap is an optimization but not a requirement: every Parquet file and row group stores the min and max values in the statistics of its footer.
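
For the test plan, the zero-overlap property could also be checked directly from the index files (a sketch; the cluster.idx field layout noted in the comment is an assumption, and the paths are placeholders):

import os
import zlib

def read_block(path, offset, length):
    """Read one zipnum block (an independent gzip member) and return its CDX lines."""
    with open(path, "rb") as f:
        f.seek(offset)
        data = f.read(length)
    return zlib.decompress(data, zlib.MAX_WBITS + 16).decode("utf-8").splitlines()

def check_zero_overlap(cluster_idx, index_dir):
    """Verify that the last line of every zipnum block sorts before the first
    line of the next block, i.e. that the block ranges do not overlap."""
    prev_last = None
    with open(cluster_idx, "rt", encoding="utf-8") as f:
        for line in f:
            # assumed layout: "<surt> <timestamp>\t<part file>\t<offset>\t<length>\t..."
            key, part, offset, length = line.rstrip("\n").split("\t")[:4]
            lines = read_block(os.path.join(index_dir, part), int(offset), int(length))
            if prev_last is not None and not prev_last < lines[0]:
                print("overlap between blocks:", prev_last[:60], ">=", lines[0][:60])
            prev_last = lines[-1]

check_zero_overlap("collections/CC-MAIN-2024-42/indexes/cluster.idx",
                   "collections/CC-MAIN-2024-42/indexes")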

@jt55401 (author) commented Nov 20, 2024

I have updates to this task in another branch - for now, I'm going to preserve the existing approach (reservoir sampling) and get this task finished. I have it all working locally, will be testing tonight/tomorrow in S3 on a full crawl, and will then re-do the PR to reflect that. (I'll probably merge it into this branch, to keep things simple and preserve the above history.)
