From 66655f140221d6789e5e5bf509438758c2372f38 Mon Sep 17 00:00:00 2001 From: takira Date: Thu, 31 Aug 2023 00:17:30 +0900 Subject: [PATCH] add(server, test): add select() for handle multiple I/O --- includes/Result.hpp | 1 + includes/webserv.hpp | 1 + srcs/Client/Client.cpp | 32 +++++-- srcs/Server/Server.cpp | 169 ++++++++++++++++++++++++++++------ srcs/Server/Server.hpp | 19 +++- test/unit_test/TestServer.cpp | 152 ++++++++++++++++++++++++++++-- 6 files changed, 331 insertions(+), 43 deletions(-) diff --git a/includes/Result.hpp b/includes/Result.hpp index 59eadd88..8b6caadc 100644 --- a/includes/Result.hpp +++ b/includes/Result.hpp @@ -20,6 +20,7 @@ class Result { } bool is_ok() const { return _is_ok; } + bool is_err() const { return !_is_ok; } OkType get_ok_value() const { if (_is_ok) { diff --git a/includes/webserv.hpp b/includes/webserv.hpp index f3f5fdd8..8e2bf468 100644 --- a/includes/webserv.hpp +++ b/includes/webserv.hpp @@ -14,6 +14,7 @@ # define STAT_ERROR (-1) # define ERROR (-1) # define OK 0 +# define BREAK 1 # define INVALID_ARGUMENT_ERROR_MSG "[Error] invalid argument" # define INVALID_PATH_ERROR_MSG "[Error] invalid file path" diff --git a/srcs/Client/Client.cpp b/srcs/Client/Client.cpp index 9e046d10..b136ecef 100644 --- a/srcs/Client/Client.cpp +++ b/srcs/Client/Client.cpp @@ -20,7 +20,7 @@ Client::~Client() { if (_connect_fd != ERROR) { errno = 0; if (close(_connect_fd) == ERROR) { - std::cerr << strerror(errno) << std::endl; + std::cerr << RED "[Client Error] close: " << strerror(errno) << RESET << std::endl; } _connect_fd = ERROR; } @@ -31,7 +31,8 @@ int Client::create_connect_socket() { errno = 0; connect_fd = socket(AF_INET, SOCK_STREAM, 0); if (connect_fd == ERROR) { - throw std::runtime_error(strerror(errno)); + std::string err_str = "[Client Error] socket: " + std::string(strerror(errno)); + throw std::runtime_error(RED + err_str + RESET); } return connect_fd; } @@ -48,9 +49,15 @@ struct sockaddr_in Client::create_connect_addr(const char *server_ip, } void Client::process_server_connect(const std::string &send_msg) { + // printf(MAGENTA "client(pid:%d) 1 start\n" RESET, getpid()); + // printf(MAGENTA "client(pid:%d) 2 connect\n" RESET, getpid()); connect_to_server(this->_connect_fd, this->_addr); + // printf(MAGENTA "client(pid:%d) 3 send\n" RESET, getpid()); send_to_server(this->_connect_fd, send_msg); + // printf(MAGENTA "client(pid:%d) 4 recv\n" RESET, getpid()); this->_recv_message = recv_message_from_server(this->_connect_fd); + // printf(MAGENTA " client(pid:%d) recv_msg:[%s]\n" RESET, getpid(), this->_recv_message.c_str()); + // printf(MAGENTA "client(pid:%d) 5 end\n" RESET, getpid()); } void Client::connect_to_server(int connect_fd, struct sockaddr_in addr) { @@ -58,19 +65,22 @@ void Client::connect_to_server(int connect_fd, struct sockaddr_in addr) { errno = 0; if (connect(connect_fd, (struct sockaddr *)&addr, len) == ERROR) { - throw std::runtime_error(strerror(errno)); + std::string err_str = "[Client Error] connect: " + std::string(strerror(errno)); + throw std::runtime_error(RED + err_str + RESET); } } void Client::send_to_server(int connect_fd, const std::string &send_msg) { ssize_t send_size; + // std::cout << "msg:" << send_msg << std::endl; size_t msg_len = send_msg.size() + 1; const char *msg = send_msg.c_str(); errno = 0; - send_size = send(connect_fd, msg, msg_len, 0); + send_size = send(connect_fd, msg, msg_len, MSG_DONTWAIT); if (send_size == ERROR) { - throw std::runtime_error(strerror(errno)); + std::string err_str = "[Client Error] send: " + std::string(strerror(errno)); + throw std::runtime_error(RED + err_str + RESET); } } @@ -79,18 +89,28 @@ std::string Client::recv_message_from_server(int connect_fd) { char buf[BUFSIZ + 1]; std::string recv_msg; + // printf(MAGENTA " client(pid:%d) 4-1 recv_msg start\n" RESET, getpid()); + while (true) { + // printf(MAGENTA " client(pid:%d) 4-2 recv_msg recv\n" RESET, getpid()); errno = 0; recv_size = recv(connect_fd, &buf, BUFSIZ, 0); if (recv_size == ERROR) { - throw std::runtime_error(strerror(errno)); + // printf(MAGENTA " client(pid:%d) recv_err errno:%d\n" RESET, getpid(), errno); + std::string err_str = "[Client Error] recv: " + std::string(strerror(errno)); + throw std::runtime_error(RED + err_str + RESET); } buf[recv_size] = '\0'; + // printf(MAGENTA " client(pid:%d) 4-3 buf:[%s]\n" RESET, getpid(), buf); recv_msg += buf; if (recv_size < BUFSIZ) { + // std::cout << MAGENTA " 3-6" RESET << std::endl; break; } + // printf(MAGENTA " client(pid:%d) 4-4 next\n" RESET, getpid()); } + // printf(MAGENTA " client(pid:%d) 4-5 recv_msg fin\n" RESET, getpid()); + return recv_msg; } diff --git a/srcs/Server/Server.cpp b/srcs/Server/Server.cpp index 3b6caede..791b891e 100644 --- a/srcs/Server/Server.cpp +++ b/srcs/Server/Server.cpp @@ -2,9 +2,12 @@ #include #include #include +#include #include #include +#include #include +#include #include "webserv.hpp" #include "Color.hpp" #include "Result.hpp" @@ -13,52 +16,160 @@ Server::Server(const char *server_ip, const char *server_port) : _socket(server_ip, server_port), - _connect_fd(ERROR), - _recv_message() { + _recv_message(), + _client_fds(std::vector(MAX_SESSION, INIT_FD)), + _fds() { if (this->_socket.get_status() == ERROR) { // todo: use result - throw std::runtime_error("[Error] server initialization error"); + throw std::runtime_error(RED "[Server Error] server initialization error" RESET); } } -Server::~Server() { - if (this->_connect_fd != ERROR) { - close_connection(this->_connect_fd); - this->_connect_fd = ERROR; +Server::~Server() {} + +void Server::process_client_connection() { + Result result; + + while (true) { + // select + result = wait_for_io_ready_fd(); + if (result.is_err()) { + throw std::runtime_error(RED + result.get_err_value() + RESET); + } + if (result.get_ok_value() == SELECT_TIMEOUT) { + std::cout << "[INFO] server timeout" << std::endl; + break; + } + + // accept + result = accept_and_store_connect_fd(); + if (result.is_err()) { + throw std::runtime_error(RED + result.get_err_value() + RESET); + } + + // connect (except newly accepted fd) + result = communicate_with_ready_client(); + if (result.is_err()) { + throw std::runtime_error(RED + result.get_err_value() + RESET); + } } } -void Server::process_client_connection() { - // select - // todo +void Server::init_fd_set() { + std::vector::iterator fd; - // accept - Result accept_result = accept_connection(this->_socket.get_socket_fd()); - if (!accept_result.is_ok()) { - throw std::runtime_error("[Error] accept:" + accept_result.get_err_value()); + FD_ZERO(&this->_fds); + FD_SET(this->_socket.get_socket_fd(), &this->_fds); + for (fd = this->_client_fds.begin(); fd != this->_client_fds.end(); ++fd) { + if (*fd == INIT_FD) { + continue; + } + FD_SET(*fd, &this->_fds); } - this->_connect_fd = accept_result.get_ok_value(); +} + +int Server::get_max_fd() { + int max_fd; + + max_fd = *std::max_element(this->_client_fds.begin(), this->_client_fds.end()); + return std::max(max_fd, this->_socket.get_socket_fd()); +} + +Result Server::wait_for_io_ready_fd() { + int max_fd; - // recv - Resultrecv_result = recv_request(this->_connect_fd); - if (!recv_result.is_ok()) { - throw std::runtime_error("[Error] recv:" + recv_result.get_err_value()); + init_fd_set(); + max_fd = get_max_fd(); + Result select_result = select_fds(max_fd, &this->_fds); + if (select_result.is_err()) { + return Result::err("[Server Error] select:" + select_result.get_err_value()); } - this->_recv_message = recv_result.get_err_value(); - // std::cout << YELLOW "recv_msg(string):[" << this->_recv_message << "]" RESET << std::endl; + return select_result; +} + +Result Server::accept_and_store_connect_fd() { + int connect_fd; + bool limit_over; - // request, response - HttpRequest request = HttpRequest(this->_recv_message); - HttpResponse response = HttpResponse(request); + if (!FD_ISSET(this->_socket.get_socket_fd(), &this->_fds)) { + return Result::ok(OK); + } - // send - Result send_result = send_response(this->_connect_fd, response); - if (!send_result.is_ok()) { - throw std::runtime_error("[Error] send:" + send_result.get_err_value()); + Result accept_result = accept_connection(this->_socket.get_socket_fd()); + if (accept_result.is_err()) { + return Result::err("[Server Error] accept:" + accept_result.get_err_value()); } + connect_fd = accept_result.get_ok_value(); + limit_over = true; + for (std::vector::iterator fd = this->_client_fds.begin(); fd != this->_client_fds.end(); ++fd) { + if (*fd == INIT_FD) { + *fd = connect_fd; + limit_over = false; + break; + } + } + if (limit_over) { + std::cerr << "[INFO] server over max connection"<< std::endl; + errno = 0; + if (close(connect_fd) == ERROR) { + std::cerr << "[Server Error] close :"<< strerror(errno) << std::endl; + } + } + return Result::ok(OK); +} + +Result Server::communicate_with_ready_client() { + for (std::vector::iterator fd = this->_client_fds.begin(); fd != this->_client_fds.end(); ++fd) { + if (*fd == INIT_FD) { + continue; + } + if (!FD_ISSET(*fd, &this->_fds)) { + continue; + } + + // recv + Resultrecv_result = recv_request(*fd); + if (recv_result.is_err()) { + return Result::err("[Server Error] recv:" + recv_result.get_err_value()); + } + this->_recv_message = recv_result.get_ok_value(); + // printf(BLUE " server recv_msg:[%s]\n" RESET, this->_recv_message.c_str()); + + // request, response + HttpRequest request = HttpRequest(this->_recv_message); + HttpResponse response = HttpResponse(request); + + // send + Result send_result = send_response(*fd, response); + if (send_result.is_err()) { + // printf(BLUE " server send error\n" RESET); + return Result::err("[Server Error] send:" + send_result.get_err_value()); + } + + FD_CLR(*fd, &this->_fds); + close_connection(*fd); + *fd = INIT_FD; + } + return Result::ok(OK); } std::string Server::get_recv_message() const { return this->_recv_message; } +Result Server::select_fds(int max_fd, fd_set *fds) { + struct timeval timeout = {}; + int select_ret; + + // timeout < 1.5sec, communicate error ?? + timeout.tv_sec = 1; + timeout.tv_usec = 500 * 1000; // 500ms + + errno = 0; + select_ret = select(max_fd + 1, fds, NULL, NULL, &timeout); + if (select_ret == ERROR) { + return Result::err(strerror(errno)); + } + return Result::ok(select_ret); +} + Result Server::accept_connection(int socket_fd) { int connect_fd; @@ -77,7 +188,7 @@ Result Server::recv_request(int connect_fd) { while (true) { errno = 0; - recv_size = recv(connect_fd, buf, BUFSIZ, MSG_DONTWAIT); + recv_size = recv(connect_fd, buf, BUFSIZ, FLAG_NONE); // todo: flg=MSG_DONTWAIT, errno=EAGAIN -> continue? if (recv_size == ERROR || recv_size > BUFSIZ) { return Result::err(strerror(errno)); } diff --git a/srcs/Server/Server.hpp b/srcs/Server/Server.hpp index 08971c36..c97803b1 100644 --- a/srcs/Server/Server.hpp +++ b/srcs/Server/Server.hpp @@ -1,10 +1,15 @@ #pragma once # include +# include # include "Result.hpp" # include "Socket.hpp" -# define FLAG_NONE 0 +# define FLAG_NONE 0 +# define MAX_SESSION 128 +# define INIT_FD (-1) +# define SELECT_TIMEOUT 0 + # define TEST_RESPONSE_MSG "test response" // Mock -> gmock ?? @@ -35,9 +40,19 @@ class Server { private: Socket _socket; - int _connect_fd; // close in destructor std::string _recv_message; + std::vector _client_fds; + fd_set _fds; + + + void init_fd_set(); + int get_max_fd(); + + Result wait_for_io_ready_fd(); + Result accept_and_store_connect_fd(); + Result communicate_with_ready_client(); + static Result select_fds(int max_fd, fd_set *fds); static Result accept_connection(int socket_fd); static Result recv_request(int connect_fd); static Result send_response(int connect_fd, const HttpResponse &response); diff --git a/test/unit_test/TestServer.cpp b/test/unit_test/TestServer.cpp index 0f08da1b..a6dfb7e6 100644 --- a/test/unit_test/TestServer.cpp +++ b/test/unit_test/TestServer.cpp @@ -1,9 +1,10 @@ #include #include +#include #include #include #include -#include +#include #include "gtest/gtest.h" #include "webserv.hpp" #include "Client.hpp" @@ -29,10 +30,17 @@ static void run_server_and_client(const char *server_ip, std::string &server_recv_msg, std::string &client_recv_msg); +static void run_server_and_multi_client(const char *server_ip, + const char *server_port, + const std::string &client_send_msg, + std::string &server_recv_msg, + std::vector &client_recv_msgs, + int client_count); // int port = 49152; TEST(ServerUnitTest, Constructor) { - EXPECT_NO_THROW(Server(SERVER_IP, SERVER_PORT));} + EXPECT_NO_THROW(Server(SERVER_IP, SERVER_PORT)); +} TEST(ServerUnitTest, ConstructorThrowException) { EXPECT_ANY_THROW(Server("hoge", SERVER_PORT)); @@ -56,9 +64,12 @@ TEST(ServerUnitTest, ConnectClientCase1) { client_recv_msg); EXPECT_EQ(msg, server_recv_msg); EXPECT_EQ("test response", client_recv_msg); + std::cerr << YELLOW " msg :[" << msg << "]" RESET << std::endl; + std::cerr << YELLOW " server_recv_msg:[" << server_recv_msg << "]" RESET << std::endl; + std::cerr << YELLOW " client_recv_msg:[" << client_recv_msg << "]" RESET << std::endl; } catch (std::exception const &e) { - std::cerr << e.what() << std::endl; + FAIL() << e.what() << std::endl; } } @@ -75,9 +86,12 @@ TEST(ServerUnitTest, ConnectClientCase2) { client_recv_msg); EXPECT_EQ(msg, server_recv_msg); EXPECT_EQ("test response", client_recv_msg); + std::cerr << YELLOW " msg :[" << msg << "]" RESET << std::endl; + std::cerr << YELLOW " server_recv_msg:[" << server_recv_msg << "]" RESET << std::endl; + std::cerr << YELLOW " client_recv_msg:[" << client_recv_msg << "]" RESET << std::endl; } catch (std::exception const &e) { - std::cerr << e.what() << std::endl; + FAIL() << e.what() << std::endl; } } @@ -94,9 +108,12 @@ TEST(ServerUnitTest, ConnectClientCase3) { client_recv_msg); EXPECT_EQ(msg, server_recv_msg); EXPECT_EQ("test response", client_recv_msg); + std::cerr << YELLOW " msg :[" << msg << "]" RESET << std::endl; + std::cerr << YELLOW " server_recv_msg:[" << server_recv_msg << "]" RESET << std::endl; + std::cerr << YELLOW " client_recv_msg:[" << client_recv_msg << "]" RESET << std::endl; } catch (std::exception const &e) { - std::cerr << e.what() << std::endl; + FAIL() << e.what() << std::endl; } } @@ -125,9 +142,12 @@ TEST(ServerUnitTest, ConnectClientCase4) { client_recv_msg); EXPECT_EQ(msg, server_recv_msg); EXPECT_EQ("test response", client_recv_msg); + std::cerr << YELLOW " msg :[" << msg << "]" RESET << std::endl; + std::cerr << YELLOW " server_recv_msg:[" << server_recv_msg << "]" RESET << std::endl; + std::cerr << YELLOW " client_recv_msg:[" << client_recv_msg << "]" RESET << std::endl; } catch (std::exception const &e) { - std::cerr << e.what() << std::endl; + FAIL() << e.what() << std::endl; } } @@ -155,6 +175,84 @@ TEST(ServerUnitTest, ConnectClientErrorCase2) { client_recv_msg)); } +TEST(ServerUnitTest, ConnectMultiClientCase1) { + int client_count = 2; + std::string msg = "test request"; + std::string server_recv_msg; + std::vector client_recv_msgs(client_count); + + try { + run_server_and_multi_client(SERVER_IP, + SERVER_PORT, + msg, + server_recv_msg, + client_recv_msgs, + client_count); + EXPECT_EQ(msg, server_recv_msg); + std::cerr << YELLOW " msg :[" << msg << "]" RESET << std::endl; + std::cerr << YELLOW " server_recv_msg:[" << server_recv_msg << "]" RESET << std::endl; + for (int i = 0; i < client_count; ++i) { + EXPECT_EQ("test response", client_recv_msgs[i]); + std::cerr << YELLOW " client_recv_msg(" << i << "):[" << client_recv_msgs[i] << "]" RESET << std::endl; + } + } + catch (std::exception const &e) { + FAIL() << e.what(); + } +} + +TEST(ServerUnitTest, ConnectMultiClientCase2) { + int client_count = 5; + std::string msg = "test request"; + std::string server_recv_msg; + std::vector client_recv_msgs(client_count); + + try { + run_server_and_multi_client(SERVER_IP, + SERVER_PORT, + msg, + server_recv_msg, + client_recv_msgs, + client_count); + EXPECT_EQ(msg, server_recv_msg); + std::cerr << YELLOW " msg :[" << msg << "]" RESET << std::endl; + std::cerr << YELLOW " server_recv_msg:[" << server_recv_msg << "]" RESET << std::endl; + for (int i = 0; i < client_count; ++i) { + EXPECT_EQ("test response", client_recv_msgs[i]); + std::cerr << YELLOW " client_recv_msg(" << i << "):[" << client_recv_msgs[i] << "]" RESET << std::endl; + } + } + catch (std::exception const &e) { + FAIL() << e.what(); + } +} + +TEST(ServerUnitTest, ConnectMultiClientCase3) { + int client_count = 20; + std::string msg = "test request"; + std::string server_recv_msg; + std::vector client_recv_msgs(client_count); + + try { + run_server_and_multi_client(SERVER_IP, + SERVER_PORT, + msg, + server_recv_msg, + client_recv_msgs, + client_count); + EXPECT_EQ(msg, server_recv_msg); + std::cerr << YELLOW " msg :[" << msg << "]" RESET << std::endl; + std::cerr << YELLOW " server_recv_msg:[" << server_recv_msg << "]" RESET << std::endl; + for (int i = 0; i < client_count; ++i) { + EXPECT_EQ("test response", client_recv_msgs[i]); + std::cerr << YELLOW " client_recv_msg(" << i << "):[" << client_recv_msgs[i] << "]" RESET << std::endl; + } + } + catch (std::exception const &e) { + FAIL() << e.what(); + } +} + /* helper */ static void *run_server(void *server_info) { s_server *s = (s_server *)server_info; @@ -222,3 +320,45 @@ static void run_server_and_client(const char *server_ip, server_recv_msg = server_info.recv_msg; client_recv_msg = client_info.recv_msg; } + +static void run_server_and_multi_client(const char *server_ip, + const char *server_port, + const std::string &client_send_msg, + std::string &server_recv_msg, + std::vector &client_recv_msgs, + int client_count) { + s_server server_info = {server_ip, server_port, ""}; + std::vector client_infos(client_count, {server_ip, server_port, client_send_msg, ""}); + pthread_t server_tid; + std::vector client_tids(client_count); + int ret_server, ret_client; + bool is_server_success, is_client_success; + + ret_server = pthread_create(&server_tid, NULL, run_server, (void *)&server_info); + if (ret_server != OK) { + throw std::runtime_error("pthread_create error"); + } + for (int i = 0; i < client_count; ++i) { + ret_client = pthread_create(&client_tids[i], NULL, run_client, (void *)&client_infos[i]); + if (ret_client != OK) { + throw std::runtime_error("pthread_create error"); + } + } + + ret_server = pthread_join(server_tid, (void **)&is_server_success); + if (ret_server != OK || !is_server_success) { + throw std::runtime_error("server error"); + } + for (int i = 0; i < client_count; ++i) { + ret_client = pthread_join(client_tids[i], (void **)&is_client_success); + if (ret_client != OK || !is_client_success) { + throw std::runtime_error("client error"); + } + } + // std::cout << "server_recv_msg:" << server_info.recv_msg << std::endl; + // std::cout << "client_recv_msg:" << client_info.recv_msg << std::endl; + server_recv_msg = server_info.recv_msg; + for (int i = 0; i < client_count; ++i) { + client_recv_msgs[i] = client_infos[i].recv_msg; + } +} \ No newline at end of file