Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow blob in write_fragments #3235

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

fecet
Copy link

@fecet fecet commented Dec 12, 2024

Allow user use write_fragments for lance with blob storage class by returning a nullable list of blob ops

@github-actions github-actions bot added enhancement New feature or request python labels Dec 12, 2024
@fecet fecet force-pushed the feat/frag-blob branch 2 times, most recently from 71c464c to e0784b9 Compare December 14, 2024 06:00
@fecet
Copy link
Author

fecet commented Dec 14, 2024

@westonpace Could you review it please? I'm wondering if the change in python/src/fragment.rs is valid? As my blob column in the balanced dataset become all None after I readed them by _take_rows.

@fecet
Copy link
Author

fecet commented Dec 15, 2024

image
I think the problem is it's not possible to modify id of schema on python side

@fecet fecet marked this pull request as ready for review December 15, 2024 10:16
Copy link
Contributor

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this! Overall I think this looks like it is going in the right direction.

I have a few cleanup suggestions. Also, this will need a unit test before we can merge. test_commit_batch_append in test_dataset.py should be pretty close.

I think the problem is it's not possible to modify id of schema on python side

Shouldn't the id of the field remain the same? An append should only add new fragments, it should not modify the schema in any way.

If you can make a unit test (even if its a failing one) then I can debug further. At the moment it isn't clear to me exactly how you plan to use this so it is a bit hard to debug.

@@ -513,6 +513,67 @@ pub fn write_fragments(
.collect()
}

#[pyfunction(name = "_write_fragments_with_blobs")]
#[pyo3(signature = (dest, reader, **kwargs))]
pub fn write_fragments_with_blobs(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason we can't modify write_fragments above instead of introducing a new method?

It's ok for there to be breaking changes at the rust level because the public API is the write_fragments in fragment.py.

Comment on lines 287 to 315

let min_field_id = fragments.iter()
.flat_map(|fragment| &fragment.files)
.flat_map(|file| &file.fields)
.min()
.copied();
let new_schema = if let Some(min_id) = min_field_id {
let filtered_fields: Vec<Field> = schema.fields
.iter()
.filter(|f| f.id >= min_id)
.cloned()
.collect();

if filtered_fields.is_empty() {
return Err(PyValueError::new_err(format!(
"No fields in schema have field_id >= {}",
min_id
)));
}

Schema {
fields: filtered_fields,
metadata: schema.metadata.clone(),
}
} else {
schema
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are these changes related? Are you doing a overwrite operation?

Comment on lines 2225 to 2234
if isinstance(operation, Transaction):
new_ds = _Dataset.commit_transaction(
base_uri,
operation,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
max_retries=max_retries,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...being able to send a transaction in directly seems like it introduces more complexity to the user than needed. Is it possible to modify https://github.com/lancedb/lance/blob/main/python/python/lance/dataset.py#L2494 (the Append operation) so that it can take in a second list of blob fragments:

    @dataclass
    class Append(BaseOperation):
        fragments: Iterable[FragmentMetadata]
        blob_fragments: Iterable[FragmentMetadata] = []

@fecet
Copy link
Author

fecet commented Dec 17, 2024

I have found that the current implementation still has some issues, but I can first outline the difficulties I am currently encountering:

I am using the code below for testing, and it currently needs to run on the modified version.

import shutil


def make_table(offset, num_rows, big_val):
    end = offset + num_rows
    values = pa.array([big_val for _ in range(num_rows)], pa.large_binary())
    idx = pa.array(range(offset, end), pa.uint64())
    table = pa.record_batch(
        [idx, values],
        schema=pa.schema(
            [
                pa.field("idx", pa.uint64()),
                pa.field(
                    "blobs",
                    pa.large_binary(),
                    metadata={
                        "lance-schema:storage-class": "blob",
                    },
                ),
            ]
        ),
    )
    return table


tbl = make_table(0, 10, b"0" * 1024 * 1024)
uri = "test1"
shutil.rmtree(uri, ignore_errors=True)

default_frags, blob_frags = lance.fragment.write_fragments(
    pa.Table.from_batches([tbl]),
    uri,
    with_blobs=True,
    enable_move_stable_row_ids=True,
)

blob_operation = lance.LanceOperation.Overwrite(tbl.schema, blob_frags)
operation = lance.LanceOperation.Overwrite(tbl.schema, default_frags)
transaction = lance.Transaction(
    operation=operation,
    blobs_op=blob_operation,
    read_version=0,
)

ds = lance.LanceDataset.commit(
    uri,
    transaction,
    enable_v2_manifest_paths=True,
)
ds._take_rows(range(10))

The problem lies in the Rust implementation where the operation._to_inner method is called. Additionally, the two fragments returned by write_fragments retain the column index information. Therefore, the Overwrite operation expects to correctly preserve the index in the schema.

blob_operation = lance.LanceOperation.Overwrite(tbl.schema, blob_frags)
blob_operation._to_inner()

this will return correct schema

Overwrite { fragments: [Fragment { id: 0, files: [DataFile { path: "823dcbd1-175e-4307-b726-d9fbc775e887.lance", fields: [1], column_indices: [0], file_major_version: 2, file_minor_version: 0 }], deletion_file: None, row_id_meta: None, physical_rows: Some(10) }], schema: Schema { fields: [Field { name: "blobs", id: 1, parent_id: -1, logical_type: LogicalType("large_binary"), metadata: {"lance-schema:storage-class": "blob"}, encoding: Some(VarBinary), nullable: true, children: [], dictionary: None, storage_class: Blob }], metadata: {} }, config_upsert_values: None }

I confirm it's correct by

lance.write_dataset(tbl, "test2", tbl.schema)

lance.LanceDataset("test2/_blobs").lance_schema

The key of problem is the id in schema: Schema { fields: [Field { name: "blobs", id: 1, this cannot be done on the python side, as the id will be relabeled from zero to the length of schema,

pub fn set_field_id(&mut self, max_existing_id: Option<i32>) {
        let schema_max_id = self.max_field_id().unwrap_or(-1);
        let max_existing_id = max_existing_id.unwrap_or(-1);
        let mut current_id = schema_max_id.max(max_existing_id) + 1;
        self.fields
            .iter_mut()
            .for_each(|f| f.set_id(-1, &mut current_id));
    }
blob_operation = lance.LanceOperation.Overwrite(pa.schema(
            [
                pa.field(
                    "blobs",
                    pa.large_binary(),
                    metadata={
                        "lance-schema:storage-class": "blob",
                    },
                ),
            ]
        ), blob_frags)
blob_operation._to_inner()

will return schema: Schema { fields: [Field { name: "blobs", id: 0, that's why I do changes in https://github.com/lancedb/lance/pull/3235/files/56cda28424b3934f9ca32b1060e8f47d340639d2#diff-31492cb9a22ac40a820049c18a44ac88f651a5d0d7115fc09386899ab95f0c10. But I'm agreed it's not that correct, that could be easier if we can provide a way to directly return the transaction and we can extract inner schema from there.

@westonpace
Copy link
Contributor

So you are using Overwrite to create a new dataset? I agree that should work. Would it help if write_fragments also returned the schema? I think this is how we work around this problem in Fragment.merge_columns.

@fecet
Copy link
Author

fecet commented Dec 19, 2024

That should be a better way. So do you think we should change the definition in LanceOperation so that they can accept LanceSchame like merge?

schema: LanceSchema | pa.Schema

Append operation should be just ok but I haven't do any test

@wjones127
Copy link
Contributor

That should be a better way. So do you think we should change the definition in LanceOperation so that they can accept LanceSchame like merge?

That's actually being done right now in #3240

@fecet
Copy link
Author

fecet commented Dec 19, 2024

That's actually being done right now in #3240

Yes the translating between rust and python for transaction often trips me, will this be merged soon?

@fecet fecet closed this Dec 27, 2024
@fecet fecet reopened this Dec 27, 2024
@fecet fecet requested a review from westonpace December 27, 2024 14:21
@fecet
Copy link
Author

fecet commented Dec 27, 2024

@westonpace I rewrite the implement and add test for it, I think just return a transaction would be cleaner for end user as it can be convert to python easily.

@fecet fecet force-pushed the feat/frag-blob branch 2 times, most recently from e4a3bde to 66bb8d2 Compare December 28, 2024 09:00
@fecet
Copy link
Author

fecet commented Jan 5, 2025

Now pytest looks good on my end
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request python
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants