Skip to content

Commit

Permalink
add(server, test): add select() for handle multiple I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
ak0327 committed Aug 30, 2023
1 parent 7498736 commit fe59c9f
Show file tree
Hide file tree
Showing 6 changed files with 329 additions and 43 deletions.
1 change: 1 addition & 0 deletions includes/Result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Result {
}

bool is_ok() const { return _is_ok; }
bool is_err() const { return !_is_ok; }

OkType get_ok_value() const {
if (_is_ok) {
Expand Down
1 change: 1 addition & 0 deletions includes/webserv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# define STAT_ERROR (-1)
# define ERROR (-1)
# define OK 0
# define BREAK 1

# define INVALID_ARGUMENT_ERROR_MSG "[Error] invalid argument"
# define INVALID_PATH_ERROR_MSG "[Error] invalid file path"
Expand Down
32 changes: 26 additions & 6 deletions srcs/Client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Client::~Client() {
if (_connect_fd != ERROR) {
errno = 0;
if (close(_connect_fd) == ERROR) {
std::cerr << strerror(errno) << std::endl;
std::cerr << RED "[Client Error] close: " << strerror(errno) << RESET << std::endl;
}
_connect_fd = ERROR;
}
Expand All @@ -31,7 +31,8 @@ int Client::create_connect_socket() {
errno = 0;
connect_fd = socket(AF_INET, SOCK_STREAM, 0);
if (connect_fd == ERROR) {
throw std::runtime_error(strerror(errno));
std::string err_str = "[Client Error] socket: " + std::string(strerror(errno));
throw std::runtime_error(RED + err_str + RESET);
}
return connect_fd;
}
Expand All @@ -48,29 +49,38 @@ struct sockaddr_in Client::create_connect_addr(const char *server_ip,
}

void Client::process_server_connect(const std::string &send_msg) {
// printf(MAGENTA "client(pid:%d) 1 start\n" RESET, getpid());
// printf(MAGENTA "client(pid:%d) 2 connect\n" RESET, getpid());
connect_to_server(this->_connect_fd, this->_addr);
// printf(MAGENTA "client(pid:%d) 3 send\n" RESET, getpid());
send_to_server(this->_connect_fd, send_msg);
// printf(MAGENTA "client(pid:%d) 4 recv\n" RESET, getpid());
this->_recv_message = recv_message_from_server(this->_connect_fd);
// printf(MAGENTA " client(pid:%d) recv_msg:[%s]\n" RESET, getpid(), this->_recv_message.c_str());
// printf(MAGENTA "client(pid:%d) 5 end\n" RESET, getpid());
}

void Client::connect_to_server(int connect_fd, struct sockaddr_in addr) {
socklen_t len = sizeof(addr);

errno = 0;
if (connect(connect_fd, (struct sockaddr *)&addr, len) == ERROR) {
throw std::runtime_error(strerror(errno));
std::string err_str = "[Client Error] connect: " + std::string(strerror(errno));
throw std::runtime_error(RED + err_str + RESET);
}
}

void Client::send_to_server(int connect_fd, const std::string &send_msg) {
ssize_t send_size;
// std::cout << "msg:" << send_msg << std::endl;
size_t msg_len = send_msg.size() + 1;
const char *msg = send_msg.c_str();

errno = 0;
send_size = send(connect_fd, msg, msg_len, 0);
send_size = send(connect_fd, msg, msg_len, MSG_DONTWAIT);
if (send_size == ERROR) {
throw std::runtime_error(strerror(errno));
std::string err_str = "[Client Error] send: " + std::string(strerror(errno));
throw std::runtime_error(RED + err_str + RESET);
}
}

Expand All @@ -79,18 +89,28 @@ std::string Client::recv_message_from_server(int connect_fd) {
char buf[BUFSIZ + 1];
std::string recv_msg;

// printf(MAGENTA " client(pid:%d) 4-1 recv_msg start\n" RESET, getpid());

while (true) {
// printf(MAGENTA " client(pid:%d) 4-2 recv_msg recv\n" RESET, getpid());
errno = 0;
recv_size = recv(connect_fd, &buf, BUFSIZ, 0);
if (recv_size == ERROR) {
throw std::runtime_error(strerror(errno));
// printf(MAGENTA " client(pid:%d) recv_err errno:%d\n" RESET, getpid(), errno);
std::string err_str = "[Client Error] recv: " + std::string(strerror(errno));
throw std::runtime_error(RED + err_str + RESET);
}
buf[recv_size] = '\0';
// printf(MAGENTA " client(pid:%d) 4-3 buf:[%s]\n" RESET, getpid(), buf);
recv_msg += buf;
if (recv_size < BUFSIZ) {
// std::cout << MAGENTA " 3-6" RESET << std::endl;
break;
}
// printf(MAGENTA " client(pid:%d) 4-4 next\n" RESET, getpid());
}
// printf(MAGENTA " client(pid:%d) 4-5 recv_msg fin\n" RESET, getpid());

return recv_msg;
}

Expand Down
167 changes: 138 additions & 29 deletions srcs/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
#include <errno.h>
#include <sys/socket.h>
#include <unistd.h>
#include <algorithm>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <iostream>
#include <vector>
#include "webserv.hpp"
#include "Color.hpp"
#include "Result.hpp"
Expand All @@ -13,52 +16,158 @@
Server::Server(const char *server_ip,
const char *server_port)
: _socket(server_ip, server_port),
_connect_fd(ERROR),
_recv_message() {
_recv_message(),
_client_fds(std::vector<int>(MAX_SESSION, INIT_FD)),
_fds() {
if (this->_socket.get_status() == ERROR) { // todo: use result
throw std::runtime_error("[Error] server initialization error");
throw std::runtime_error(RED "[Server Error] server initialization error" RESET);
}
}

Server::~Server() {
if (this->_connect_fd != ERROR) {
close_connection(this->_connect_fd);
this->_connect_fd = ERROR;
Server::~Server() {}

void Server::process_client_connection() {
while (true) {
// select
Result<int, std::string> init_result = wait_for_io_ready_fd();
if (init_result.is_err()) {
throw std::runtime_error(RED + init_result.get_err_value() + RESET);
}
if (init_result.get_ok_value() == SELECT_TIMEOUT) {
std::cout << "[INFO] server timeout" << std::endl;
break;
}

// accept
Result<int, std::string> connect_result = accept_and_store_connect_fd();
if (connect_result.is_err()) {
throw std::runtime_error(RED + connect_result.get_err_value() + RESET);
}

// connect (except newly accepted fd)
Result<int, std::string> communicate_result = communicate_with_ready_client();
if (communicate_result.is_err()) {
throw std::runtime_error(RED + communicate_result.get_err_value() + RESET);
}
}
}

void Server::process_client_connection() {
// select
// todo
void Server::init_fd_set() {
std::vector<int>::iterator fd;

// accept
Result<int, std::string> accept_result = accept_connection(this->_socket.get_socket_fd());
if (!accept_result.is_ok()) {
throw std::runtime_error("[Error] accept:" + accept_result.get_err_value());
FD_ZERO(&this->_fds);
FD_SET(this->_socket.get_socket_fd(), &this->_fds);
for (fd = this->_client_fds.begin(); fd != this->_client_fds.end(); ++fd) {
if (*fd == INIT_FD) {
continue;
}
FD_SET(*fd, &this->_fds);
}
}

int Server::get_max_fd() {
int max_fd;

max_fd = *std::max_element(this->_client_fds.begin(), this->_client_fds.end());
return std::max(max_fd, this->_socket.get_socket_fd());
}

Result<int, std::string> Server::wait_for_io_ready_fd() {
int max_fd;

init_fd_set();
max_fd = get_max_fd();
Result<int, std::string> select_result = select_fds(max_fd, &this->_fds);
if (select_result.is_err()) {
return Result<int, std::string>::err("[Server Error] select:" + select_result.get_err_value());
}
return select_result;
}

Result<int, std::string> Server::accept_and_store_connect_fd() {
int connect_fd;
bool limit_over;

if (!FD_ISSET(this->_socket.get_socket_fd(), &this->_fds)) {
return Result<int, std::string>::ok(OK);
}
this->_connect_fd = accept_result.get_ok_value();

// recv
Result<std::string, std::string>recv_result = recv_request(this->_connect_fd);
if (!recv_result.is_ok()) {
throw std::runtime_error("[Error] recv:" + recv_result.get_err_value());
Result<int, std::string> accept_result = accept_connection(this->_socket.get_socket_fd());
if (accept_result.is_err()) {
return Result<int, std::string>::err("[Server Error] accept:" + accept_result.get_err_value());
}
connect_fd = accept_result.get_ok_value();
limit_over = true;
for (std::vector<int>::iterator fd = this->_client_fds.begin(); fd != this->_client_fds.end(); ++fd) {
if (*fd == INIT_FD) {
*fd = connect_fd;
limit_over = false;
break;
}
}
if (limit_over) {
std::cerr << "[INFO] server over max connection"<< std::endl;
errno = 0;
if (close(connect_fd) == ERROR) {
std::cerr << "[Server Error] close :"<< strerror(errno) << std::endl;
}
}
this->_recv_message = recv_result.get_err_value();
// std::cout << YELLOW "recv_msg(string):[" << this->_recv_message << "]" RESET << std::endl;
return Result<int, std::string>::ok(OK);
}

Result<int, std::string> Server::communicate_with_ready_client() {
for (std::vector<int>::iterator fd = this->_client_fds.begin(); fd != this->_client_fds.end(); ++fd) {
if (*fd == INIT_FD) {
continue;
}
if (!FD_ISSET(*fd, &this->_fds)) {
continue;
}

// request, response
HttpRequest request = HttpRequest(this->_recv_message);
HttpResponse response = HttpResponse(request);
// recv
Result<std::string, std::string>recv_result = recv_request(*fd);
if (recv_result.is_err()) {
return Result<int, std::string>::err("[Server Error] recv:" + recv_result.get_err_value());
}
this->_recv_message = recv_result.get_ok_value();
// printf(BLUE " server recv_msg:[%s]\n" RESET, this->_recv_message.c_str());

// request, response
HttpRequest request = HttpRequest(this->_recv_message);
HttpResponse response = HttpResponse(request);

// send
Result<int, std::string> send_result = send_response(*fd, response);
if (send_result.is_err()) {
// printf(BLUE " server send error\n" RESET);
return Result<int, std::string>::err("[Server Error] send:" + send_result.get_err_value());
}

// send
Result<int, std::string> send_result = send_response(this->_connect_fd, response);
if (!send_result.is_ok()) {
throw std::runtime_error("[Error] send:" + send_result.get_err_value());
FD_CLR(*fd, &this->_fds);
close_connection(*fd);
*fd = INIT_FD;
}
return Result<int, std::string>::ok(OK);
}

std::string Server::get_recv_message() const { return this->_recv_message; }

Result<int, std::string> Server::select_fds(int max_fd, fd_set *fds) {
struct timeval timeout = {};
int select_ret;

// timeout < 1.5sec, communicate error ??
timeout.tv_sec = 1;
timeout.tv_usec = 500 * 1000; // 500ms

errno = 0;
select_ret = select(max_fd + 1, fds, NULL, NULL, &timeout);
if (select_ret == ERROR) {
return Result<int, std::string>::err(strerror(errno));
}
return Result<int, std::string>::ok(select_ret);
}

Result<int, std::string> Server::accept_connection(int socket_fd) {
int connect_fd;

Expand All @@ -77,7 +186,7 @@ Result<std::string, std::string> Server::recv_request(int connect_fd) {

while (true) {
errno = 0;
recv_size = recv(connect_fd, buf, BUFSIZ, MSG_DONTWAIT);
recv_size = recv(connect_fd, buf, BUFSIZ, FLAG_NONE); // todo: flg=MSG_DONTWAIT, errno=EAGAIN -> continue?
if (recv_size == ERROR || recv_size > BUFSIZ) {
return Result<std::string, std::string>::err(strerror(errno));
}
Expand Down
19 changes: 17 additions & 2 deletions srcs/Server/Server.hpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
#pragma once

# include <string>
# include <vector>
# include "Result.hpp"
# include "Socket.hpp"

# define FLAG_NONE 0
# define FLAG_NONE 0
# define MAX_SESSION 128
# define INIT_FD (-1)
# define SELECT_TIMEOUT 0

# define TEST_RESPONSE_MSG "test response"

// Mock -> gmock ??
Expand Down Expand Up @@ -35,9 +40,19 @@ class Server {

private:
Socket _socket;
int _connect_fd; // close in destructor
std::string _recv_message;
std::vector<int> _client_fds;
fd_set _fds;


void init_fd_set();
int get_max_fd();

Result<int, std::string> wait_for_io_ready_fd();
Result<int, std::string> accept_and_store_connect_fd();
Result<int, std::string> communicate_with_ready_client();

static Result<int, std::string> select_fds(int max_fd, fd_set *fds);
static Result<int, std::string> accept_connection(int socket_fd);
static Result<std::string, std::string> recv_request(int connect_fd);
static Result<int, std::string> send_response(int connect_fd, const HttpResponse &response);
Expand Down
Loading

0 comments on commit fe59c9f

Please sign in to comment.