Are distinct connection objects thread safe? #178
However, distinct redis connection objects should certainly be thread safe. You still have to follow the rules for shared objects.
@vinniefalco, yes, I wrote another example code that uses `asio::steady_timer`. Each TaskId has one timer. I expect the same level of thread safety.

Code:

```cpp
#include <iostream>
#include <syncstream>
#include <atomic>
#include <thread>
#include <vector>

#include <boost/asio.hpp>

namespace asio = boost::asio;

constexpr std::size_t num_of_threads = 8;
constexpr std::size_t num_of_tasks = 5;
constexpr std::size_t num_of_steps = 10;

std::atomic<int> tid_gen;
thread_local int tid = tid_gen++;

asio::io_context ioc;

asio::awaitable<void> proc(int id) {
    // create
    asio::steady_timer tim{co_await asio::this_coro::executor};
    // wait repeatedly
    for (std::size_t i = 0; i != num_of_steps; ++i) {
        tim.expires_after(std::chrono::milliseconds(100));
        co_await tim.async_wait(asio::deferred);
        std::osyncstream(std::cout)
            << " TaskId:" << id
            << " Step:" << i
            << " ThreadId:" << tid
            << " Fired" << std::endl;
    }
    // finish
    std::osyncstream(std::cout)
        << " TaskId:" << id
        << " ThreadId:" << tid
        << " Finish" << std::endl;
    co_return;
}

int main() {
    // spawn tasks
    for (std::size_t i = 0; i != num_of_tasks; ++i) {
        asio::co_spawn(ioc.get_executor(), proc(i), asio::detached);
    }
    std::osyncstream(std::cout) << "spawned" << std::endl;
    // run ioc by threads
    std::vector<std::thread> ths;
    ths.reserve(num_of_threads);
    for (int i = 0; i != num_of_threads; ++i) {
        ths.emplace_back(
            [&, i] {
                ioc.run();
                std::osyncstream(std::cout) << "thread " << i << " finished" << std::endl;
            }
        );
    }
    for (auto& th : ths) th.join();
}
```

Godbolt link: https://godbolt.org/z/MrqdE84cd

Output
Hi @redboltz, sorry for the delay.

Correct. As I said above, the connection object is not thread-safe; therefore, it is expected that your code snippet does not work. IIRC, spawning the coroutines on strands should fix your code.
@mzimbres thank you for the comment.

I prepared a strand for each task.

Prepare strands and spawn coroutines:

```cpp
// prepare strands
std::vector<strand_t> strands;
strands.reserve(num_of_tasks);
for (std::size_t i = 0; i != num_of_tasks; ++i) {
    strands.emplace_back(asio::make_strand(ioc.get_executor()));
}
// spawn tasks on strand
int index = 0;
for (auto& strand : strands) {
    asio::co_spawn(
        strand,                // strand as coroutine's executor
        proc(index++, strand), // strand for checking (optional)
        asio::detached
    );
}
```

Check executor and strand:

```cpp
// connect
auto exe = co_await asio::this_coro::executor;
// strand check
BOOST_ASSERT(exe == strand);
BOOST_ASSERT(strand.running_in_this_thread());
auto conn = std::make_shared<redis::connection>(exe);
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
```
The executor and strand are valid. However, the unexpected behavior I reported still happens.

All code:

```cpp
#include <iostream>
#include <syncstream>
#include <atomic>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
#include <boost/redis/src.hpp>
#include <boost/assert.hpp>

namespace asio = boost::asio;
namespace redis = boost::redis;

constexpr std::size_t num_of_threads = 8;
constexpr std::size_t num_of_tasks = 5;
constexpr std::size_t num_of_steps = 10;

std::atomic<int> tid_gen;
thread_local int tid = tid_gen++;

asio::io_context ioc;

using strand_t = asio::strand<asio::any_io_executor>;

asio::awaitable<void> proc(
    int id,
    strand_t& strand // for checking
) {
    // config
    redis::config cfg;
    cfg.health_check_interval = std::chrono::seconds(0);
    // connect
    auto exe = co_await asio::this_coro::executor;
    // strand check
    BOOST_ASSERT(exe == strand);
    BOOST_ASSERT(strand.running_in_this_thread());
    auto conn = std::make_shared<redis::connection>(exe);
    conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
    // request (SET)
    for (std::size_t i = 0; i != num_of_steps; ++i) {
        // strand check
        BOOST_ASSERT(exe == strand);
        BOOST_ASSERT(strand.running_in_this_thread());
        redis::request req;
        req.push("SET", id + i, id + i);
        redis::response<std::string> resp;
        co_await conn->async_exec(req, resp, asio::deferred);
        std::osyncstream(std::cout)
            << " TaskId:" << id
            << " Step:" << i
            << " ThreadId:" << tid
            << " Resp:" << std::get<0>(resp).value() << std::endl;
    }
    // strand check
    BOOST_ASSERT(exe == strand);
    BOOST_ASSERT(strand.running_in_this_thread());
    // disconnect
    conn->cancel();
    std::osyncstream(std::cout)
        << " TaskId:" << id
        << " ThreadId:" << tid
        << " Finish" << std::endl;
    co_return;
}

int main() {
    // prepare strands
    std::vector<strand_t> strands;
    strands.reserve(num_of_tasks);
    for (std::size_t i = 0; i != num_of_tasks; ++i) {
        strands.emplace_back(asio::make_strand(ioc.get_executor()));
    }
    // spawn tasks on strand
    int index = 0;
    for (auto& strand : strands) {
        asio::co_spawn(
            strand,                // strand as coroutine's executor
            proc(index++, strand), // strand for checking (optional)
            asio::detached
        );
    }
    std::osyncstream(std::cout) << "spawned" << std::endl;
    // run ioc by threads
    std::vector<std::thread> ths;
    ths.reserve(num_of_threads);
    for (int i = 0; i != num_of_threads; ++i) {
        ths.emplace_back(
            [&, i] {
                ioc.run();
                std::osyncstream(std::cout) << "thread " << i << " finished" << std::endl;
            }
        );
    }
    for (auto& th : ths) th.join();
}
```
Ok, thanks. I will have a look at your example.
@redboltz, with:

```cpp
constexpr std::size_t num_of_threads = 80;
constexpr std::size_t num_of_tasks = 5000;
constexpr std::size_t num_of_steps = 1000;
```

What is your platform and compiler? Could you invoke

```cpp
conn->async_run(cfg, {}, asio::bind_executor(exe, asio::consign(asio::detached, conn)));
```
CPU: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz, redis-7.2.4-1.

compile command

Also, I tried g++ and got the same result.

I applied it but got a similar result. Parameter settings are:

```cpp
constexpr std::size_t num_of_threads = 8;
constexpr std::size_t num_of_tasks = 5;
constexpr std::size_t num_of_steps = 10;
```
@redboltz, I managed to reproduce the issue. I was using the latest version of the code in the develop branch, which doesn't seem to have this issue.
Could you tell me the commit hash of the branch? I will check it too.
@ashtum try git-bisect to locate the troublesome commit. Let's hope Marcelo uses a linear history.
I'm using the latest code in the develop branch: https://github.com/boostorg/redis/tree/112bba722211b16ca4fbda16291329368809fb7e
This is the merge that I think fixed the problem @redboltz is facing (as he himself pointed out). @vinniefalco What do you mean by linear history, rebase on develop and fast-forward the commits? What about a merge commit?
I double checked. The problem is no longer reproduced. I also checked the combinations of co_spawn with/without strand, and bind_executor.

As you mentioned, bind_executor is not related, and passing the strand to co_spawn is required. I don't know much about the patch release policy, but if this fix is released as a patch (https://www.boost.org/patches/), I would appreciate it.
Due to lack of time I can't promise 1.85, but I will pay attention to this in future releases. Side note: in your examples you are using one connection per coroutine; this is not efficient since it prevents automatic pipelining. Using a single connection not only results in better performance but also helps prevent exhausting file descriptors on the Redis server.
Thanks, I understand.

Consider the following scenario: MyServer uses Boost.Redis. Senders are MyServer's clients. MyServer knows the data-size tendency per client.

Two redis connections:

```mermaid
sequenceDiagram
    participant SmallSizeSender
    participant LargeSizeSender
    participant MyServer
    participant RedisServer
    LargeSizeSender->>MyServer: LargeData
    MyServer->>RedisServer: SET LargeData
    SmallSizeSender->>MyServer: SmallData
    MyServer->>RedisServer: SET SmallData
    RedisServer->>MyServer: Response for SmallData
    MyServer->>SmallSizeSender: notify finished
    RedisServer->>MyServer: Response for LargeData
    MyServer->>LargeSizeSender: notify finished
```

However, if MyServer has one redis connection, MyServer gets the SmallData response only after the LargeData response has been received.

One redis connection:

```mermaid
sequenceDiagram
    participant SmallSizeSender
    participant LargeSizeSender
    participant MyServer
    participant RedisServer
    LargeSizeSender->>MyServer: LargeData
    MyServer->>RedisServer: SET LargeData
    SmallSizeSender->>MyServer: SmallData
    MyServer->>RedisServer: SET SmallData
    RedisServer->>MyServer: Response for LargeData
    MyServer->>LargeSizeSender: notify finished
    RedisServer->>MyServer: Response for SmallData (needs to wait for LargeData processing)
    MyServer->>SmallSizeSender: notify finished
```

That is my motivation for using multiple redis connections. NOTE: I don't mean each client has one redis connection. The clients are categorized and share a redis connection per category.
I will design my server as follows:

I think it should work on 1.84.0 because the number of threads is one (*A). And most of the redis thread's work is sending/receiving to redis asynchronously, so one thread is good enough, I guess.
BTW, I found the following document: https://www.boost.org/doc/libs/1_84_0/libs/redis/doc/html/index.html#autotoc_md1

I think that it is wrong. It implies the shared connection object is thread safe, at least when calling the async_exec member function. What do you think?
I do rebase, but I rarely have to, as I don't work on more than one case at the same time.
"Concurrently" here assumes access from the same thread and has nothing to do with thread-safety. If you are familiar with Boost.Beast, for example, you will know that you can't have concurrent writes, namely more than one ongoing call to async_write.
@redboltz Regarding your diagram: AFAIK, Redis can't process requests in parallel as you show in your first diagram. This is also why multiple connections are not likely to speed up your app's performance; they can actually slow it down.
In order to test redis behavior, I wrote the following code:

```cpp
#include <string>
#include <iostream>

#include <boost/asio.hpp>
#include <boost/redis/src.hpp>

namespace asio = boost::asio;
namespace redis = boost::redis;

int main() {
    asio::io_context ioc;
    redis::config cfg;

    std::string val1(5000000LL, '1'); // big
    std::string val2 = "VAL2";        // small

    redis::request req1;
    redis::request req2;
    redis::response<std::string> resp1;
    redis::response<std::string> resp2;

    auto conn1 = std::make_shared<redis::connection>(ioc.get_executor());
    auto conn2 = std::make_shared<redis::connection>(ioc.get_executor());
    conn1->async_run(cfg, {}, asio::consign(asio::detached, conn1));
    conn2->async_run(cfg, {}, asio::consign(asio::detached, conn2));

    req1.push("SET", "KEY1", val1);
    std::cout << __LINE__ << ": do async command1" << std::endl;
    conn1->async_exec(
        req1,
        resp1,
        [&](boost::system::error_code const& ec, std::size_t size) {
            std::cout << __LINE__ << ":"
                      << " Resp1:" << std::get<0>(resp1).value()
                      << " ec:" << ec.message()
                      << " size:" << size
                      << std::endl;
            conn1->cancel();
        }
    );
    std::cout << __LINE__ << ": return async command1" << std::endl;

    req2.push("SET", "KEY2", val2);
    std::cout << __LINE__ << ": do async command2" << std::endl;
    conn2->async_exec(
        req2,
        resp2,
        [&](boost::system::error_code const& ec, std::size_t size) {
            std::cout << __LINE__ << ":"
                      << " Resp2:" << std::get<0>(resp2).value()
                      << " ec:" << ec.message()
                      << " size:" << size
                      << std::endl;
            conn2->cancel();
        }
    );
    std::cout << __LINE__ << ": return async command2" << std::endl;

    ioc.run();
}
```

Then I got the following result:

The program has two connections. The connection-related variables (e.g. conn, req, resp, ...) have a 1 or 2 suffix. However, after some study, I noticed the following sequence:

```mermaid
sequenceDiagram
    participant conn1
    participant conn2
    participant network
    participant redis
    conn1->>network: Req1(Big)
    conn2->>network: Req2(Small)
    network->>redis: Req2(Small)
    network->>redis: Req1(Big)
    redis->>network: Resp2(Small)
    network->>conn2: Resp2(Small)
    redis->>network: Resp1(Big)
    network->>conn1: Resp1(Big)
```

The order of req1 and req2 is reversed, but that is caused by the TCP transfer cost between conn1 and redis. Finally, I understand that multiple redis connections cannot improve the performance, so I will use a single connection.
Thank you for the clarification. I understand. The sentence could be misleading, at least for me:

> Thread Safety. Shared objects: Unsafe.
Let me clarify the current status. The original issue I pointed out is now solved in the develop branch, but not yet in a release, so I use Boost 1.84.0. I'm not sure about the issue-closing policy in this project. Thank you very much for your help!!
Overview

Boost version is 1.84.0.

It seems that using the same connection object from multiple threads is not safe. That is OK. I create multiple connection objects, one per co_spawned function (proc()). And io_context is running on multiple threads. NOTE: io_context is thread safe. See: Thread Safety.

I wrote the following code. When I set num_of_threads = 1, all tasks and steps finish as expected. However, when I increase num_of_threads, weird behavior happens. See Output. I think that even if num_of_threads is increased, the program should finish correctly (SET all key-values and finish).

Minimal reproduce code

Output

Success case

Failure case

Unfinished

Assertion Failed

Observation

The ThreadId can differ across co_await conn->async_exec(req, resp, asio::deferred);. I think that even if the ThreadId is different, no race condition happens because the order of the code of proc() is serialized by the coroutine mechanism.