
(Is there already) support for async stream #586

Open
georgikoyrushki95 opened this issue Dec 5, 2023 · 7 comments

@georgikoyrushki95
Contributor

georgikoyrushki95 commented Dec 5, 2023

Hi, I also commented on #507 as it seems related (at least to someone not yet too familiar with sender/receiver).

A little bit of context on what I want: I'm looking for something that represents an async stream. To sketch out an example:

unifex::task<size_t> getFooSum(/* some params here */)
{
    auto request = /* build request from params */;

    // This range of responses is filled by the underlying RPC framework; responses arrive on
    // a separate thread (what we've termed "the callback thread" internally).
    auto rangeOfMultipartResponses = someRpcClient->sendRequest(request);
    auto fooValueSum = unifex::reduce_stream(
        std::move(rangeOfMultipartResponses),
        size_t{0},
        [](size_t sum, auto resp) { return sum + resp.fooValue(); });
    co_return co_await fooValueSum;
}

Is this something that the generic range_stream would support, or do we need to build our own stream adapter? And isn't this generic enough to belong in libunifex itself? Basically, something like cppcoro::async_generator that speaks the stream protocol? I have seen cppcoro::async_generator mentioned in the docs, but wasn't able to find anything here that supports that functionality.
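
For reference, the existing synchronous stream machinery (range_stream, reduce_stream) already composes the way the sketch above assumes; a minimal, self-contained example is below. What seems to be missing is the asynchronous producer side.

#include <unifex/range_stream.hpp>
#include <unifex/reduce_stream.hpp>
#include <unifex/sync_wait.hpp>

#include <iostream>

int main() {
  // range_stream{0, 10} yields the values 0..9 through the stream protocol
  // (one next-sender per value plus a cleanup-sender); reduce_stream folds
  // them into a single sender that sync_wait can drive to completion.
  auto sum = unifex::sync_wait(unifex::reduce_stream(
      unifex::range_stream{0, 10},
      0,
      [](int acc, int value) { return acc + value; }));

  std::cout << *sum << '\n';  // prints 45
}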

@georgikoyrushki95
Contributor Author

Also, if not, should it be built in a way that interoperates with cppcoro::async_generator? Basically, being able to go both ways: cppcoro::async_generator <-> libunifex::async_stream?

@ispeters
Contributor

ispeters commented Dec 6, 2023

Hi @georgikoyrushki95, I was able to wrap a cppcoro::async_generator<int> in a Stream here: https://godbolt.org/z/Kh3rWTMao.

I don't think it makes sense to add that particular Stream type to Unifex because we intend to be free of dependencies, but a type that's analogous to cppcoro::async_generator<T> would be a welcome addition.

@georgikoyrushki95
Contributor Author

@ispeters thanks so much! This is exactly what I'm looking for!

> I don't think it makes sense to add that particular Stream type to Unifex because we intend to be free of dependencies, but a type that's analogous to cppcoro::async_generator would be a welcome addition.

For this, do you mean a libunifex-specific async generator and adapting that to a stream? Basically identical to what you have done in your godbolt, the only difference being we add the equivalent of cppcoro::async_generator<T> in libunifex as a prerequisite?

Or do you want the stream to be backed by "pure" senders?

Both would work for us & it appears it'd be equivalent for the consumers of the stream values. The difference would be in how the values are produced, I guess.

Let me know your thoughts; I think this is something I'd be interested in contributing :)

@ispeters
Contributor

ispeters commented Dec 7, 2023

@georgikoyrushki95, I think a coroutine-based async sequence and a "raw sender"-based one would serve different needs and offer different trade-offs so, if I get to be greedy, I want both.

Our experience at Meta has been that coroutines are easier to read, write, debug, and just generally maintain than composition-of-sender algorithms-style code. The cost of that ease is basically overhead; coroutines don't optimize as well as raw senders (either for size or speed). The advice we give to internal teams adopting Unifex is that they should prefer coroutines until they know that the overheads are unacceptable, at which point they can refactor to the lower-level abstraction of raw senders.

My ideal coroutine-based async sequence that's analogous to cppcoro::async_generator<T> would:

  • support co_await and co_yield inside the generating coroutine
  • feel like a unifex::task<> (have scheduler affinity, support awaiting Senders, support similar unwind-on-cancellation semantics, etc.)
    • note: this would also mean parameterizing the type to make it possible to opt in to a noexcept version like unifex::nothrow_task<>
  • map naturally to a Stream (possibly by being a Stream—I'm not sure what interfaces make sense)
    • note: this implies that an async generator coroutine type probably ought to support async clean-up, and it's not obvious to me how you'd express the clean-up work from within the coroutine; maybe unifex::at_coroutine_exit is the way?
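
For reference, the usage model the list above describes is roughly that of cppcoro::async_generator<T>; here is a minimal example of that model (the unifex-flavoured type would layer scheduler affinity and async clean-up on top):

#include <cppcoro/async_generator.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>

#include <iostream>

// The producing coroutine can co_yield values and, in a real producer,
// co_await other awaitables between yields.
cppcoro::async_generator<int> ints(int n) {
  for (int i = 0; i < n; ++i) {
    co_yield i;
  }
}

// The consumer drives the generator with an asynchronous begin()/operator++.
cppcoro::task<int> sumInts(int n) {
  int total = 0;
  auto gen = ints(n);
  for (auto it = co_await gen.begin(); it != gen.end(); co_await ++it) {
    total += *it;
  }
  co_return total;
}

int main() {
  std::cout << cppcoro::sync_wait(sumInts(5)) << '\n';  // prints 10
}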

I'm not sure what an ideal sender-based Stream would look like; I kind of suspect there's room for several different implementations. @janondrusek plans to upstream an internal type that's currently named unicast_flow; it's essentially a one-slot async queue with an interface like this:

template <typename T>
struct unicast_flow {
  /**
   * Sends t through the flow.
   * 
   * If all previous values of t have been consumed then this saves t
   * as a value within the flow and then completes immediately, waking
   * a pending consumer if there is one. If there's a previously saved t
   * waiting to be consumed, the sender will suspend until a consumer
   * notifies that the previously saved value has been consumed.
   * 
   * The returned sender completes with set_done if the stream consumer
   * has requested stop on a next-sender or awaited the cleanup-sender.
   */
  sender auto notify(const T& t);
  sender auto notify(T&& t);

  /**
   * Puts the flow in a "done" state.
   *
   * This works like "notifying an end-of-flow sentinel"; if there's a
   * consumer waiting for the next value, that consumer will be resumed
   * with set_done. If there's no waiting consumer, the next consumer to
   * arrive will complete with done. If there's a saved t value waiting to be
   * consumed, the sender returned from stop() will suspend until that t
   * has been consumed.
   */
  sender auto stop();

  /**
   * Returns a stream whose next-senders produce the values sent
   * through the flow by calls to notify(). The returned stream completes
   * when a next-sender observes the done signal sent when the producer
   * awaits the sender returned from stop().
   */
  stream auto start();
};
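
To make that shape concrete, a hypothetical producer/consumer pair written against this interface might look like the following (illustrative only; the type hasn't been upstreamed yet, so names may change):

// Producer: pushes five values through the flow, then signals end-of-flow.
unifex::task<void> producer(unicast_flow<int>& flow) {
  for (int i = 0; i < 5; ++i) {
    // Suspends if the previously notified value hasn't been consumed yet;
    // completes with done if the consumer has already stopped.
    co_await flow.notify(i);
  }
  // The consumer's next next-sender will observe the done signal.
  co_await flow.stop();
}

// Consumer: folds the values from the stream returned by start().
unifex::task<int> consumer(unicast_flow<int>& flow) {
  co_return co_await unifex::reduce_stream(
      flow.start(), 0, [](int acc, int value) { return acc + value; });
}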

unicast_flow works pretty well for us but I think it leaves plenty of holes in the design space that could be explored with other types. One such hole is a Stream-shaped view of an unbounded queue, which would allow you to provide a synchronous, non-blocking producer interface. Another hole is a Stream that works like the coroutine-based design I described above, but implemented in terms of Senders; I'm not really sure what this would look like.

@georgikoyrushki95
Contributor Author

Hey, so I have been looking into this over the past couple of days and wanted to double-check something with you about the expected scheduler affinity of the async_generator. Basically, do we want affinity to span suspensions of the generator when a co_yield happens? This sounds like the most intuitive behaviour, but it potentially involves some overhead (we could end up always rescheduling when the generator resumes). In other words:

void foo()
{
    // assume the thread where foo() executes is thread id == 0
    static unifex::single_thread_context genCtx; // assume thread id == 1

    auto generator = []() -> async_generator<int> {
        std::cout << "thread id before scheduling = " << std::this_thread::get_id() << std::endl; // prints 0
        co_await unifex::schedule(genCtx.get_scheduler());
        std::cout << "thread id after scheduling = " << std::this_thread::get_id() << std::endl; // prints 1
        for (int i = 0; i < 5; ++i) {
            co_yield i;
            std::cout << "thread id after resuming gen = " << std::this_thread::get_id() << std::endl; // always prints 1
        }
    }();

    auto result = unifex::sync_wait([&generator]() -> unifex::task<int> {
        int sum = 0;
        for (auto itr = co_await generator.begin(); itr != generator.end(); co_await ++itr) {
            std::cout << "thread id in the outer task = " << std::this_thread::get_id() << std::endl; // always prints 0
            sum += *itr;
        }
        co_return sum;
    }());

    // do something with result
}

In essence, when the generator resumes, do we always switch onto the latest thread it was scheduled on? And if that's the case, where would be a good place to do the switch? My thinking is within the generator's awaiter::await_suspend, but I'm not 100% sure how that can be expressed: basically I need to resume the schedule coroutine, which in turn needs to resume the generator. Is something like this possible?

@ccotter
Contributor

ccotter commented Dec 19, 2023

> Hi @georgikoyrushki95, I was able to wrap a cppcoro::async_generator<int> in a Stream here: https://godbolt.org/z/Kh3rWTMao.

Really nice! Thanks for sharing this example.

@ispeters
Contributor

ispeters commented Jan 3, 2024

Sorry for the delayed response; holiday PTO and performance review season have me inundated.

The short reply is that I think we need to provide the semantics @georgikoyrushki95 described in his example code—I think anything else would violate the principle of least surprise.

One thing: I think it was a mistake to make co_await unifex::schedule(s) as magical as we did. Coroutines probably should support a way to explicitly switch schedulers, but we should do it with something that looks more like co_await reschedule_current_coroutine(s), or something, so that it's distinctly different from schedule(s).
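
To spell out the behaviour in question: inside a unifex::task<>, co_awaiting schedule(s) both resumes the coroutine on s and rebinds the task's current scheduler, so every subsequent resumption also happens on s. A minimal illustration:

#include <unifex/scheduler_concepts.hpp>
#include <unifex/single_thread_context.hpp>
#include <unifex/task.hpp>

unifex::task<void> example(unifex::single_thread_context& ctx) {
  // After this co_await the task is running on ctx's thread, and because of
  // the scheduler-affinity behaviour discussed above it keeps resuming there
  // after every subsequent co_await.
  co_await unifex::schedule(ctx.get_scheduler());
  co_return;
}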

> And if that's the case, where would be a good place to do the switch?

If I've understood the coroutine spec correctly, co_yield <expr> lowers to co_await promise.yield_value(<expr>), which means you have an opportunity to inject a custom Awaitable: the result of yield_value can be of a type that does what you want.

Without having tried to implement a generator like this before, I suspect you'll need to enforce the scheduler affinity invariant in the await_resume method of the awaitable; it'd be nice if you could somehow figure out when a reschedule is unnecessary (because you're already on the right context), but a brute-force solution is to reschedule on every resumption.
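
A rough sketch of that hook, with illustrative names only (none of this is an existing unifex API): the promise's yield_value() hands back a custom awaitable, and its await_* members are where the affinity check or reschedule can be injected.

#include <coroutine>

// Fragment of a hypothetical async_generator promise type; the error channel
// and the generator's scheduler bookkeeping are elided.
struct yield_awaitable {
  std::coroutine_handle<> consumer_;

  bool await_ready() noexcept { return false; }

  // The producer suspends at co_yield; symmetric transfer resumes the
  // consumer that is waiting in begin()/operator++.
  std::coroutine_handle<> await_suspend(std::coroutine_handle<>) noexcept {
    return consumer_;
  }

  // Runs when the consumer resumes the producer for the next value; per the
  // suggestion above, this is one place the scheduler-affinity invariant
  // could be checked and a reschedule triggered if needed.
  void await_resume() noexcept {}
};

struct promise_fragment {
  int value_{};
  std::coroutine_handle<> consumer_;

  // co_yield <expr> lowers to co_await yield_value(<expr>), so returning a
  // custom awaitable here is the injection point described above.
  yield_awaitable yield_value(int value) noexcept {
    value_ = value;  // publish the value for the consumer to read
    return yield_awaitable{consumer_};
  }
};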

I think the way to build "reschedule on each resumption" is to write a little custom receiver that resumes the generator in set_value() and then connect an instance of that receiver to the result of schedule(gen.currentScheduler_). You can put the resulting operation state somewhere in the awaitable and start() it when you want the reschedule to happen.
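
A minimal sketch of that receiver (illustrative only; error and done handling are simplified):

#include <coroutine>
#include <exception>

// Resumes a coroutine once the schedule() sender completes, i.e. once we are
// running on the desired scheduler.
struct resume_on_scheduler_receiver {
  std::coroutine_handle<> coro_;

  void set_value() noexcept { coro_.resume(); }
  void set_done() noexcept { coro_.resume(); }
  void set_error(std::exception_ptr) noexcept { std::terminate(); }
};

// Inside the awaitable, the reschedule then amounts to connecting
// unifex::schedule(<the generator's current scheduler>) to an instance of
// this receiver holding the generator's coroutine handle, keeping the
// resulting operation state in stable storage (e.g. unifex::manual_lifetime),
// and calling unifex::start() on it at the point where the resumption should
// happen.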
