Skip to content

Commit

Permalink
Use splitters from coroio
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Dec 2, 2023
1 parent e16939d commit 644c4cb
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 128 deletions.
131 changes: 4 additions & 127 deletions client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,133 +7,10 @@
uint64_t inflight = 0;
uint64_t maxInflight = 128;

struct TLine {
std::string_view Part1;
std::string_view Part2;

size_t Size() const {
return Part1.size() + Part2.size();
}

operator bool() const {
return !Part1.empty();
}
};

struct TLineSplitter {
public:
TLineSplitter(int maxLen)
: WPos(0)
, RPos(0)
, Size(0)
, Cap(maxLen * 2)
, Data(Cap, 0)
, View(Data)
{ }

TLine Pop() {
auto end = View.substr(RPos, Size);
auto begin = View.substr(0, Size - end.size());

auto p1 = end.find('\n');
if (p1 == std::string_view::npos) {
auto p2 = begin.find('\n');
if (p2 == std::string_view::npos) {
return {};
}

RPos = p2 + 1;
Size -= end.size() + p2 + 1;
return TLine { end, begin.substr(0, p2 + 1) };
} else {
RPos += p1 + 1;
Size -= p1 + 1;
return TLine { end.substr(0, p1 + 1), {} };
}
}

void Push(const char* buf, size_t size) {
if (Size + size > Data.size()) {
throw std::runtime_error("Overflow");
}

auto first = std::min(size, Cap - WPos);
memcpy(&Data[WPos], buf, first);
memcpy(&Data[0], buf + first, std::max<size_t>(0, size - first));
WPos = (WPos + size) % Cap;
Size = Size + size;
}

private:
size_t WPos;
size_t RPos;
size_t Size;
size_t Cap;
std::string Data;
std::string_view View;
};

struct TZeroCopyLineSplitter {
public:
TZeroCopyLineSplitter(int maxLen)
: WPos(0)
, RPos(0)
, Size(0)
, Cap(maxLen * 2)
, Data(Cap, 0)
, View(Data)
{ }

TLine Pop() {
auto end = View.substr(RPos, Size);
auto begin = View.substr(0, Size - end.size());

auto p1 = end.find('\n');
if (p1 == std::string_view::npos) {
auto p2 = begin.find('\n');
if (p2 == std::string_view::npos) {
return {};
}

RPos = p2 + 1;
Size -= end.size() + p2 + 1;
return TLine { end, begin.substr(0, p2 + 1) };
} else {
RPos += p1 + 1;
Size -= p1 + 1;
return TLine { end.substr(0, p1 + 1), {} };
}
}

std::string_view Acquire(size_t size) {
size = std::min(size, Cap - Size);
if (size == 0) {
throw std::runtime_error("Overflow");
}
auto first = std::min(size, Cap - WPos);
if (first) {
return {&Data[WPos], first};
} else {
return {&Data[0], size};
}
}

void Commit(size_t size) {
WPos = (WPos + size) % Cap;
Size += size;
}

private:
size_t WPos;
size_t RPos;
size_t Size;
size_t Cap;
std::string Data;
std::string_view View;
};
using namespace NNet;

template<typename Poller>
NNet::TTestTask ClientReader(Poller& poller, typename Poller::TSocket& socket) {
TTestTask ClientReader(Poller& poller, typename Poller::TSocket& socket) {
try {
while (true) {
auto response = co_await TReader(socket).Read();
Expand All @@ -148,9 +25,9 @@ NNet::TTestTask ClientReader(Poller& poller, typename Poller::TSocket& socket) {
}

template<typename Poller>
NNet::TSimpleTask Client(Poller& poller, NNet::TAddress addr) {
TSimpleTask Client(Poller& poller, TAddress addr) {
typename Poller::TSocket socket(std::move(addr), poller);
NNet::TSocket input{NNet::TAddress{}, 0, poller}; // stdin
TSocket input{TAddress{}, 0, poller}; // stdin
TZeroCopyLineSplitter splitter(2 * 1024);
co_await socket.Connect();
std::cout << "Connected\n";
Expand Down
2 changes: 1 addition & 1 deletion coroio
Submodule coroio updated 2 files
+135 −0 coroio/sockutils.hpp
+85 −0 tests/tests.cpp

0 comments on commit 644c4cb

Please sign in to comment.