Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Task registration fails if node_dependency_hints includes a workflow #5989

Open
2 tasks done
Tom-Newton opened this issue Nov 11, 2024 · 3 comments
Open
2 tasks done
Assignees
Labels
bug Something isn't working flytekit FlyteKit Python related issue help wanted Extra attention is needed

Comments

@Tom-Newton
Copy link
Contributor

Describe the bug

With the following workflow

@task
def task0():
    return None


@workflow
def workflow0():
    return task0()


@dynamic(node_dependency_hints=[workflow0])
def dynamic0():
    return workflow0()


@workflow
def workflow1():
    return dynamic0()
bazel run example -- remote register dynamic0

fails with

RPC Failed, with Status: StatusCode.NOT_FOUND
        Details: missing entity of type TASK with identifier project:"flyteexamples"  domain:"development"  name:"<redacted>.workflow.workflow0"  version:"L6mEsnIJmUVT84b059kVJA"

However

bazel run example -- remote register workflow1

works as expected

Expected behavior

bazel run example -- remote register dynamic0

Should register the workflow successfully

Additional context to reproduce

The cause

The registration code assumes that the last entity registered is the top level one

        # concurrent register
        cp_task_entity_map = OrderedDict(filter(lambda x: isinstance(x[1], task_models.TaskSpec), m.items()))
        tasks = []
        loop = asyncio.get_event_loop()
        for entity, cp_entity in cp_task_entity_map.items():
            tasks.append(
                loop.run_in_executor(
                    None,
                    functools.partial(self.raw_register, cp_entity, serialization_settings, version, og_entity=entity),
                )
            )

        identifiers_or_exceptions = []
        identifiers_or_exceptions.extend(await asyncio.gather(*tasks, return_exceptions=True))
        # Check to make sure any exceptions are just registration skipped exceptions
        for ie in identifiers_or_exceptions:
            if isinstance(ie, RegistrationSkipped):
                logger.info(f"Skipping registration... {ie}")
                continue
            if isinstance(ie, Exception):
                raise ie
        # serial register
        cp_other_entities = OrderedDict(filter(lambda x: not isinstance(x[1], task_models.TaskSpec), m.items()))
        for entity, cp_entity in cp_other_entities.items():
            identifiers_or_exceptions.append(
                self.raw_register(cp_entity, serialization_settings, version, og_entity=entity)
            )
        return identifiers_or_exceptions[-1]

https://github.com/flyteorg/flytekit/blob/8fdd0c68f10b9f08cd65aa74ef90f0d9af564dd2/flytekit/remote/remote.py#L890
However this assumption no longer holds when registering a task with a node_dependency_hint on a workflow. I guess we need to switch to retrieving the top level registered entity by ID instead of assuming.

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@Tom-Newton Tom-Newton added bug Something isn't working untriaged This issues has not yet been looked at by the Maintainers labels Nov 11, 2024
@Tom-Newton Tom-Newton added the flytekit FlyteKit Python related issue label Nov 11, 2024
@eapolinario eapolinario added help wanted Extra attention is needed and removed untriaged This issues has not yet been looked at by the Maintainers labels Nov 14, 2024
@eapolinario
Copy link
Contributor

@Tom-Newton , is this something that you could contribute?

@Tom-Newton
Copy link
Contributor Author

@Tom-Newton , is this something that you could contribute?

I don't know when but I probably can

@eapolinario
Copy link
Contributor

Thank you. I'm going to assign this to you, but in case you notice that you won't have time, please comment again and we can try to find someone to take a look, ok?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working flytekit FlyteKit Python related issue help wanted Extra attention is needed
Projects
Status: Assigned
Development

No branches or pull requests

2 participants