bug: default implementations of RESTStream (and others) do not adhere to _MAX_RECORDS_LIMIT #1349

Closed
kgpayne opened this issue Jan 25, 2023 · 8 comments

@kgpayne
Contributor

kgpayne commented Jan 25, 2023

Singer SDK Version

0.18.0

Python Version

3.10

Bug scope

Taps (catalog, state, stream maps, etc.)

Operating System

MacOS

Description

In testing MeltanoLabs/tap-stackexchange (PR here) with the upcoming test improvements, we noticed that the RESTStream implementation included in the SDK does not adhere to the _MAX_RECORDS_LIMIT internal attribute when returning records from get_records(). Specifically, get_records() returns more records than the limit, causing _check_max_record_limit() to raise a MaxRecordsLimitException.

For connection testing (currently the only use of _MAX_RECORDS_LIMIT) this error is caught and passed. However, in the new testing framework, we need the stream to complete gracefully (emitting state etc.) so that we can test the records returned. Catching the raised error, whilst possible, is still unhelpful when records/messages are required for testing.

    # Connection test:
    @final
    def run_connection_test(self) -> bool:
        """Run connection test.

        Returns:
            True if the test succeeded.
        """
        for stream in self.streams.values():
            # Initialize streams' record limits before beginning the sync test.
            stream._MAX_RECORDS_LIMIT = 1

        for stream in self.streams.values():
            if stream.parent_stream_type:
                self.logger.debug(
                    f"Child stream '{type(stream).__name__}' should be called by "
                    f"parent stream '{stream.parent_stream_type.__name__}'. "
                    "Skipping direct invocation."
                )
                continue
            try:
                stream.sync()
            except MaxRecordsLimitException:
                pass
        return True

In the recent SQLStream implementation we added a snippet to "push down" the limit to the remote SQL engine, so that the server does not materialise a complete dataset only for the tap to exit early after a small number of records:

    # Get records from stream
    def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
        """Return a generator of record-type dictionary objects.

        If the stream has a replication_key value defined, records will be sorted by the
        incremental key. If the stream also has an available starting bookmark, the
        records will be filtered for values greater than or equal to the bookmark value.

        Args:
            context: If partition context is provided, will read specifically from this
                data slice.

        Yields:
            One dict per record.

        Raises:
            NotImplementedError: If partition is passed in context and the stream does
                not support partitioning.
        """
        if context:
            raise NotImplementedError(
                f"Stream '{self.name}' does not support partitioning."
            )

        selected_column_names = self.get_selected_schema()["properties"].keys()
        table = self.connector.get_table(
            full_table_name=self.fully_qualified_name,
            column_names=selected_column_names,
        )
        query = table.select()

        if self.replication_key:
            replication_key_col = table.columns[self.replication_key]
            query = query.order_by(replication_key_col)

            start_val = self.get_starting_replication_key_value(context)
            if start_val:
                query = query.where(
                    sqlalchemy.text(":replication_key >= :start_val").bindparams(
                        replication_key=replication_key_col, start_val=start_val
                    )
                )

        # limit records if _MAX_RECORDS_LIMIT is specified
        if self._MAX_RECORDS_LIMIT is not None:
            query = query.limit(self._MAX_RECORDS_LIMIT)

        for record in self.connector.connection.execute(query):
            yield dict(record)

By limiting the number of records fetched in get_records() we both reduce load on the upstream system and avoid triggering the MaxRecordsLimitException raised by _check_max_record_limit() when too many records are returned. We could apply the same approach to the base implementations of the other stream classes. We should then also document this approach for the cases where users override get_records().
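
To make that documentation concrete, here is a minimal sketch of a helper a custom get_records() override could use to cap its own output instead of letting the counter raise. This is hypothetical illustration only, not SDK API; only _MAX_RECORDS_LIMIT comes from the SDK:

    # Hypothetical sketch - not part of the SDK. Wraps any record generator so it
    # stops after `max_records`, letting the sync finish instead of raising
    # MaxRecordsLimitException from _check_max_record_limit().
    from __future__ import annotations

    from itertools import islice
    from typing import Any, Iterable


    def limit_records(
        records: Iterable[dict[str, Any]],
        max_records: int | None,
    ) -> Iterable[dict[str, Any]]:
        """Yield at most ``max_records`` records; yield everything if no limit is set."""
        if max_records is None:
            yield from records
        else:
            yield from islice(records, max_records)

A custom get_records() override could then end with something like yield from limit_records(my_record_generator(), self._MAX_RECORDS_LIMIT) (my_record_generator being whatever the override already iterates), mirroring the query.limit() push-down shown above.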

Code

No response

@aaronsteers
Contributor

@kgpayne - understood. One of the reasons this was created as an internal constant was that it could not be relied on everywhere.

There are also questions about how this should behave, since arbitrary integers here could break the tap's ability to resume meaningfully. It could also affect parent-child streams, preventing child streams from being reached.

Perhaps a path forward would be to define a "dry run" mode formally, so that certain assumptions can be baked in.

Then, this might end up with a name like Stream.DRY_RUN_RECORD_LIMIT.

🤔

@aaronsteers
Contributor

aaronsteers commented Feb 2, 2023

@kgpayne - I opened this issue to specifically focus on dry run use cases:

For test-specific implementations, why not create a custom loop in the same pattern as run_connection_test() - which can specifically handle the loop exceptions within the safety of a harness that is specifically focused on the use case of just getting n records without being accountable to "real" workload requirements?

Specifically, get_records() returns more than the limit, causing the _check_max_record_limit() to fail and throw a MaxRecordsLimitException.

I think this is actually functioning as designed. For the reasons described in #1366, this was not intended to be a "normal" feature - exactly because it breaks stability expectations if used in a production capacity. The raising of the exception is how the counter is designed to operate, within the context of the connection test.

@aaronsteers
Contributor

aaronsteers commented Feb 2, 2023

On second pass through the code above, what about amending the connection test method to take any n which the developer could pass into the Test class constructor?

Another thing to tweak here, if we want tests to emit at least one STATE message, would be to modify run_connection_test() (or a clone of it) to initialize stream._MAX_RECORDS_LIMIT equal to self.STATE_MSG_FREQUENCY, or vice versa: overwrite self.STATE_MSG_FREQUENCY to match stream._MAX_RECORDS_LIMIT. In either case, this would ensure each stream sends at least one STATE message - either because it reached the end of the stream, or because it reached the point at which the first STATE message is sent.

-    def run_connection_test(self) -> bool:
+    def run_connection_test(self, max_records_per_stream: int=1) -> bool:
        """Run connection test.

        Returns:
            True if the test succeeded.
        """
        for stream in self.streams.values():
            # Initialize streams' record limits before beginning the sync test.
-            stream._MAX_RECORDS_LIMIT = 1
+            stream._MAX_RECORDS_LIMIT = max_records_per_stream
+            stream.STATE_MSG_FREQUENCY = max_records_per_stream

It is important to note that many streams are not sorted, and is_sorted = False is the default in our cookiecutter. This means that when streams are unsorted, any early-emitted STATE messages are likely not to be finalized. Those cannot and should not be finalized, since the results are likely not inclusive of all records up to the bookmark point. At the very end of the stream, the STATE message would naturally be finalized as a resumable bookmark, but if we abort mid-stream using the limit, that stream would not get finalized. Further, rerunning the sync with a record limit, even an infinite number of times, would never result in a finalized state, which means subsequent executions would likely keep emitting the same n records indefinitely. This is why reaching the internal limit really does need to be handled as an exception IMHO - and again, it is more context for why the record limit can only be used for dry run scenarios and isn't to be used for non-test use cases.

@kgpayne
Contributor Author

kgpayne commented Feb 6, 2023

@aaronsteers that makes sense for the connection test, but doesn't help for the rest of the standardised test suite. The root issue is that tests require a finalised sync to perform stream-level tests. Ultimately we want to test that the Tap runs to completion, and (among other things) correctly finalises accumulated STATE into a bookmark. However, we also want to reduce the cost of testing overall by limiting the number of records fetched as part of that completed sync. Hence the desire for a mechanism to limit the number of records fetched and returned by get_records() that doesn't raise an error (which currently terminates the sync before completion).

Whilst I understand the sorting case, and also the parent-child limitations called out elsewhere, having a means to specify "run to completion but fetch no more than n records" for testing purposes will reduce the cost of CI/CD and improve the overall developer experience for Tap maintainers.

Looking further down the line, I still cannot see a reason why this feature couldn't be made available with limitations/caveats; i.e. setting record limits for the purposes of syncing would only be available to sorted parent streams, for all the reasons you mentioned. There are plenty of sources that are sorted and not nested that would work as expected and benefit hugely (including many if not most SQL use-cases) 🙂
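
As a toy illustration of the sorted case (illustrative data only, not SDK behaviour): because records arrive ordered by the replication key, the key of the last record emitted under the cap is still a safe, resumable bookmark for the next run.

    # Illustrative only: a capped sync over a *sorted* source still yields a valid
    # resumable bookmark, because no unsynced record can have a smaller key.
    records = [
        {"id": 1, "updated_at": "2023-01-01"},
        {"id": 2, "updated_at": "2023-01-02"},
        {"id": 3, "updated_at": "2023-01-03"},
        {"id": 4, "updated_at": "2023-01-04"},
    ]  # already sorted by the replication key "updated_at"

    max_records = 2
    synced = records[:max_records]
    bookmark = synced[-1]["updated_at"]  # safe to finalize: "2023-01-02"

    # A later capped run resumes from the bookmark and makes forward progress.
    remaining = [r for r in records if r["updated_at"] >= bookmark]
    next_batch = remaining[:max_records]  # ids 2 and 3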

@aaronsteers
Contributor

aaronsteers commented Feb 8, 2023

@aaronsteers that makes sense for the connection test, but doesn't help for the rest of the standardised test suite. The root issue is that tests require a finalised sync to perform stream-level tests.

My point above is that because it's impossible to do so, and because that is understandably part of the spec, we should not add the feature - we can't deliver it generically and still meet the minimal set of Spec-adherence expectations.

Ultimately we want to test that the Tap runs to completion, and (among other things) correctly finalises accumulated STATE into a bookmark.

Understood. Again my point is just that this is impossible, not that it is not desirable. We cannot finalize the STATE because the bookmark is invalid until all records are synced to that point.

https://sdk.meltano.com/en/latest/implementation/state.html#the-impact-of-sorting-on-incremental-sync

However we also want to reduce the cost of testing overall, by limiting the number of records fetched as part of that completed sync. Hence the desire to have a mechanism for limiting the number of records fetched and returned by get_records() that doesn't raise an error (which currently terminates the sync before completion).

If we're talking about testing, then the raised exception can be handled by the test harness. If we're talking about production syncs, the exception is needed to make sure we're alerting on the failure to complete the sync.
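
A rough sketch of that harness-level handling, following the same pattern as run_connection_test() above (hypothetical helper; the exception's import path is assumed, and _MAX_RECORDS_LIMIT remains an SDK-internal attribute):

    # Hypothetical test-harness helper: cap every stream, sync top-level streams,
    # and treat the limit exception as an expected, non-fatal outcome.
    from singer_sdk.exceptions import MaxRecordsLimitException  # assumed import path


    def sync_with_record_cap(tap, max_records: int = 5) -> None:
        """Sync each top-level stream, swallowing the expected limit exception."""
        for stream in tap.streams.values():
            stream._MAX_RECORDS_LIMIT = max_records
        for stream in tap.streams.values():
            if stream.parent_stream_type:
                continue  # Child streams are invoked by their parents.
            try:
                stream.sync()
            except MaxRecordsLimitException:
                pass  # Expected once the cap is hit; up to `max_records` were emitted.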

Whilst I understand the sorting case, and also the parent-child limitations called out elsewhere, having a means to specify "run to completion but fetch no more than n records" for testing purposes will reduce the cost of CI/CD and improve the overall developer experience for Tap maintainers.

I don't know how "run to completion" could be compatible with "no more than n records". Either we ran to completion or we aborted abnormally - but we can't have done both. There is a way to do this as a feature - but the number of errors we'd have to throw would make it unusable in the majority of tap implementations - especially if delivered before #1350 which would allow individual stream config. That said, if we keep scope focused on dry-run and test use cases, then we end up with something like #1366 that is specifically named and defined as a non-production config option.

Looking further down the line, I still cannot see a reason why this feature couldn't be made available with limitations/caveats; i.e. setting record limits for the purposes of syncing is only available to sorted parent streams, for all the reasons you mentioned.

It's not clear how those exceptions/limitations/caveats would be delivered. Would we fail the entire sync operation if a user tries to apply a record limit on an unsorted stream, or on a stream which cannot support the record limit? How would users know in advance which streams are sorted or not, since that's internal to the tap implementation? What if a stream switches from INCREMENTAL to FULL_TABLE? Would all FULL_TABLE syncs fail every time a record limit is applied - and would they fail before starting, or after n+1 records are reached? Any FULL_TABLE sync that has a record limit applied is never going to finish syncing its records - and therefore fails to meet even basic foundational Singer Spec requirements. (Not an issue if we are explicitly in "dry run" mode or similar, but it does preclude "real" use cases from being viable.)

There are plenty of sources that are sorted and not nested that would work as expected and benefit hugely (including many if not most SQL use-cases) 🙂

I don't really see this being generically valuable outside of dry_run and test scenarios.

Teasing apart test/dry-run use cases from "real world" production applications

For dry run scenarios, we can do something like #1366.

For all other real world use cases, I think we'd need to agree on a pretty large set of failure scenarios - circumstances where we'd basically be forced to fail the whole sync operation, whenever we run into a stream that can't honor a resumable sync operation with the record limit applied. Those cases include but wouldn't be limited to: unsorted streams (the default for RESTStream), full table syncs, and child streams that have state_partitioning_keys overridden.

For all these reasons, I think this would be better left to tap developers to implement themselves - and not added as a generic/global config across all taps. If we wanted to deliver it generically, I think it just needs to be positioned as a "dry-run" or "test" feature, and not a generic record limit.

@stale

stale bot commented Jul 18, 2023

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

@stale stale bot added the stale label Jul 18, 2023
@stale stale bot closed this as completed Aug 8, 2023
@edgarrmondragon
Collaborator

Still relevant

@stale stale bot removed stale labels Aug 8, 2023

stale bot commented Aug 7, 2024

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

@stale stale bot added the stale label Aug 7, 2024
@stale stale bot closed this as completed Aug 28, 2024