Skip to content

Commit

Permalink
I think packets_handler is complete, yet to be tested
Browse files Browse the repository at this point in the history
  • Loading branch information
aous72 committed Apr 20, 2024
1 parent 584594f commit d106d0a
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 61 deletions.
41 changes: 29 additions & 12 deletions src/apps/ojph_stream_expand/ojph_stream_expand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ bool get_arguments(int argc, char *argv[],
char *&src_addr, char *&src_port,
char *&target_name, ojph::ui32& num_threads,
ojph::ui32& num_inflight_packets,
bool& display, bool& quiet)
bool& quiet, bool& display, bool& decode)
{
ojph::cli_interpreter interpreter;
interpreter.init(argc, argv);
Expand All @@ -67,8 +67,9 @@ bool get_arguments(int argc, char *argv[],
interpreter.reinterpret("-num_threads", num_threads);
interpreter.reinterpret("-num_packets", num_inflight_packets);

display = interpreter.reinterpret("-display");
quiet = interpreter.reinterpret("-quiet");
display = interpreter.reinterpret("-display");
decode = interpreter.reinterpret("-decode");

if (interpreter.is_exhausted() == false) {
printf("The following arguments were not interpreted:\n");
Expand Down Expand Up @@ -97,24 +98,34 @@ bool get_arguments(int argc, char *argv[],
printf("Please set \"-num_threads\" to 1 or more.\n");
return false;
}
if (num_inflight_packets < 1)
{
printf("Please set \"-num_packets\" to 1 or more.\n");
return false;
}
if (decode && target_name == NULL)
{
printf("Since \"-decode\" was specified, please set \"-target_name\" "
"for the target name of decoded files.\n");
return false;
}

return true;
}

//////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
constexpr int buffer_size = 2048; // buffer size

char *recv_addr = NULL;
char *recv_port = NULL;
char *src_addr = NULL;
char *src_port = NULL;
char *target_name = NULL;
ojph::ui32 num_threads = 1;
ojph::ui32 num_inflight_packets = 5;
bool display = false;
bool quiet = false;
bool display = false;
bool decode = false;

if (argc <= 1) {
printf(
Expand Down Expand Up @@ -152,13 +163,13 @@ int main(int argc, char* argv[])
}
if (!get_arguments(argc, argv, recv_addr, recv_port, src_addr, src_port,
target_name, num_threads, num_inflight_packets,
display, quiet))
quiet, display, decode))
{
exit(-1);
}

ojph::stex::frames_handler frames_handler;
frames_handler.init(quiet, num_threads, target_name, display);
frames_handler.init(quiet, display, decode, num_threads, target_name);
ojph::stex::packets_handler packets_handler;
packets_handler.init(quiet, num_inflight_packets, &frames_handler);
ojph::net::socket_manager smanager;
Expand Down Expand Up @@ -230,21 +241,27 @@ int main(int argc, char* argv[])
ojph::stex::rtp_packet* packet = NULL;
while (1)
{
packet = packets_handler.exchange(packet);
if (packet == NULL || packet->num_bytes != 0) // num_bytes == 0
packet = packets_handler.exchange(packet); // if packet was ignored

struct sockaddr_in si_other;
socklen_t socklen = sizeof(si_other);
// receive data -- this is a blocking call
packet->num_bytes = (int)recvfrom(s.intern(), (char*)packet->data,
buffer_size, 0, (struct sockaddr *) &si_other, &socklen);
if (packet->num_bytes < 0)
packet->num_bytes = 0; // if we ignore the packet, we can continue
int num_bytes = (int)recvfrom(s.intern(), (char*)packet->data,
packet->max_size, 0, (struct sockaddr *) &si_other, &socklen);
if (num_bytes < 0)
{
std::string err = smanager.get_last_error_message();
OJPH_INFO(0x02000008, "Failed to receive data : %s\n", err.data());
continue; // if we wish to continue
}
if ((src_addr && saddr != smanager.get_addr(si_other)) ||
(src_port && sport != si_other.sin_port))
(src_port && sport != si_other.sin_port)) {
continue;
}
packet->num_bytes = (ojph::ui32)num_bytes;

if (!quiet && !src_printed)
{
constexpr int buf_size = 128;
Expand Down
108 changes: 90 additions & 18 deletions src/apps/ojph_stream_expand/stream_expand_support.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ void packets_handler::init(bool quiet, ui32 num_packets,
frames_handler* frames)
{
assert(this->num_packets == 0);
avail = packet_store = new rtp_packet[num_packets];
avail = packet_store = new rtp_packet[num_packets];
for (ui32 i = 0; i < num_packets - 1; ++i)
packet_store[i].next = packet_store + i + 1;
this->quiet = quiet;
this->num_packets = num_packets;
this->frames = frames;
Expand All @@ -66,26 +68,91 @@ void packets_handler::init(bool quiet, ui32 num_packets,
///////////////////////////////////////////////////////////////////////////////
rtp_packet* packets_handler::exchange(rtp_packet* p)
{

if (p != NULL)
{ // check validity/supported features in p
if (p == NULL) {
assert(in_use == NULL && num_packets > 0);
// move from avail to in_use
rtp_packet* p = avail;
avail = avail->next;
p->next = in_use;
in_use = p;
return p;
}
if (p->num_bytes == 0)
return p;

// We can a series of test to remove/warn about unsupported options
// but we currently do not do that yet

if (p)
bool result = frames->push(p);
if (result == false)
{
ui32 packet_type = p->get_packet_type();
if (packet_type == rtp_packet::packet_type::PT_MAIN)
printf("A new frame %d\n", p->get_time_stamp());
if (avail)
{ // move from avail to in_use
p = avail;
avail = avail->next;
p->next = in_use;
in_use = p;
}
else
{
assert(p->next != NULL || num_packets == 1);
if (p->next != NULL)
{ // use the oldest/last packet in in_use
assert(p == in_use);
rtp_packet *pp = p; // previous p
p = p->next;
while(p->next != NULL) { pp = p; p = p->next; }
pp->next = NULL;
p->next = in_use;
in_use = p;
}
}
return p;
}
else {
// move packet to avail
assert(p == in_use);
in_use = in_use->next;
p->next = avail;
avail = p;

// test if you can push more packets
p = in_use;
rtp_packet *pp = p; // previous p
while (p != NULL)
{
result = frames->push(p);
if (result)
{
// move packet to avail
if (p == in_use)
{
in_use = in_use->next;
p->next = avail;
avail = p;
p = in_use;
}
else
{
pp->next = p->next;
p->next = avail;
avail = p;
p = pp->next;
}
}
else {
pp = p;
p = p->next;
}
}

if (avail != NULL)
{
// get one from avail and move it to in_use
p = avail;
avail = avail->next;
p->next = in_use;
in_use = p;
return p;
}
else
{
}
return packet_store;
}

///////////////////////////////////////////////////////////////////////////////
Expand All @@ -104,19 +171,24 @@ frames_handler::~frames_handler()
}

///////////////////////////////////////////////////////////////////////////////
void frames_handler::init(bool quiet, ui32 num_threads,
const char *target_name, bool display)
void frames_handler::init(bool quiet, bool display, bool decode,
ui32 num_threads, const char *target_name)
{
this->quiet = quiet;
this->display = display;
this->decode = decode;
this->num_threads = num_threads;
this->target_name = target_name;
this->display = display;

num_files = num_threads + 1;
files = new stex_file[num_files];
}

///////////////////////////////////////////////////////////////////////////////
bool frames_handler::push(rtp_packet* p)
{

return false;
}

} // !stex namespace
} // !ojph namespace
Loading

0 comments on commit d106d0a

Please sign in to comment.