
SNOW-893080: session.bulk_save_objects does not put all objects in one INSERT #441

Open
markfickett opened this issue Aug 15, 2023 · 12 comments
Labels
enhancement The issue is a request for improvement or a new feature status-triage_done Initial triage done, will be further handled by the driver team

Comments

@markfickett

Please answer these questions before submitting your issue. Thanks!

  1. What version of Python are you using?

Python 3.10.11 (main, Apr 5 2023, 14:15:10) [GCC 9.4.0]

  2. What operating system and processor architecture are you using?

Linux-5.14.0-1029-oem-x86_64-with-glibc2.31

  3. What are the component versions in the environment (pip freeze)?
alembic==1.10.4
pulumi==3.38.0
pulumi-snowflake==0.28.0
pymssql==2.2.7
PyMySQL==0.9.3
PyYAML==5.3.1
snowflake-connector-python==2.8.3
snowflake-sqlalchemy==1.4.7
SQLAlchemy==1.4.31
  4. What did you do?

Using the SQLAlchemy 1.4 bulk_save_objects API, I added 5000 objects in one SQLAlchemy session and then committed the session.

  5. What did you expect to see?

I expected to see one INSERT with 5000 VALUES rows. Instead, I see a variety of INSERT sizes, from 1 row to ~50 rows, and inserting the 5k objects takes 3+ minutes.

[Screenshot: query history showing the 5k-object insert split across many small INSERT statements]
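For reference, a minimal sketch of the pattern described above, assuming a simple hypothetical model (the reporter's actual entities and connection URL are not shown in this issue):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Item(Base):
    __tablename__ = "item"  # hypothetical table, not from the reporter's application
    id = Column(Integer, primary_key=True)
    name = Column(String(30))

# Placeholder Snowflake URL; substitute real credentials/account.
engine = create_engine("snowflake://<user>:<password>@<account>/<database>/<schema>")

with Session(engine) as session:
    objects = [Item(id=i, name=f"item_{i}") for i in range(5000)]
    session.bulk_save_objects(objects)
    session.commit()  # expectation: one multi-row INSERT; observed: many small INSERTs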

@markfickett markfickett added bug Something isn't working needs triage labels Aug 15, 2023
@github-actions github-actions bot changed the title session.bulk_save_objects does not put all objects in one INSERT SNOW-893080: session.bulk_save_objects does not put all objects in one INSERT Aug 15, 2023
@adamkipnis-tuskr

Just came across this as well. Seems like a pretty big issue...

@andymiller-og

andymiller-og commented Apr 5, 2024

Has there been any progress on this issue?

@sfc-gh-dszmolka sfc-gh-dszmolka self-assigned this Apr 8, 2024
@sfc-gh-dszmolka sfc-gh-dszmolka added status-triage Issue is under initial triage and removed needs triage labels Apr 8, 2024
@sfc-gh-dszmolka
Contributor

Hi, and thank you for raising this issue. Checking with the team to see whether we can address this before the SQLAlchemy 2.0 support release (which currently has priority). Thank you for bearing with us!

@sfc-gh-dszmolka sfc-gh-dszmolka added status-triage_done Initial triage done, will be further handled by the driver team and removed status-triage Issue is under initial triage labels Apr 8, 2024
@sfc-gh-dszmolka
Contributor

My colleague took a look and shared the example below:

from sqlalchemy import Column, Integer, Sequence, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class SampleBulk(Base):
    __tablename__ = "sample_bulk"

    pk = Column(Integer, Sequence('sample_bulk_pk_seq', order=True), primary_key=True)
    name = Column(String(30))
    amount = Column(Integer, default=0)

    def __repr__(self) -> str:
        return f"SampleBulk(pk={self.pk}, name={self.name}, amount={self.amount})"

def main(engine):
    try:
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            todds = (SampleBulk(name=f"Tod_{i}", amount=i) for i in range(1, 59999))
            session.bulk_save_objects(todds)
            session.commit()

            result = session.query(func.count(SampleBulk.pk)).scalar()
            print(f" *** {result=}")

    finally:
        Base.metadata.drop_all(engine)


if __name__ == "__main__":
    # Engine creation was omitted in the original example; substitute a real Snowflake URL.
    engine = create_engine("snowflake://<user>:<password>@<account>/<database>/<schema>")
    main(engine)

This was able to insert 60k rows in a single command. On a side note, to handle NULLs in ORM models this example uses Sequence() instead of autoincrement=True; my colleague also found similar problems with generating the pk in other dialects besides Snowflake.

All in all, if any of you still have this problem, could you please check with v1.5.1 and see if it works for you?
Please let us know how it went.

@sfc-gh-dszmolka sfc-gh-dszmolka added the status-information_needed Additional information is required from the reporter label Apr 15, 2024
@adamkipnis-tuskr

adamkipnis-tuskr commented Apr 16, 2024

Thanks for the response @sfc-gh-dszmolka. I ran your test and it did write out all 60k records in a single statement. I'm not seeing the same behavior in our app, though, so I'm trying to put together a test case that reproduces what I'm seeing. The entity I'm working with has about 40 columns and when I submit a batch of ~5k records, it inserts in batches of only about 15 at a time. I'll provide more info on the batch insert once I have a better test to illustrate it.

However, the updates are an even bigger issue. If you modify your test case to include the below snippet, you will see that the updates are happening one at a time:

def main(engine):
    try:
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            todds = (SampleBulk(name=f"Tod_{i}", amount=i) for i in range(1, 59999))
            session.bulk_save_objects(todds)
            session.commit()

            result = session.query(func.count(SampleBulk.pk)).scalar()
            print(f" *** {result=}")

            records = session.query(SampleBulk).all()

            for record in records:
                record.name = f"NewName_{record.pk}"

            session.bulk_save_objects(records)
    finally:
        Base.metadata.drop_all(engine)

Output:

[2024-04-16T11:09:58.284-0700] {cursor.py:1032} INFO - query: [SELECT cm_dwh.sample_bulk.pk AS cm_dwh_sample_bulk_pk, cm_dwh.sample_bulk.name A...]
[2024-04-16T11:10:00.090-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:00.090-0700] {cursor.py:1205} INFO - Number of results in first chunk: 464
[2024-04-16T11:10:14.102-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_1' WHERE cm_dwh.sample_bulk.pk = 1]
[2024-04-16T11:10:15.770-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:15.770-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_2' WHERE cm_dwh.sample_bulk.pk = 2]
[2024-04-16T11:10:17.339-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:17.339-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_3' WHERE cm_dwh.sample_bulk.pk = 3]
[2024-04-16T11:10:18.579-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:18.580-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_4' WHERE cm_dwh.sample_bulk.pk = 4]
[2024-04-16T11:10:20.257-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:20.258-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_5' WHERE cm_dwh.sample_bulk.pk = 5]
[2024-04-16T11:10:21.912-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:21.912-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_6' WHERE cm_dwh.sample_bulk.pk = 6]
[2024-04-16T11:10:22.939-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:22.940-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_7' WHERE cm_dwh.sample_bulk.pk = 7]
[2024-04-16T11:10:24.029-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:24.030-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_8' WHERE cm_dwh.sample_bulk.pk = 8]
...

My expectation would be that it should be able to batch the updates into a single query with something like:

update sample_bulk b set name=tmp.name 
from (values (1,'NewName_1'),(2,'NewName_2'),(3,'NewName_3')) as tmp (pk,name)
where b.pk=tmp.pk;

@sfc-gh-dszmolka
Contributor

Thanks for the repro, we're taking a look.

However, this issue has started to branch out a little from the original "session.bulk_save_objects does not put all objects in one INSERT" - we have a working example of how to achieve that goal.

Also, considering you now have problems with the UPDATE operation and in parallel have an official Support case ongoing with Snowflake Support, do you think this issue on GitHub can be closed out, since the original issue is addressed and you have ongoing help with the UPDATE in the Support case? (The Support case in any case has strict SLAs etc., in contrast to issues here.)

Once the UPDATE issue is resolved, I can of course retroactively update this issue with the solution to inform anyone else stumbling across it.

@adamkipnis-tuskr

adamkipnis-tuskr commented Apr 19, 2024

@sfc-gh-dszmolka : I have a reproducible example. The issue occurs when an entity has optional/nullable columns. Here's an example where inserts are not done in bulk:

import random

from sqlalchemy import Column, Integer, Sequence, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class SampleBulk(Base):
    __tablename__ = "sample_bulk"

    pk = Column(Integer, Sequence('sample_bulk_pk_seq', order=True), primary_key=True)
    name = Column(String(30))
    amount = Column(Integer, default=0)
    col1 = Column(String(4000))
    col2 = Column(String(4000))
    col3 = Column(String(4000))

    def __repr__(self) -> str:
        return f"SampleBulk(pk={self.pk}, name={self.name}, amount={self.amount})"

def main(engine):
    try:
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            todds = []
            for i in range(1, 10000):
                d = {
                    "pk": i,
                    "name": f"Tod_{i}",
                    "amount": i,
                }

                # Populate each optional column for roughly half of the rows.
                for col in ['col1', 'col2', 'col3']:
                    if bool(random.getrandbits(1)):
                        d[col] = f"{col}_{i}"

                todds.append(SampleBulk(**d))

            session.bulk_save_objects(todds, update_changed_only=False, return_defaults=False, preserve_order=False)
            session.commit()

            result = session.query(func.count(SampleBulk.pk)).scalar()
            print(f" *** {result=}")
    finally:
        Base.metadata.drop_all(engine)

if __name__ == "__main__":
    # Engine creation was omitted in the original snippet; substitute a real Snowflake URL.
    engine = create_engine("snowflake://<user>:<password>@<account>/<database>/<schema>")
    main(engine)

Here's a snippet of the output:

[2024-04-19T13:31:54.682-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col2) VALUES (1, 'Tod_1', 1, '...]
[2024-04-19T13:31:55.150-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:55.152-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1) VALUES (2, 'Tod_2', 2, '...]
[2024-04-19T13:31:55.576-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:55.577-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col3) VALUES (3, 'Tod_3'...]
[2024-04-19T13:31:56.063-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:56.064-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col3) VALUES (4, 'Tod_4', 4, '...]
[2024-04-19T13:31:56.510-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:56.511-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount) VALUES (5, 'Tod_5', 5)]
[2024-04-19T13:31:56.874-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:56.876-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col2) VALUES (6, 'Tod_6', 6, '...]
[2024-04-19T13:31:57.357-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:57.358-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col2) VALUES (7, 'Tod_7'...]
[2024-04-19T13:31:57.820-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:57.821-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1) VALUES (8, 'Tod_8', 8, '...]
[2024-04-19T13:31:58.240-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:58.241-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col3) VALUES (9, 'Tod_9'...]
[2024-04-19T13:31:58.695-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:58.696-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1) VALUES (10, 'Tod_10', 10...]
[2024-04-19T13:31:59.341-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:59.342-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col3) VALUES (11, 'Tod_11', 11...]
[2024-04-19T13:32:00.013-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:32:00.015-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col2, col3) VALUES (12, ...]
[2024-04-19T13:32:00.406-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:32:00.407-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col3) VALUES (13, 'Tod_1...]
[2024-04-19T13:32:00.828-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:32:00.829-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col2) VALUES (14, 'Tod_14', 14...]
[2024-04-19T13:32:01.287-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:32:01.287-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col3) VALUES (15, 'Tod_1...]
[2024-04-19T13:32:01.683-0700] {cursor.py:1045} INFO - query execution done

It appears that the inserts will only be batched if consecutive objects added to the list have the same set of columns populated.

EDIT: Adding that I ran this test with preserve_order=False (I originally pasted the wrong version of the code snippet). If that had been set to True, I would understand why the inserts aren't being batched. But this argument exists specifically for this case, where heterogeneous objects can be added in any order and then re-ordered to allow batching.
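A possible client-side workaround, following the observation above that batching only happens for consecutive objects with the same populated columns, is to sort the objects by their set of populated columns before calling bulk_save_objects. This is only a sketch against the repro above and has not been verified in this thread:

def populated_columns(obj):
    # The tuple of column names that carry a non-None value on this instance.
    return tuple(sorted(
        col.key for col in obj.__table__.columns
        if getattr(obj, col.key, None) is not None
    ))

# Group rows with identical column sets together so the ORM can batch them.
todds.sort(key=populated_columns)
session.bulk_save_objects(todds, return_defaults=False)
session.commit()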

@sfc-gh-dszmolka sfc-gh-dszmolka removed the status-information_needed Additional information is required from the reporter label Apr 22, 2024
@sfc-gh-dszmolka
Contributor

Thank you for adding this repro for the 'not batching INSERTs' issue (the one tracked in this issue, #441).

Out of curiosity, I tried running the reproduction without Snowflake, using a Postgres instance:

# cat testpg.py 
from sqlalchemy import create_engine, Column, String, Integer, Sequence, func
from sqlalchemy.engine import URL
from sqlalchemy.orm import declarative_base, Session

import random

url = URL.create(
    drivername="postgresql",
    username="postgres",
    host="/var/run/postgresql",
    database="test_db"
)

engine = create_engine(url, echo=True)

Base = declarative_base()

class SampleBulk(Base):
    __tablename__ = "sample_bulk"

    pk = Column(Integer, Sequence('sample_bulk_pk_seq', order=True), primary_key=True)
    name = Column(String(30), )
    amount = Column(Integer, default=0)
    col1 = Column(String(4000))
    col2 = Column(String(4000))
    col3 = Column(String(4000))

    def __repr__(self) -> str:
        return f"SampleBulk(pk={self.pk}, name={self.name}, amount={self.amount})"

def main(engine):
    try:
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            todds = []
            for i in range(1, 10000):
                d = {
                    "pk": i,
                    "name": f"Tod_{i}",
                    "amount": i,
                }

                for col in ['col1', 'col2', 'col3']:
                    if bool(random.getrandbits(1)):
                        d[col] = f"{col}_{i}"

                todds.append(SampleBulk(**d))

            ### tried both with the defaults and with the arguments from your repro; INSERTs still come one at a time
            session.bulk_save_objects(todds, update_changed_only=False, return_defaults=False, preserve_order=False)
            #session.bulk_save_objects(todds)
            session.commit()

            result = session.query(func.count(SampleBulk.pk)).scalar()
            print(f" *** {result=}")
    finally:
        Base.metadata.drop_all(engine)

if __name__ == "__main__":
    main(engine)

Result:

..
2024-04-23 07:48:51,485 INFO sqlalchemy.engine.Engine COMMIT
2024-04-23 07:48:51,596 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-04-23 07:48:51,596 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount, col1) VALUES (%(pk)s, %(name)s, %(amount)s, %(col1)s)
2024-04-23 07:48:51,596 INFO sqlalchemy.engine.Engine [generated in 0.00015s] {'pk': 1, 'name': 'Tod_1', 'amount': 1, 'col1': 'col1_1'}
2024-04-23 07:48:51,597 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount, col1, col3) VALUES (%(pk)s, %(name)s, %(amount)s, %(col1)s, %(col3)s)
2024-04-23 07:48:51,597 INFO sqlalchemy.engine.Engine [generated in 0.00011s] {'pk': 2, 'name': 'Tod_2', 'amount': 2, 'col1': 'col1_2', 'col3': 'col3_2'}
2024-04-23 07:48:51,598 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount) VALUES (%(pk)s, %(name)s, %(amount)s)
2024-04-23 07:48:51,598 INFO sqlalchemy.engine.Engine [generated in 0.00008s] {'pk': 3, 'name': 'Tod_3', 'amount': 3}
2024-04-23 07:48:51,598 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount, col1, col3) VALUES (%(pk)s, %(name)s, %(amount)s, %(col1)s, %(col3)s)
2024-04-23 07:48:51,598 INFO sqlalchemy.engine.Engine [cached since 0.001002s ago] {'pk': 4, 'name': 'Tod_4', 'amount': 4, 'col1': 'col1_4', 'col3': 'col3_4'}
2024-04-23 07:48:51,599 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount, col2) VALUES (%(pk)s, %(name)s, %(amount)s, %(col2)s)
2024-04-23 07:48:51,599 INFO sqlalchemy.engine.Engine [generated in 0.00009s] {'pk': 5, 'name': 'Tod_5', 'amount': 5, 'col2': 'col2_5'}
..

Every INSERT is sent individually, which to me suggests that this issue (related to nullable/optional columns) is not specific to the Snowflake dialect and snowflake-sqlalchemy, as the same issue reproduces without Snowflake. Perhaps it could be reported at https://github.com/sqlalchemy/sqlalchemy/issues

For the issue with the UPDATE, if further troubleshooting is required, it would be greatly appreciated if you could please open a separate Issue here.

@sfc-gh-dszmolka sfc-gh-dszmolka removed the bug Something isn't working label Apr 23, 2024
@sfc-gh-dszmolka
Contributor

My colleague also found the following interesting Stack Overflow post: https://stackoverflow.com/questions/48874745/sqlalchemy-bulk-insert-mappings-generates-a-large-number-of-insert-batches-is-t

It looks to be very relevant to your use case and has possible solutions as well; please take a look when you get a chance. It has to do with how the input data is structured, rather than with any Snowflake aspect.

@adamkipnis-tuskr

my colleague also found the following interesting Stackoverflow post : https://stackoverflow.com/questions/48874745/sqlalchemy-bulk-insert-mappings-generates-a-large-number-of-insert-batches-is-t

Looks to be very relevant to your use-case and has possible solutions as well, please take a look once you get a chance. Has to do with how the input data is structured, rather than any Snowflake aspect.

Yes, I'm aware of that SO post. That is the method I used to work around the batching issue when using bulk_insert_mappings. This case is about bulk_save_objects. The way I'm currently working around the issue described in this ticket is to create a temp table, use bulk_insert_mappings to save my new/modified entities (which also doesn't seem to optimize batching, even with render_nulls=True), and then use MergeInto. It's not ideal.
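For anyone else reading along, here is a rough sketch of the temp-table workaround described above; the staging table name, the SampleBulkStage ORM class, the rows list, and the MERGE column list are all placeholders, and the MERGE is written as raw SQL rather than with the snowflake-sqlalchemy MergeInto construct:

from sqlalchemy import text
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Stage the new/modified rows in a temporary table shaped like the target.
    session.execute(text("CREATE TEMPORARY TABLE sample_bulk_stage LIKE sample_bulk"))

    # rows: list of dicts with the values to upsert, e.g. {"pk": 1, "name": "NewName_1"}.
    # SampleBulkStage: hypothetical ORM class mapped to sample_bulk_stage.
    session.bulk_insert_mappings(SampleBulkStage, rows, render_nulls=True)

    # Merge the staged rows into the target table in a single statement.
    session.execute(text("""
        MERGE INTO sample_bulk t
        USING sample_bulk_stage s ON t.pk = s.pk
        WHEN MATCHED THEN UPDATE SET name = s.name
        WHEN NOT MATCHED THEN INSERT (pk, name) VALUES (s.pk, s.name)
    """))
    session.commit()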

@sfc-gh-dszmolka
Contributor

sfc-gh-dszmolka commented Apr 24, 2024

I can very much agree that it's not ideal, but I'm also a bit baffled by the fact that the same behaviour of bulk_save_objects happens without Snowflake too, so the behaviour we're seeing does not seem to come from Snowflake / snowflake-sqlalchemy.

During my tests with Postgres, I also tried using SQLAlchemy 2.0 instead of 1.4.52; the result was similar, with INSERTs coming one by one or in batches of at most 2-3 rows.

Do you perhaps know of any dialect with which your reproduction works as expected, i.e. sends the generated 10k rows in a single INSERT?

@sfc-gh-dszmolka sfc-gh-dszmolka added the enhancement The issue is a request for improvement or a new feature label May 6, 2024
@sfc-gh-dszmolka sfc-gh-dszmolka removed their assignment May 6, 2024
@sfc-gh-dszmolka
Contributor

Since the behaviour does not seem to originate from snowflake-sqlalchemy, we'll be treating this as an enhancement request. At this time all priorities and resources are allocated to releasing the version of the connector that supports SQLAlchemy 2.0, so I'm unfortunately unable to attach a timeline for implementing the enhancement.
Should any of this change, I'll update this thread.

In the meantime, please reach out to your Snowflake Account Team and let them know how important this improvement would be for your use case - this might put additional traction on the request and help with reprioritizing.
