
Improve runner so that it can be called from within python #189

Closed
cschloer opened this issue May 14, 2020 · 17 comments

Comments

@cschloer
Contributor

Hey,

See datahq/dataflows#96. I'm running into problems with larger pipelines running with just dataflows, so I'm thinking moving back to DPP in some capacity might be a good idea. However, I want to be able to call DPP from within python (without opening up a subprocess) and get the results back in a python variable when it's all done. Is that possible?

@roll

@cschloer
Contributor Author

Specifically I think it would also be important to be able to limit the number of rows being held in memory at a given moment. I think that is the problem with dataflows right now - it's sucking in all 18 million rows and then it runs out of memory.

@roll
Member

roll commented May 25, 2020

Hi @cschloer,

Can you publish a test that crashes dataflows? I thought it uses streaming, so it should be able to handle files of any size.

cc @akariv

@roll
Member

roll commented May 26, 2020

@cschloer
I'm moving this over for discussion.

I think we first need to understand why dataflows runs out of memory. My hope is that there is a fixable issue. Please share as much as you can so Adam and I can profile.

Returning to the DPP runner is a last resort, in my opinion.

@cschloer
Contributor Author

cschloer commented May 27, 2020

Here is the file:

https://frictionless-share-bucket.s3.amazonaws.com/combined.csv

It is quite large (1.5 GB, ~18 million rows I believe), but the problem also happens with 500 MB files.

@cschloer
Contributor Author

I usually load it from S3, which might make a difference, but I imagine the same problem would occur when loading from a local path. The load step is simple: format CSV with deduplicate_headers=True.

What happens is that CPU usage goes to 100%, memory starts at about 20%, and then slowly creeps higher and higher until my computer freezes and I can't see any more results.

^ That is what happens when I run it in my local development environment. In the production AWS environment (ECS with Fargate), the Celery task (I run the pipelines using Celery) gets killed after about 2 minutes. As far as I can tell, it's being killed by AWS for using too much memory. The same thing happens when I beef up the Fargate task to 32 GB; it just takes a little longer to exceed the memory limit.

I imagine the problem is that even though dataflows runs as a stream, it still holds the entire result in memory at once, since it needs to return a result and it never uses the filesystem. If the entire result took the same amount of space as the CSV (1.5 GB) that would be fine, but I imagine the Python data structures take quite a bit more memory. My suggestion would be an option (or doing this automatically based on memory usage somehow?) that starts storing results in the filesystem after ~100,000 rows, in order to release the memory holding the result. The return value could then be a file object referring to that file? See the sketch below.

Thoughts @akariv @roll?
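
For illustration, a minimal sketch of that spill-after-a-threshold idea in plain Python. The collect_rows helper and the 100,000-row threshold are hypothetical (not an existing dataflows option), and rows are assumed to be dicts, as in dataflows:

import csv

SPILL_THRESHOLD = 100_000  # hypothetical: rows kept in memory before spilling

def collect_rows(rows, path='results.csv'):
    buffer = []
    out = None
    writer = None
    for row in rows:
        if writer is None and len(buffer) >= SPILL_THRESHOLD:
            # Threshold reached: flush the buffer to disk and stream
            # all further rows straight into the file.
            out = open(path, 'w', newline='')
            writer = csv.DictWriter(out, fieldnames=list(buffer[0].keys()))
            writer.writeheader()
            writer.writerows(buffer)
            buffer.clear()
        if writer is None:
            buffer.append(row)
        else:
            writer.writerow(row)
    if out is None:
        return buffer      # small result: plain in-memory list
    out.close()
    return open(path)      # large result: a file object referring to the dump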

@roll
Member

roll commented May 27, 2020

Thanks @cschloer,

I'll investigate.

@roll
Member

roll commented May 27, 2020

@cschloer
Can you please also share the dataflows pipeline you run?

I'm almost sure the problem is somewhere in tabulator; for example, it doesn't yet support streaming for the s3 scheme. If you use the https scheme, it could also be a problem with the remote loader.

@roll
Member

roll commented May 27, 2020

OK I can confirm:

tabulator

Current memory usage is 6.982565MB; Peak was 6.999308MB
Count: 1770000
Current memory usage is 6.982571MB; Peak was 6.999308MB
Count: 1780000
Current memory usage is 6.98252MB; Peak was 6.999308MB
Count: 1790000
Current memory usage is 6.982574MB; Peak was 6.999308MB
Count: 1800000
Current memory usage is 6.982517MB; Peak was 6.999308MB
Count: 1810000
Current memory usage is 6.982571MB; Peak was 6.999308MB
Count: 1820000
Current memory usage is 6.982519MB; Peak was 6.999308MB
Count: 1830000
Current memory usage is 6.982568MB; Peak was 6.999308MB
Count: 1840000

dataflows

Current memory usage is 228.960815MB; Peak was 228.961327MB
Count: 150000
Current memory usage is 244.653218MB; Peak was 244.65373MB
Count: 160000
Current memory usage is 260.186048MB; Peak was 260.18656MB
Count: 170000
Current memory usage is 275.894346MB; Peak was 275.894858MB
Count: 180000
Current memory usage is 291.43727MB; Peak was 291.437782MB
Count: 190000
Current memory usage is 307.178316MB; Peak was 307.178828MB
Count: 200000
Current memory usage is 322.717173MB; Peak was 322.717685MB
Count: 210000
Current memory usage is 338.258286MB; Peak was 338.262494MB
Count: 220000
Current memory usage is 354.027328MB; Peak was 354.02784MB

@akariv
Is this expected, or can we fix it?

@akariv
Member

akariv commented May 27, 2020 via email

@roll
Member

roll commented May 27, 2020

Thanks!

My scripts:

tabulator

import tracemalloc
from tabulator import Stream


tracemalloc.start()
count = 0
source = 'https://frictionless-share-bucket.s3.amazonaws.com/combined.csv'
with Stream(source) as stream:
    for row in stream.iter():
        count += 1
        if not count % 10000:
            current, peak = tracemalloc.get_traced_memory()
            print(f'Current memory usage is {current / 10**6}MB; Peak was {peak / 10**6}MB')
            print(f'Count: {count}')
print(count)

dataflows

from dataflows import Flow, load

flow = Flow(
    load(
        'https://frictionless-share-bucket.s3.amazonaws.com/combined.csv',
        deduplicate_headers=True,
        limit_rows=100000000000, # load.limiter is used to print memory usage
    )
)
data, package, stats = flow.results()
print(data)
print(package)
print(stats)

@roll
Member

roll commented May 27, 2020

I'll try the same for tableschema and datapackage to rule them out as memory-leak suspects.

@cschloer
Contributor Author

If you want I can share my dataflows script, but it looks like you've got a pretty good example. Mine uses custom processors anyway, so it'd be a bit more difficult to turn into something useful for you.

@akariv
Member

akariv commented May 27, 2020

@roll

Flow().results() returns all of the processed data, so obviously it will take a lot of memory.

To avoid that, you should use Flow().process() instead.

The documentation mentions this, but I now see it's not very explicit:

What about large data files? In the above examples, the results are loaded into memory, which is not always preferable or acceptable. In many cases, we'd like to store the results directly onto a hard drive - without having the machine's RAM limit in any way the amount of data we can process.
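
For reference, a minimal sketch of the streaming variant of the dataflows script above, using dataflows' dump_to_path processor (the 'out' target directory is arbitrary):

from dataflows import Flow, load, dump_to_path

flow = Flow(
    load(
        'https://frictionless-share-bucket.s3.amazonaws.com/combined.csv',
        deduplicate_headers=True,
    ),
    dump_to_path('out'),  # stream rows to ./out as they are processed
)
flow.process()  # unlike results(), process() does not return the row data

With dump_to_path the result lands on disk as a datapackage, so memory usage should stay bounded regardless of the row count.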

@roll
Member

roll commented May 27, 2020

@akariv
I've come to the same conclusion: datahq/dataflows#139 😃

@cschloer
Can you please confirm that it doesn't (over)consume memory in your case with flow.process()?

@cschloer
Contributor Author

I'll confirm that flow.process() works, specifically with our custom dump_to_s3 processor.

@roll
Member

roll commented May 28, 2020

Cool @cschloer! Please close this issue once it's confirmed.

@cschloer
Contributor Author

Hey, so I've updated the infrastructure to use process() instead of results(). I am able to successfully load in 18 million rows (:tada:), with an automatically added dump_to_s3 step as well (a necessary addition to the infrastructure in order to get the results for display).

I'm still running into intermittent memory issues when I add more steps, but I am not able to reproduce them consistently. I also suspect it might have something to do with my Celery setup rather than something on the dataflows end. I will close this for now and create a new issue if problems arise again.

Thanks for the help guys!
