From d106d0a9384a7acbf12d15076fac5808b253a459 Mon Sep 17 00:00:00 2001 From: Aous Naman Date: Sun, 21 Apr 2024 09:57:16 +1000 Subject: [PATCH] I think packets_handler is complete, yet to be tested --- .../ojph_stream_expand/ojph_stream_expand.cpp | 41 ++++-- .../stream_expand_support.cpp | 108 ++++++++++++--- .../stream_expand_support.h | 127 +++++++++++++----- 3 files changed, 215 insertions(+), 61 deletions(-) diff --git a/src/apps/ojph_stream_expand/ojph_stream_expand.cpp b/src/apps/ojph_stream_expand/ojph_stream_expand.cpp index 0148b36..b662f8e 100644 --- a/src/apps/ojph_stream_expand/ojph_stream_expand.cpp +++ b/src/apps/ojph_stream_expand/ojph_stream_expand.cpp @@ -54,7 +54,7 @@ bool get_arguments(int argc, char *argv[], char *&src_addr, char *&src_port, char *&target_name, ojph::ui32& num_threads, ojph::ui32& num_inflight_packets, - bool& display, bool& quiet) + bool& quiet, bool& display, bool& decode) { ojph::cli_interpreter interpreter; interpreter.init(argc, argv); @@ -67,8 +67,9 @@ bool get_arguments(int argc, char *argv[], interpreter.reinterpret("-num_threads", num_threads); interpreter.reinterpret("-num_packets", num_inflight_packets); - display = interpreter.reinterpret("-display"); quiet = interpreter.reinterpret("-quiet"); + display = interpreter.reinterpret("-display"); + decode = interpreter.reinterpret("-decode"); if (interpreter.is_exhausted() == false) { printf("The following arguments were not interpreted:\n"); @@ -97,6 +98,17 @@ bool get_arguments(int argc, char *argv[], printf("Please set \"-num_threads\" to 1 or more.\n"); return false; } + if (num_inflight_packets < 1) + { + printf("Please set \"-num_packets\" to 1 or more.\n"); + return false; + } + if (decode && target_name == NULL) + { + printf("Since \"-decode\" was specified, please set \"-target_name\" " + "for the target name of decoded files.\n"); + return false; + } return true; } @@ -104,8 +116,6 @@ bool get_arguments(int argc, char *argv[], ////////////////////////////////////////////////////////////////////////////// int main(int argc, char* argv[]) { - constexpr int buffer_size = 2048; // buffer size - char *recv_addr = NULL; char *recv_port = NULL; char *src_addr = NULL; @@ -113,8 +123,9 @@ int main(int argc, char* argv[]) char *target_name = NULL; ojph::ui32 num_threads = 1; ojph::ui32 num_inflight_packets = 5; - bool display = false; bool quiet = false; + bool display = false; + bool decode = false; if (argc <= 1) { printf( @@ -152,13 +163,13 @@ int main(int argc, char* argv[]) } if (!get_arguments(argc, argv, recv_addr, recv_port, src_addr, src_port, target_name, num_threads, num_inflight_packets, - display, quiet)) + quiet, display, decode)) { exit(-1); } ojph::stex::frames_handler frames_handler; - frames_handler.init(quiet, num_threads, target_name, display); + frames_handler.init(quiet, display, decode, num_threads, target_name); ojph::stex::packets_handler packets_handler; packets_handler.init(quiet, num_inflight_packets, &frames_handler); ojph::net::socket_manager smanager; @@ -230,21 +241,27 @@ int main(int argc, char* argv[]) ojph::stex::rtp_packet* packet = NULL; while (1) { - packet = packets_handler.exchange(packet); + if (packet == NULL || packet->num_bytes != 0) // num_bytes == 0 + packet = packets_handler.exchange(packet); // if packet was ignored + struct sockaddr_in si_other; socklen_t socklen = sizeof(si_other); // receive data -- this is a blocking call - packet->num_bytes = (int)recvfrom(s.intern(), (char*)packet->data, - buffer_size, 0, (struct sockaddr *) &si_other, &socklen); - if (packet->num_bytes < 0) + packet->num_bytes = 0; // if we ignore the packet, we can continue + int num_bytes = (int)recvfrom(s.intern(), (char*)packet->data, + packet->max_size, 0, (struct sockaddr *) &si_other, &socklen); + if (num_bytes < 0) { std::string err = smanager.get_last_error_message(); OJPH_INFO(0x02000008, "Failed to receive data : %s\n", err.data()); continue; // if we wish to continue } if ((src_addr && saddr != smanager.get_addr(si_other)) || - (src_port && sport != si_other.sin_port)) + (src_port && sport != si_other.sin_port)) { continue; + } + packet->num_bytes = (ojph::ui32)num_bytes; + if (!quiet && !src_printed) { constexpr int buf_size = 128; diff --git a/src/apps/ojph_stream_expand/stream_expand_support.cpp b/src/apps/ojph_stream_expand/stream_expand_support.cpp index a860253..29d26ae 100644 --- a/src/apps/ojph_stream_expand/stream_expand_support.cpp +++ b/src/apps/ojph_stream_expand/stream_expand_support.cpp @@ -57,7 +57,9 @@ void packets_handler::init(bool quiet, ui32 num_packets, frames_handler* frames) { assert(this->num_packets == 0); - avail = packet_store = new rtp_packet[num_packets]; + avail = packet_store = new rtp_packet[num_packets]; + for (ui32 i = 0; i < num_packets - 1; ++i) + packet_store[i].next = packet_store + i + 1; this->quiet = quiet; this->num_packets = num_packets; this->frames = frames; @@ -66,26 +68,91 @@ void packets_handler::init(bool quiet, ui32 num_packets, /////////////////////////////////////////////////////////////////////////////// rtp_packet* packets_handler::exchange(rtp_packet* p) { - - if (p != NULL) - { // check validity/supported features in p + if (p == NULL) { + assert(in_use == NULL && num_packets > 0); + // move from avail to in_use + rtp_packet* p = avail; + avail = avail->next; + p->next = in_use; + in_use = p; + return p; } + if (p->num_bytes == 0) + return p; + + // We can a series of test to remove/warn about unsupported options + // but we currently do not do that yet - if (p) + bool result = frames->push(p); + if (result == false) { - ui32 packet_type = p->get_packet_type(); - if (packet_type == rtp_packet::packet_type::PT_MAIN) - printf("A new frame %d\n", p->get_time_stamp()); + if (avail) + { // move from avail to in_use + p = avail; + avail = avail->next; + p->next = in_use; + in_use = p; + } + else + { + assert(p->next != NULL || num_packets == 1); + if (p->next != NULL) + { // use the oldest/last packet in in_use + assert(p == in_use); + rtp_packet *pp = p; // previous p + p = p->next; + while(p->next != NULL) { pp = p; p = p->next; } + pp->next = NULL; + p->next = in_use; + in_use = p; + } + } + return p; } + else { + // move packet to avail + assert(p == in_use); + in_use = in_use->next; + p->next = avail; + avail = p; + // test if you can push more packets + p = in_use; + rtp_packet *pp = p; // previous p + while (p != NULL) + { + result = frames->push(p); + if (result) + { + // move packet to avail + if (p == in_use) + { + in_use = in_use->next; + p->next = avail; + avail = p; + p = in_use; + } + else + { + pp->next = p->next; + p->next = avail; + avail = p; + p = pp->next; + } + } + else { + pp = p; + p = p->next; + } + } - if (avail != NULL) - { + // get one from avail and move it to in_use + p = avail; + avail = avail->next; + p->next = in_use; + in_use = p; + return p; } - else - { - } - return packet_store; } /////////////////////////////////////////////////////////////////////////////// @@ -104,19 +171,24 @@ frames_handler::~frames_handler() } /////////////////////////////////////////////////////////////////////////////// -void frames_handler::init(bool quiet, ui32 num_threads, - const char *target_name, bool display) +void frames_handler::init(bool quiet, bool display, bool decode, + ui32 num_threads, const char *target_name) { this->quiet = quiet; + this->display = display; + this->decode = decode; this->num_threads = num_threads; this->target_name = target_name; - this->display = display; - num_files = num_threads + 1; files = new stex_file[num_files]; } /////////////////////////////////////////////////////////////////////////////// +bool frames_handler::push(rtp_packet* p) +{ + + return false; +} } // !stex namespace } // !ojph namespace \ No newline at end of file diff --git a/src/apps/ojph_stream_expand/stream_expand_support.h b/src/apps/ojph_stream_expand/stream_expand_support.h index e044d87..5feaf78 100644 --- a/src/apps/ojph_stream_expand/stream_expand_support.h +++ b/src/apps/ojph_stream_expand/stream_expand_support.h @@ -76,6 +76,7 @@ struct rtp_packet rtp_packet() { num_bytes = 0; next = NULL; } public: + // RTP header ui32 get_rtp_version() { return ((ui32)data[0]) >> 6; } bool is_padded() { return (data[0] & 0x20) != 0; } bool is_extended() { return (data[0] & 0x10) != 0; } @@ -91,41 +92,95 @@ struct rtp_packet { return ntohl(*(ui32*)(data + 4)); } ui32 get_ssrc() // not used for the time being { return ntohl(*(ui32*)(data + 8)); } + + // common in main and body payload headers ui32 get_packet_type() { return ((ui32)data[12]) >> 6; } - ui32 get_TP() + ui32 get_TP() { return (((ui32)data[12]) >> 3) & 0x7; } - ui32 get_ORDH() - { return ((ui32)data[12]) & 0x7; } - bool is_PTSTAMP_used() - { return (((ui32)data[13]) & 0x80) != 0; } - ui32 get_XTRAC() - { return (((ui32)data[13]) >> 4) & 0x7; } - ui32 get_PTSTAMP() { + ui32 get_ORDH() { + if (get_packet_type() != PT_BODY) return ((ui32)data[12]) & 0x7; + else return (((ui32)data[13]) >> 7) & 0x1; + } + ui32 get_PTSTAMP() { ui32 result = (((ui32)data[13]) & 0xF) << 8; result |= (ui32)data[14]; return result; } - bool is_codestream_header_reusable() - { return (((ui32)data[16] >> 7) & 1) != 0; } - bool is_component_colorimetry_used() - { return (((ui32)data[16] >> 6) & 1) != 0; } - bool is_codeblock_caching_used() - { return (((ui32)data[16] >> 5) & 1) != 0; } - bool is_RANGE() - { return ((ui32)data[16] & 1) != 0; } - ui32 get_PRIMS() - { return (ui32)data[17]; } - ui32 get_TRANS() - { return (ui32)data[18]; } - ui32 get_MAT() - { return (ui32)data[19]; } + ui8* get_data() + { return data + 20; } + ui32 get_data_size() + { return (ui32)num_bytes - 20; } + + // only in main payload header + bool is_PTSTAMP_used() { + assert(get_packet_type() != PT_BODY); + return (((ui32)data[13]) & 0x80) != 0; + } + ui32 get_XTRAC() { + assert(get_packet_type() != PT_BODY); + return (((ui32)data[13]) >> 4) & 0x7; + } + bool is_codestream_header_reusable() { + assert(get_packet_type() != PT_BODY); + return (((ui32)data[16]) & 0x80) != 0; + } + bool is_component_colorimetry_used() { + assert(get_packet_type() != PT_BODY); + return (((ui32)data[16]) & 0x40) != 0; + } + bool is_codeblock_caching_used() { + assert(get_packet_type() != PT_BODY); + return (((ui32)data[16]) & 0x20) != 0; + } + bool is_RANGE() { + assert(get_packet_type() != PT_BODY); + return ((ui32)data[16] & 1) != 0; + } + ui32 get_PRIMS(){ + assert(get_packet_type() != PT_BODY); + return (ui32)data[17]; + } + ui32 get_TRANS() { + assert(get_packet_type() != PT_BODY); + return (ui32)data[18]; + } + ui32 get_MAT() { + assert(get_packet_type() != PT_BODY); + return (ui32)data[19]; + } + + // only in body payload header + ui32 get_RES() { + assert(get_packet_type() == PT_BODY); + return ((ui32)data[12]) & 0x7; + } + ui32 get_QUAL() { + assert(get_packet_type() == PT_BODY); + return (((ui32)data[13]) >> 4) & 0x7; + } + ui32 get_data_pos() { + ui32 result = 0; + if (get_packet_type() == PT_BODY) { + result = ((ui32)data[16]) << 4; + result |= (((ui32)data[17]) >> 4) & 0xF; + } + return result; + } + ui32 get_PID() { + assert(get_packet_type() == PT_BODY); + ui32 result = (((ui32)data[17]) & 0xF) << 16; + result |= ((ui32)data[18]) << 8; + result |= ((ui32)data[19]); + return result; + } + public: static constexpr int max_size = 2048; //!