
How should I handle arrays of structs, #17

Open
dandexcare opened this issue Jun 16, 2023 · 16 comments
Comments

@dandexcare

How do I handle a field with an array of struct values like the following? I will send a full example shortly:

df.with_columns(pl.struct('aggregate_ratings.sub_ratings').alias('aggregate_ratings.sub_ratings').map(to_json, return_dtype=pl.Utf8))

id = pa.array([1, 2, 3], type=pa.int32())
complicated = pa.array([
    [{'average_rating': 4.9, 'crawled_date': '2023-06-06'}, {'average_rating': 4.7, 'crawled_date': '2023-06-04'}],
    [{'average_rating': 4.8, 'crawled_date': '2023-05-06'}, {'average_rating': 4.6, 'crawled_date': '2023-05-04'}],
    [{'average_rating': 4.7, 'crawled_date': '2023-04-06'}, {'average_rating': 4.5, 'crawled_date': '2023-04-04'}],
])
df = pa.RecordBatch.from_arrays(
    [
        id,
        array_to_json(complicated),
    ],
    schema=pa.schema([
        pa.field("id", pa.int32()),
        pa.field(
            "complicated",
            pa.list_(pa.struct([
                pa.field("average_rating", pa.double()),
                pa.field("crawled_date", pa.large_string()),
            ])),
        ),
    ]),
).to_pandas()

print(df)

I made an attempt with the following encoder, but it fails on the COPY because the output type is Jsonb() instead of jsonb[]:

encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(
    schema,
    {   'main_key': Int32EncoderBuilder(schema.field('main_key')),
        'aggregate_ratings.sub_ratings': LargeStringEncoderBuilder.new_with_output(
        schema[schema.get_field_index('aggregate_ratings.sub_ratings')],
        Jsonb()
    )}
)

Error:

psycopg.errors.QueryCanceled: COPY from stdin failed: error from Python: PanicException - called `Result::unwrap()` on an `Err` value: ColumnTypeMismatch { field: "aggregate_ratings.sub_ratings", expected: "arrow_array::array::byte_array::GenericByteArray<arrow_array::types::GenericStringType<i64>>", actual: LargeList(Field { name: "item", data_type: Struct([Field { name: "average_rating", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "crawled_date", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "metric", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) }
@adriangb
Owner

See #5

@adriangb
Owner

Sorry I think I got confused by the mention of struct columns. Seems like the issue is something else, let me take a look.

Regardless, the error message should be better

@dandexcare
Author

dandexcare commented Jun 16, 2023 via email

@adriangb
Owner

I think the issue is how you were creating the encoders. In particular I don't see LargeStringEncoderBuilder anywhere. Try to adapt this example + the one from #13:

import psycopg
import pyarrow as pa
from pgpq import ArrowToPostgresBinaryEncoder
from pgpq.encoders import ListEncoderBuilder, LargeStringEncoderBuilder
from pgpq.schema import Jsonb

fields = [
    ('a', pa.int32()),
    ('b', pa.bool_()),
]
schema = pa.schema([
    ('col', pa.list_(pa.large_string())),
])

batch = pa.RecordBatch.from_arrays(
    [
        pa.array([['{"a":123,"b":false}', '{"a":456,"b":true}']])
    ],
    schema=schema,
)

encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(
    batch.schema,
    {
        'col': ListEncoderBuilder.new_with_inner(
            schema.field(0),
            LargeStringEncoderBuilder.new_with_output(
                schema.field(0).type.field(0),
                Jsonb(),
            )
        )
    }
)

ddl = "DROP TABLE IF EXISTS data; CREATE TABLE data (data jsonb[]);"

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)  # type: ignore
        with cursor.copy("COPY data FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(batch))
            copy.write(encoder.finish())
    conn.commit()

I do feel like there should be a simpler way to build these encoders, but I'm not sure what the right API is just yet.

@dandexcare
Author

dandexcare commented Jun 16, 2023

I noticed that you have single quotes surrounding the struct in the array of structs. So I will do that transformation:

pa.array([['{"a":123,"b":false}', '{"a":456,"b":true}']])

And I am receiving the "jsonb not supported" error, and I am not clear on the workaround while using release 0.7.3. I tried making the receiving table json, but I don't know how to set the output type of the encoder to Json().

encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(
    batch.schema,
    {
        'col': ListEncoderBuilder.new_with_inner(
            schema.field(0),
            LargeStringEncoderBuilder.new_with_output(
                schema.field(0).type.field(0),
                Jsonb(),
            )
        )
    }
)

@adriangb
Owner

The single quotes are just because json uses double quotes so it avoids escaping. You should get the same result if you convert a struct to json, I just skipped that step to simplify the example.

> And I am receiving the "jsonb not-supported error", and I am not clear the work-around while using the release 0.7.3. I tried making the receiving table JSON, but I don't know how to set the output type of the encoder to Json().

I'm not sure I'm understanding what issue you are having or where you're getting that error. The gist of it is that you need to make the encoder a list of Jsonb (what you have there looks about right) and make the column in the table jsonb[] (see my DDL above).

@dandexcare
Author

[screenshot of the error]

> I'm not sure I'm understanding what issue you are having or where you're getting that error. The jist of it is that you need to make the encoder a list of Jsonb, what you have there looks about right, and make the column in the table jsonb[] (see my DDL above).

I did not change your code that you sent 12 hours ago... Any ideas?

Warm regards,
Dan

@adriangb
Owner

Are you sure you're using 0.7.3? Does my example above as is run for you?

@dandexcare
Author

Yes, but just to make sure, I am going to create a venv.

@dandexcare
Author

numpy==1.24.3
pgpq==0.7.3
polars==0.18.3
psycopg==3.1.9
pyarrow==12.0.1
typing_extensions==4.6.3

It didn't error! But the result looks like a struct within a struct, not an array of structs:
[screenshot of the query result]

@adriangb
Owner

Try doing something like SELECT col[1] FROM data

@dandexcare
Author

It worked, so is that functionally the same as [{'a': 123, 'b': False}, {'a': 456, 'b': True}] ?

@adriangb
Owner

Yes. At that point the data is in Postgres; what I gave you is just the Postgres syntax for working with arrays: https://www.postgresql.org/docs/current/arrays.html

@dandexcare
Author

dandexcare commented Jun 16, 2023

It worked. So, to finish off this evaluation, I need to create an encoder for all the arrays of structs:

encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(
    schema,
    {
        field: ListEncoderBuilder.new_with_inner(
            schema[schema.get_field_index(field)],
            LargeStringEncoderBuilder.new_with_output(
schema[schema.get_field_index(field)].type.field(0),
                Jsonb(),
            )
        )
       for field in struct_array_fields
    }
)

and then possibly apply the uint64 -> uint32 transformation that I saw in the outstanding issues. Hopefully it will just work! Much appreciated!

@adriangb
Owner

I think this is resolved, can we close this issue?

@dandexcare
Author

dandexcare commented Jun 20, 2023

I am blocked by this. I reduced the number of fields to just one to try to isolate the problem:

ColumnTypeMismatch { field: "aggregate_ratings.sub_ratings", expected: "arrow_array::array::list_array::GenericListArray<i32>", actual: LargeList(Field { name: "item", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) }

When converting to json, I need to set orient='records' so the metadata falls away. I don't know how to stick that into this function:

def to_json(s: pl.Series) -> pl.Series:
    print('*********************s')
    print(s)
    a = s.to_arrow()
    a = array_to_utf8_json_array(a)
    print(a)
    return pl.from_arrow(a)

This is how I was able to convert the items in the arrays of structs to JSON strings, but I think it is capturing the metadata, and I am having difficulty troubleshooting:


# Add a row number column using row_number() function
df = df.with_row_count(name="row_num", offset=1)

exploded_df = df.explode("aggregate_ratings.sub_ratings")
exploded_df = exploded_df.with_columns(pl.col("aggregate_ratings.sub_ratings").map(to_json, return_dtype=pl.Utf8))

aggregated_df = exploded_df.groupby("row_num").agg(
        [
            pl.col('aggregate_ratings.sub_ratings'),
        ]
    )
df = aggregated_df.select(['aggregate_ratings.sub_ratings'])
