From 903c6f7f64137591cd28ff47e88ef6dfd5104f99 Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Mon, 21 Oct 2024 13:47:03 -0700 Subject: [PATCH 01/12] additional constants --- ait/core/__init__.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ait/core/__init__.py b/ait/core/__init__.py index 54698e5e..6da68405 100644 --- a/ait/core/__init__.py +++ b/ait/core/__init__.py @@ -11,11 +11,12 @@ # laws and regulations. User has the responsibility to obtain export licenses, # or other export authority as may be required before exporting such # information to foreign countries or providing access to foreign persons. - import sys +from ait.core import cfg +from ait.core import log + # cfg isn't used but we want the AIT-level config attribute created -from ait.core import cfg, log # noqa def deprecated(message): @@ -45,3 +46,6 @@ def deprecated_func(*args, **kwargs): sys.modules["ait"].SERVER_DEFAULT_XSUB_URL = "tcp://*:5559" # type: ignore[attr-defined] sys.modules["ait"].SERVER_DEFAULT_XPUB_URL = "tcp://*:5560" # type: ignore[attr-defined] + +sys.modules["ait"].MIN_PORT = 1024 # type: ignore[attr-defined] +sys.modules["ait"].MAX_PORT = 65535 # type: ignore[attr-defined] From 9d6655075697da1694a7705d23caf9022bf9d11c Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Wed, 23 Oct 2024 06:41:47 -0700 Subject: [PATCH 02/12] original stream changes --- ait/core/server/client.py | 301 +++++++++++++++++++++++++-- ait/core/server/server.py | 59 ++---- ait/core/server/stream.py | 137 +++++++++++- tests/ait/core/server/test_client.py | 82 ++++++++ tests/ait/core/server/test_server.py | 4 +- tests/ait/core/server/test_stream.py | 286 +++++++++++++++++++++---- 6 files changed, 756 insertions(+), 113 deletions(-) diff --git a/ait/core/server/client.py b/ait/core/server/client.py index 6c673caf..40a441ae 100644 --- a/ait/core/server/client.py +++ b/ait/core/server/client.py @@ -1,7 +1,6 @@ -import gevent -import gevent.socket -import gevent.server as gs import gevent.monkey +import gevent.server as gs +import gevent.socket gevent.monkey.patch_all() @@ -27,13 +26,12 @@ def __init__( zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, **kwargs, ): - self.context = zmq_context # open PUB socket & connect to broker self.pub = self.context.socket(zmq.PUB) self.pub.connect(zmq_proxy_xsub_url.replace("*", "localhost")) - if 'listener' in kwargs and isinstance(kwargs['listener'], int) : - kwargs['listener'] = "127.0.0.1:"+str(kwargs['listener']) + if "listener" in kwargs and isinstance(kwargs["listener"], int): + kwargs["listener"] = "127.0.0.1:" + str(kwargs["listener"]) # calls gevent.Greenlet or gs.DatagramServer __init__ super(ZMQClient, self).__init__(**kwargs) @@ -89,7 +87,6 @@ def __init__( zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, **kwargs, ): - super(ZMQInputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) @@ -120,7 +117,7 @@ def _run(self): raise (e) -class PortOutputClient(ZMQInputClient): +class OutputClient(ZMQInputClient): """ This is the parent class for all outbound streams which publish to a port. It opens a UDP port to publish to and publishes @@ -134,21 +131,61 @@ def __init__( zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, **kwargs, ): - - super(PortOutputClient, self).__init__( + super(OutputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) - self.out_port = kwargs["output"] + self.protocol = "UDP" + output_spec = kwargs.get("output", None) + if output_spec is None: + raise ValueError(f"Invalid output client specification: {output_spec}") + if type(output_spec) is int: + self.host = "localhost" + self.out_port = output_spec + self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + elif ( + type(output_spec) in [list, tuple] + and len(output_spec) == 1 + and type(output_spec[0]) is int + ): + self.host = "localhost" + self.out_port = output_spec[0] + self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + elif ( + type(output_spec) in [list, tuple] + and len(output_spec) == 3 + and type(output_spec[0]) is str + and output_spec[0].upper() == "UDP" + and type(output_spec[1]) is str + and type(output_spec[2]) is int + ): + self.host = output_spec[1] + self.out_port = output_spec[2] + self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + elif ( + len(output_spec) == 3 + and type(output_spec[0]) is str + and output_spec[0].upper() == "TCP" + and type(output_spec[1]) is str + and type(output_spec[2]) is int + ): + self.protocol = "TCP" + self.host = output_spec[1] + self.out_port = output_spec[2] + self.pub = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + else: + raise ValueError(f"Invalid output client specification: {output_spec}") self.context = zmq_context - # override pub to be udp socket - self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) def publish(self, msg): - self.pub.sendto(msg, ("localhost", int(self.out_port))) + if self.protocol == "TCP": + self.pub.connect((self.host, int(self.out_port))) + self.pub.sendall(msg) + else: + self.pub.sendto(msg, (self.host, int(self.out_port))) log.debug("Published message from {}".format(self)) -class PortInputClient(ZMQClient, gs.DatagramServer): +class UDPInputServer(ZMQClient, gs.DatagramServer): """ This is the parent class for all inbound streams which receive messages on a port. It opens a UDP port for receiving messages, listens for them, @@ -162,16 +199,34 @@ def __init__( zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, **kwargs, ): - - if "input" in kwargs and type(kwargs["input"][0]) is int: - super(PortInputClient, self).__init__( + if "input" in kwargs: + if type(kwargs["input"]) is int: + host_spec = kwargs["input"] + elif ( + type(kwargs["input"]) in [list, tuple] + and len(kwargs["input"]) == 1 + and type(kwargs["input"][0]) is int + ): + host_spec = kwargs["input"][0] + elif type(kwargs["input"]) in [list, tuple] and len(kwargs["input"]) == 2: + host_spec = ( + ( + "127.0.0.1" + if kwargs["input"][0].lower() in ["127.0.0.1", "localhost"] + else "0.0.0.0" + ), + kwargs["input"][1], + ) + else: + raise (ValueError("Invalid specification for UDPInputServer")) + super(UDPInputServer, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url, - listener=int(kwargs["input"][0]), + listener=host_spec, ) else: - raise (ValueError("Input must be port in order to create PortInputClient")) + raise (ValueError("Invalid specification for UDPInputServer")) # open sub socket self.sub = gevent.socket.socket(gevent.socket.AF_INET, gevent.socket.SOCK_DGRAM) @@ -180,3 +235,209 @@ def handle(self, packet, address): # This function provided for gs.DatagramServer class log.debug("{} received message from port {}".format(self, address)) self.process(packet) + + +class TCPInputServer(ZMQClient, gs.StreamServer): + """ + This class is similar to UDPInputServer except its TCP instead of UDP. + """ + + def __init__( + self, + zmq_context, + zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL, + zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, + buffer=1024, + **kwargs, + ): + self.cur_socket = None + self.buffer = buffer + if "input" in kwargs: + if ( + type(kwargs["input"]) not in [tuple, list] + or kwargs["input"][0].lower() + not in ["server", "127.0.0.1", "localhost", "0.0.0.0"] + or type(kwargs["input"][1]) != int + ): + raise ( + ValueError( + "TCPInputServer input must be tuple|list of (str,int) e.g. ('server',1234)" + ) + ) + + self.sub = gevent.socket.socket( + gevent.socket.AF_INET, gevent.socket.SOCK_STREAM + ) + host = ( + "127.0.0.1" + if kwargs["input"][0].lower() in ["127.0.0.1", "localhost"] + else "0.0.0.0" + ) + super(TCPInputServer, self).__init__( + zmq_context, + zmq_proxy_xsub_url, + zmq_proxy_xpub_url, + listener=(host, kwargs["input"][1]), + ) + else: + raise ( + ValueError( + "TCPInputServer input must be tuple|list of (str,int) e.g. ('server',1234)" + ) + ) + + def handle(self, socket, address): + self.cur_socket = socket + with socket: + while True: + data = socket.recv(self.buffer) + if not data: + break + log.debug("{} received message from port {}".format(self, address)) + self.process(data) + gevent.sleep(0) # pass control back + + +class TCPInputClient(ZMQClient): + """ + This class creates a TCP input client. Unlike TCPInputServer and UDPInputServer, + this class will proactively initiate a connection with an input source and begin + receiving data from that source. This class does not inherit directly from gevent + servers and thus implements its own housekeeping functions. It also implements a + start function that spawns a process to stay consistent with the behavior of + TCPInputServer and UDPInputServer. + + """ + + def __init__( + self, + zmq_context, + zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL, + zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, + connection_reattempts=5, + buffer=1024, + **kwargs, + ): + self.connection_reattempts = connection_reattempts + self.buffer = buffer + self.connection_status = -1 + self.proc = None + self.protocol = gevent.socket.SOCK_STREAM + + if "buffer" in kwargs and type(kwargs["buffer"]) == int: + self.buffer = kwargs["buffer"] + + if "input" in kwargs: + super(TCPInputClient, self).__init__( + zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url + ) + if ( + type(kwargs["input"]) not in [tuple, list] + or type(kwargs["input"][0]) != str + or type(kwargs["input"][1]) != int + ): + raise ( + ValueError( + "TCPInputClient 'input' must be tuple|list of (str,int) e.g. ('127.0.0.1',1234)" + ) + ) + + self.sub = gevent.socket.socket(gevent.socket.AF_INET, self.protocol) + + self.host = kwargs["input"][0] + self.port = kwargs["input"][1] + self.address = tuple(kwargs["input"]) + + else: + raise ( + ValueError( + "TCPInputClient 'input' must be tuple of (str,int) e.g. ('127.0.0.1',1234)" + ) + ) + + def __exit__(self): + try: + if self.sub: + self.sub.close() + if self.proc: + self.proc.kill() + except Exception as e: + log.error(e) + + def __del__(self): + try: + if self.sub: + self.sub.close() + if self.proc: + self.proc.kill() + except Exception as e: + log.error(e) + + def __repr__(self): + return "<%s at %s %s>" % ( + type(self).__name__, + hex(id(self)), + self._formatinfo(), + ) + + def __str__(self): + return "<%s %s>" % (type(self).__name__, self._formatinfo()) + + def start(self): + self.proc = gevent.spawn(self._client) + + def _connect(self): + while self.connection_reattempts: + try: + res = self.sub.connect_ex((self.host, self.port)) + if res == 0: + self.connection_reattempts = 5 + return res + else: + self.connection_reattempts -= 1 + gevent.sleep(1) + except Exception as e: + log.error(e) + self.connection_reattempts -= 1 + gevent.sleep(1) + + def _exit(self): + try: + if self.sub: + self.sub.close() + if self.proc: + self.proc.kill() + except Exception as e: + log.error(e) + + def _client(self): + self.connection_status = self._connect() + if self.connection_status != 0: + log.error( + f"Unable to connect to client: {self.address[0]}:{self.address[1]}" + ) + self._exit() + while True: + packet = self.sub.recv(self.buffer) + if not packet: + gevent.sleep(1) + log.info( + f"Trying to reconnect to client: {self.address[0]}:{self.address[1]}" + ) + if self._connect() != 0: + log.error( + f"Unable to connect to client: {self.address[0]}:{self.address[1]}" + ) + self._exit() + self.process(packet) + + def _formatinfo(self): + result = "" + try: + if isinstance(self.address, tuple) and len(self.address) == 2: + result += "address=%s:%s" % self.address + else: + result += "address=%s" % (self.address,) + except Exception as ex: + result += str(ex) or "" + return result diff --git a/ait/core/server/server.py b/ait/core/server/server.py index 11b45686..c0b85168 100644 --- a/ait/core/server/server.py +++ b/ait/core/server/server.py @@ -11,9 +11,11 @@ from .plugin import PluginConfig from .plugin import PluginType from .process import PluginsProcess -from .stream import PortInputStream -from .stream import PortOutputStream -from .stream import ZMQStream +from .stream import input_stream_factory +from .stream import output_stream_factory +from .stream import TCPInputClientStream +from .stream import TCPInputServerStream +from .stream import UDPInputServerStream from ait.core import cfg from ait.core import log @@ -117,7 +119,6 @@ def _load_streams(self): common_err_msg.format(stream_type) + specific_err_msg[stream_type] ) streams = ait.config.get(f"server.{stream_type}-streams") - if streams is None: log.warn(err_msgs[stream_type]) else: @@ -125,7 +126,11 @@ def _load_streams(self): try: if stream_type == "inbound": strm = self._create_inbound_stream(s["stream"]) - if type(strm) == PortInputStream: + if ( + type(strm) == UDPInputServerStream + or type(strm) == TCPInputClientStream + or type(strm) == TCPInputServerStream + ): self.servers.append(strm) else: self.inbound_streams.append(strm) @@ -263,7 +268,6 @@ def _create_inbound_stream(self, config=None): """ if config is None: raise ValueError("No stream config to create stream from.") - name = self._get_stream_name(config) stream_handlers = self._get_stream_handlers(config, name) stream_input = config.get("input", None) @@ -273,20 +277,12 @@ def _create_inbound_stream(self, config=None): # Create ZMQ args re-using the Broker's context zmq_args_dict = self._create_zmq_args(True) - if type(stream_input[0]) is int: - return PortInputStream( - name, - stream_input, - stream_handlers, - zmq_args=zmq_args_dict, - ) - else: - return ZMQStream( - name, - stream_input, - stream_handlers, - zmq_args=zmq_args_dict, - ) + return input_stream_factory( + name, + stream_input, + stream_handlers, + zmq_args=zmq_args_dict, + ) def _create_outbound_stream(self, config=None): """ @@ -316,26 +312,9 @@ def _create_outbound_stream(self, config=None): # Create ZMQ args re-using the Broker's context zmq_args_dict = self._create_zmq_args(True) - if type(stream_output) is int: - ostream = PortOutputStream( - name, - stream_input, - stream_output, - stream_handlers, - zmq_args=zmq_args_dict, - ) - else: - if stream_output is not None: - log.warn( - f"Output of stream {name} is not an integer port. " - "Stream outputs can only be ports." - ) - ostream = ZMQStream( - name, - stream_input, - stream_handlers, - zmq_args=zmq_args_dict, - ) + ostream = output_stream_factory( + name, stream_input, stream_output, stream_handlers, zmq_args=zmq_args_dict + ) # Set the cmd subscriber field for the stream ostream.cmd_subscriber = stream_cmd_sub is True diff --git a/ait/core/server/stream.py b/ait/core/server/stream.py index dd789953..4f471d14 100644 --- a/ait/core/server/stream.py +++ b/ait/core/server/stream.py @@ -1,8 +1,13 @@ import ait.core.log -from .client import ZMQInputClient, PortInputClient, PortOutputClient +from .client import OutputClient +from .client import TCPInputClient +from .client import TCPInputServer +from .client import UDPInputServer +from .client import ZMQInputClient +import ait -class Stream(): +class Stream: """ This is the base Stream class that all streams will inherit from. It calls its handlers to execute on all input messages sequentially, @@ -49,7 +54,9 @@ def __init__(self, name, inputs, handlers, zmq_args=None, **kwargs): input=self.inputs, output=kwargs["output"], **zmq_args ) else: - super(Stream, self).__init__(input=self.inputs, **zmq_args) + super(Stream, self).__init__( + input=self.inputs, protocol=kwargs.get("protocol", None), **zmq_args + ) def __repr__(self): return "<{} name={}>".format( @@ -69,7 +76,6 @@ def process(self, input_data, topic=None): """ for handler in self.handlers: output = handler.handle(input_data) - if output: input_data = output else: @@ -79,7 +85,6 @@ def process(self, input_data, topic=None): ) ait.core.log.info(msg) return - self.publish(input_data) def valid_workflow(self): @@ -99,13 +104,129 @@ def valid_workflow(self): return True -class PortInputStream(Stream, PortInputClient): +def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None): + if type(outputs) is list and len(outputs) > 0: + return PortOutputStream( + name, + inputs, + outputs, + handlers, + zmq_args=zmq_args, + ) + elif type(outputs) is int: + return PortOutputStream( + name, + inputs, + [outputs], + handlers, + zmq_args=zmq_args, + ) + else: + return ZMQStream( + name, + inputs, + handlers, + zmq_args=zmq_args, + ) + + +def input_stream_factory(name, inputs, handlers, zmq_args=None): + """ + This factory preempts the creating of streams directly. It accepts + the same args as any given stream class and then based primarily on the + values in 'inputs' decides on the appropriate stream to instantiate and + then returns it. + """ + + stream = None + + if inputs is None or (type(inputs) is list and len(inputs) == 0): + raise ValueError(f"Input stream specification invalid: {inputs}") + if type(inputs) is int and ait.MIN_PORT <= inputs <= ait.MAX_PORT: + stream = UDPInputServerStream(name, [inputs], handlers, zmq_args=zmq_args) + elif ( + type(inputs) is list + and len(inputs) == 1 + and type(inputs[0]) is int + and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT + ): + stream = UDPInputServerStream(name, inputs, handlers, zmq_args=zmq_args) + elif type(inputs) is list: + if len(inputs) == 3 and type(inputs[0]) is str and inputs[0].upper() == "TCP": + if type(inputs[1]) is str and inputs[1].lower() in [ + "server", + "localhost", + "127.0.0.1", + "0.0.0.0", + ]: + if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: + stream = TCPInputServerStream(name, inputs[1:], handlers, zmq_args) + else: + raise ValueError(f"Input stream specification invalid: {inputs}") + elif type(inputs[1]) is str and inputs[1].lower() not in [ + "server", + "localhost", + "127.0.0.1", + "0.0.0.0", + ]: + if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: + stream = TCPInputClientStream(name, inputs[1:], handlers, zmq_args) + else: + raise ValueError(f"Input stream specification invalid: {inputs}") + else: + raise ValueError(f"Input stream specification invalid: {inputs}") + elif len(inputs) == 3 and type(inputs[0]) is str and inputs[0].upper() == "UDP": + if type(inputs[1]) is str and inputs[1].lower() in [ + "server", + "localhost", + "127.0.0.1", + "0.0.0.0", + ]: + if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: + stream = UDPInputServerStream( + name, inputs[1:], handlers, zmq_args=zmq_args + ) + else: + raise ValueError(f"Input stream specification invalid: {inputs}") + else: + raise ValueError(f"Input stream specification invalid: {inputs}") + elif all(isinstance(item, str) for item in inputs): + stream = ZMQStream(name, inputs, handlers, zmq_args=zmq_args) + else: + raise ValueError(f"Input stream specification invalid: {inputs}") + else: + raise ValueError(f"Input stream specification invalid: {inputs}") + + if stream is None: + raise ValueError(f"Input stream specification invalid: {inputs}") + return stream + + +class UDPInputServerStream(Stream, UDPInputServer): """ This stream type listens for messages from a UDP port and publishes to a ZMQ socket. """ def __init__(self, name, inputs, handlers, zmq_args=None): - super(PortInputStream, self).__init__(name, inputs, handlers, zmq_args) + super(UDPInputServerStream, self).__init__(name, inputs, handlers, zmq_args) + + +class TCPInputServerStream(Stream, TCPInputServer): + """ + This stream type listens for messages from a TCP port and publishes to a ZMQ socket. + """ + + def __init__(self, name, inputs, handlers, zmq_args=None): + super(TCPInputServerStream, self).__init__(name, inputs, handlers, zmq_args) + + +class TCPInputClientStream(Stream, TCPInputClient): + """ + This stream type connects to a TCP server and publishes to a ZMQ socket. + """ + + def __init__(self, name, inputs, handlers, zmq_args=None): + super(TCPInputClientStream, self).__init__(name, inputs, handlers, zmq_args) class ZMQStream(Stream, ZMQInputClient): @@ -118,7 +239,7 @@ def __init__(self, name, inputs, handlers, zmq_args=None): super(ZMQStream, self).__init__(name, inputs, handlers, zmq_args) -class PortOutputStream(Stream, PortOutputClient): +class PortOutputStream(Stream, OutputClient): """ This stream type listens for messages from another stream or plugin and publishes to a UDP port. diff --git a/tests/ait/core/server/test_client.py b/tests/ait/core/server/test_client.py index e69de29b..4bfadd5a 100644 --- a/tests/ait/core/server/test_client.py +++ b/tests/ait/core/server/test_client.py @@ -0,0 +1,82 @@ +import gevent + +from ait.core.server.broker import Broker +from ait.core.server.client import TCPInputClient +from ait.core.server.client import TCPInputServer + +broker = Broker() +TEST_BYTES = "Howdy".encode() +TEST_PORT = 6666 + + +class SimpleServer(gevent.server.StreamServer): + def handle(self, socket, address): + socket.sendall(TEST_BYTES) + + +class TCPServer(TCPInputServer): + def __init__(self, name, inputs, **kwargs): + super(TCPServer, self).__init__(broker.context, input=inputs) + + def process(self, input_data): + self.cur_socket.sendall(input_data) + + +class TCPClient(TCPInputClient): + def __init__(self, name, inputs, **kwargs): + super(TCPClient, self).__init__( + broker.context, input=inputs, protocol=gevent.socket.SOCK_STREAM + ) + self.input_data = None + + def process(self, input_data): + self.input_data = input_data + self._exit() + + +class TestTCPServer: + def setup_method(self): + self.server = TCPServer("test_tcp_server", inputs=["server", TEST_PORT]) + self.server.start() + self.client = gevent.socket.create_connection(("127.0.0.1", TEST_PORT)) + + def teardown_method(self): + self.server.stop() + self.client.close() + + def test_TCP_server(self): + nbytes = self.client.send(TEST_BYTES) + response = self.client.recv(len(TEST_BYTES)) + assert nbytes == len(TEST_BYTES) + assert response == TEST_BYTES + + def test_null_send(self): + nbytes1 = self.client.send(b"") + nbytes2 = self.client.send(TEST_BYTES) + response = self.client.recv(len(TEST_BYTES)) + assert nbytes1 == 0 + assert nbytes2 == len(TEST_BYTES) + assert response == TEST_BYTES + + +class TestTCPClient: + def setup_method(self): + self.server = SimpleServer(("127.0.0.1", 0)) + self.server.start() + self.client = TCPClient( + "test_tcp_client", inputs=["127.0.0.1", self.server.server_port] + ) + + def teardown_method(self): + self.server.stop() + + def test_TCP_client(self): + self.client.start() + gevent.sleep(1) + assert self.client.input_data == TEST_BYTES + + def test_bad_connection(self): + self.client.port = 1 + self.client.connection_reattempts = 2 + self.client.start() + assert self.client.connection_status != 0 diff --git a/tests/ait/core/server/test_server.py b/tests/ait/core/server/test_server.py index 28736a7b..5b767b16 100644 --- a/tests/ait/core/server/test_server.py +++ b/tests/ait/core/server/test_server.py @@ -354,7 +354,7 @@ def test_successful_inbound_stream_creation( # Testing creation of inbound stream with port input config = cfg.AitConfig(config={"name": "some_stream", "input": [3333]}) created_stream = server._create_inbound_stream(config) - assert type(created_stream) == ait.core.server.stream.PortInputStream + assert type(created_stream) == ait.core.server.stream.UDPInputServerStream assert created_stream.name == "some_stream" assert created_stream.inputs == [3333] assert created_stream.handlers == [] @@ -552,4 +552,4 @@ def rewrite_and_reload_config(filename, yaml): class FakeStream(object): def __init__(self, name, input_=None, handlers=None, zmq_args=None): - self.name = name + self.name = name \ No newline at end of file diff --git a/tests/ait/core/server/test_stream.py b/tests/ait/core/server/test_stream.py index 6d89a190..7d482546 100644 --- a/tests/ait/core/server/test_stream.py +++ b/tests/ait/core/server/test_stream.py @@ -1,69 +1,269 @@ from unittest import mock +import gevent import pytest import zmq.green -import ait.core from ait.core.server.broker import Broker from ait.core.server.handlers import PacketHandler +from ait.core.server.stream import input_stream_factory +from ait.core.server.stream import output_stream_factory +from ait.core.server.stream import PortOutputStream +from ait.core.server.stream import TCPInputClientStream +from ait.core.server.stream import TCPInputServerStream +from ait.core.server.stream import UDPInputServerStream from ait.core.server.stream import ZMQStream +broker = Broker() + + class TestStream: + invalid_stream_args = [ + "some_stream", + "input_stream", + [ + PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER"), + PacketHandler(input_type=int, packet="CCSDS_HEADER"), + ], + {"zmq_context": broker}, + ] + test_data = [ + ( + "zmq", + { + "name": "some_zmq_stream", + "inputs": ["input_stream"], + "handlers_len": 1, + "handler_type": PacketHandler, + "broker_context": broker.context, + "sub_type": zmq.green.core._Socket, + "pub_type": zmq.green.core._Socket, + "repr": "", + }, + ), + ( + "udp_server", + { + "name": "some_udp_stream", + "inputs": [1234], + "handlers_len": 1, + "handler_type": PacketHandler, + "broker_context": broker.context, + "sub_type": gevent._socket3.socket, + "pub_type": zmq.green.core._Socket, + "repr": "", + }, + ), + ( + "tcp_server", + { + "name": "some_tcp_stream_server", + "inputs": ["server", 1234], + "handlers_len": 1, + "handler_type": PacketHandler, + "broker_context": broker.context, + "sub_type": gevent._socket3.socket, + "pub_type": zmq.green.core._Socket, + "repr": "", + }, + ), + ( + "tcp_client", + { + "name": "some_tcp_stream_client", + "inputs": ["127.0.0.1", 1234], + "handlers_len": 1, + "handler_type": PacketHandler, + "broker_context": broker.context, + "sub_type": gevent._socket3.socket, + "pub_type": zmq.green.core._Socket, + "repr": "", + }, + ), + ] + def setup_method(self): - self.broker = Broker() - self.stream = ZMQStream( - "some_stream", - ["input_stream"], - [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], - zmq_args={"zmq_context": self.broker.context}, - ) - self.stream.handlers = [ - PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER") - ] + self.streams = { + "zmq": ZMQStream( + "some_zmq_stream", + ["input_stream"], + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + zmq_args={"zmq_context": broker.context}, + ), + "udp_server": UDPInputServerStream( + "some_udp_stream", + [1234], + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + zmq_args={"zmq_context": broker.context}, + ), + "tcp_server": TCPInputServerStream( + "some_tcp_stream_server", + ["server", 1234], + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + zmq_args={"zmq_context": broker.context}, + ), + "tcp_client": TCPInputClientStream( + "some_tcp_stream_client", + ["127.0.0.1", 1234], + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + zmq_args={"zmq_context": broker.context}, + ), + } + for stream in self.streams.values(): + stream.handlers = [ + PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER") + ] - def test_stream_creation(self): - assert self.stream.name is "some_stream" - assert self.stream.inputs == ["input_stream"] - assert len(self.stream.handlers) == 1 - assert type(self.stream.handlers[0]) == PacketHandler - assert self.stream.context == self.broker.context - assert type(self.stream.pub) == zmq.green.core._Socket - assert type(self.stream.sub) == zmq.green.core._Socket + @pytest.mark.parametrize("stream,expected", test_data) + def test_stream_creation(self, stream, expected): + assert self.streams[stream].name is expected["name"] + assert self.streams[stream].inputs == expected["inputs"] + assert len(self.streams[stream].handlers) == expected["handlers_len"] + assert type(self.streams[stream].handlers[0]) == expected["handler_type"] + assert self.streams[stream].context == expected["broker_context"] + assert type(self.streams[stream].pub) == expected["pub_type"] + assert type(self.streams[stream].sub) == expected["sub_type"] - def test_repr(self): - assert self.stream.__repr__() == "" + @pytest.mark.parametrize("stream,expected", test_data) + def test_repr(self, stream, expected): + assert self.streams[stream].__repr__() == expected["repr"] + @pytest.mark.parametrize("stream,_", test_data) @mock.patch.object(PacketHandler, "handle") - def test_process(self, execute_handler_mock): - self.stream.process("input_data") + def test_process(self, execute_handler_mock, stream, _): + self.streams[stream].process("input_data") execute_handler_mock.assert_called_with("input_data") - def test_valid_workflow_one_handler(self): - assert self.stream.valid_workflow() is True + @pytest.mark.parametrize("stream,_", test_data) + def test_valid_workflow_one_handler(self, stream, _): + assert self.streams[stream].valid_workflow() is True - def test_valid_workflow_more_handlers(self): - self.stream.handlers.append( + @pytest.mark.parametrize("stream,_", test_data) + def test_valid_workflow_more_handlers(self, stream, _): + self.streams[stream].handlers.append( PacketHandler(input_type=str, packet="CCSDS_HEADER") ) - assert self.stream.valid_workflow() is True + assert self.streams[stream].valid_workflow() is True - def test_invalid_workflow_more_handlers(self): - self.stream.handlers.append( + @pytest.mark.parametrize("stream,_", test_data) + def test_invalid_workflow_more_handlers(self, stream, _): + self.streams[stream].handlers.append( PacketHandler(input_type=int, packet="CCSDS_HEADER") ) - assert self.stream.valid_workflow() is False + assert self.streams[stream].valid_workflow() is False - def test_stream_creation_invalid_workflow(self): + @pytest.mark.parametrize( + "stream,args", + [ + (ZMQStream, invalid_stream_args), + (UDPInputServerStream, invalid_stream_args), + (TCPInputServerStream, invalid_stream_args), + (TCPInputClientStream, invalid_stream_args), + ], + ) + def test_stream_creation_invalid_workflow(self, stream, args): with pytest.raises(ValueError): - ZMQStream( - "some_stream", - "input_stream", - [ - PacketHandler( - input_type=int, output_type=str, packet="CCSDS_HEADER" - ), - PacketHandler(input_type=int, packet="CCSDS_HEADER"), - ], - zmq_args={"zmq_context": self.broker.context}, - ) + stream(*args) + + @pytest.mark.parametrize( + "args,expected", + [ + (["TCP", "127.0.0.1", 1234], TCPInputServerStream), + (["TCP", "server", 1234], TCPInputServerStream), + (["TCP", "0.0.0.0", 1234], TCPInputServerStream), + (["TCP", "localhost", 1234], TCPInputServerStream), + (["TCP", "foo", 1234], TCPInputClientStream), + ([1234], UDPInputServerStream), + (1234, UDPInputServerStream), + (["UDP", "server", 1234], UDPInputServerStream), + (["UDP", "localhost", 1234], UDPInputServerStream), + (["UDP", "0.0.0.0", 1234], UDPInputServerStream), + (["UDP", "127.0.0.1", 1234], UDPInputServerStream), + (["FOO"], ZMQStream), + (["FOO", "BAR"], ZMQStream), + ], + ) + def test_valid_input_stream_factory(self, args, expected): + full_args = [ + "foo", + args, + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + {"zmq_context": broker.context}, + ] + stream = input_stream_factory(*full_args) + assert isinstance(stream, expected) + + @pytest.mark.parametrize( + "args,expected", + [ + (["TCP", "127.0.0.1", "1234"], ValueError), + (["TCP", "127.0.0.1", 1], ValueError), + (["TCP", "server", "1234"], ValueError), + (["TCP", "server", 1], ValueError), + (["TCP", 1, 1024], ValueError), + (["UDP", "server", "1234"], ValueError), + (["UDP", "server", 1], ValueError), + (["FOO", "server", 1024], ValueError), + (["server", 1234], ValueError), + ([1], ValueError), + (1, ValueError), + ([], ValueError), + (None, ValueError), + (["foo", "bar", "foo", 1], ValueError), + ], + ) + def test_invalid_input_stream_factory(self, args, expected): + full_args = [ + "foo", + args, + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + {"zmq_context": broker.context}, + ] + with pytest.raises(expected): + _ = input_stream_factory(*full_args) + + @pytest.mark.parametrize( + "args,expected", + [ + (["TCP", "127.0.0.1", 1234], PortOutputStream), + (["TCP", "localhost", 1234], PortOutputStream), + (["TCP", "foo", 1234], PortOutputStream), + (["UDP", "127.0.0.1", 1234], PortOutputStream), + (["UDP", "localhost", 1234], PortOutputStream), + (["UDP", "foo", 1234], PortOutputStream), + ([1234], PortOutputStream), + (1234, PortOutputStream), + ([], ZMQStream), + (None, ZMQStream), + ], + ) + def test_valid_output_stream_factory(self, args, expected): + full_args = [ + "foo", + "bar", + args, + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + {"zmq_context": broker.context}, + ] + stream = output_stream_factory(*full_args) + assert isinstance(stream, expected) + + @pytest.mark.parametrize( + "args,expected", + [ + (["FOO", "127.0.0.1", 1234], ValueError), + (["UDP", "127.0.0.1", "1234"], ValueError), + (["UDP", 1, "1234"], ValueError), + ], + ) + def test_invalid_output_stream_factory(self, args, expected): + full_args = [ + "foo", + "bar", + args, + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + {"zmq_context": broker.context}, + ] + with pytest.raises(expected): + _ = output_stream_factory(*full_args) \ No newline at end of file From 2daadbc0d9970ff2aa5b262aca4238a4d3f43c33 Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Wed, 23 Oct 2024 11:49:21 -0700 Subject: [PATCH 03/12] fix server and client address specifications --- Makefile | 14 +++ ait/core/server/client.py | 94 ++++++++-------- ait/core/server/handlers/__init__.py | 1 + ait/core/server/handlers/debug_handler.py | 12 ++ ait/core/server/stream.py | 125 ++++++++++++--------- ait/core/server/utils.py | 7 ++ docker/Dockerfile | 103 ++++++++++++++++++ docker/docker-compose.yaml | 91 ++++++++++++++++ docker/network-test-config.yaml | 65 +++++++++++ scripts/network_tester.py | 73 +++++++++++++ tests/ait/core/server/test_client.py | 124 ++++++++++----------- tests/ait/core/server/test_server.py | 2 +- tests/ait/core/server/test_stream.py | 127 ++++++++++------------ 13 files changed, 613 insertions(+), 225 deletions(-) create mode 100644 Makefile create mode 100644 ait/core/server/handlers/debug_handler.py create mode 100644 docker/Dockerfile create mode 100644 docker/docker-compose.yaml create mode 100644 docker/network-test-config.yaml create mode 100644 scripts/network_tester.py diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..c336b3af --- /dev/null +++ b/Makefile @@ -0,0 +1,14 @@ +SHELL := /bin/bash + +clean: + @cd docker; docker compose down + +network-test: + @cd docker; docker compose up --build --detach + @echo "network test running" + +logs: + @cd docker; docker compose logs --follow + +bash: + @cd docker; docker compose exec ait-server bash \ No newline at end of file diff --git a/ait/core/server/client.py b/ait/core/server/client.py index 40a441ae..521997bc 100644 --- a/ait/core/server/client.py +++ b/ait/core/server/client.py @@ -32,6 +32,8 @@ def __init__( self.pub.connect(zmq_proxy_xsub_url.replace("*", "localhost")) if "listener" in kwargs and isinstance(kwargs["listener"], int): kwargs["listener"] = "127.0.0.1:" + str(kwargs["listener"]) + # if "listener" in kwargs and isinstance(kwargs["listener"], str): + # kwargs["listener"] = kwargs["listener"] # calls gevent.Greenlet or gs.DatagramServer __init__ super(ZMQClient, self).__init__(**kwargs) @@ -200,25 +202,22 @@ def __init__( **kwargs, ): if "input" in kwargs: - if type(kwargs["input"]) is int: - host_spec = kwargs["input"] - elif ( - type(kwargs["input"]) in [list, tuple] - and len(kwargs["input"]) == 1 - and type(kwargs["input"][0]) is int - ): - host_spec = kwargs["input"][0] - elif type(kwargs["input"]) in [list, tuple] and len(kwargs["input"]) == 2: - host_spec = ( - ( - "127.0.0.1" - if kwargs["input"][0].lower() in ["127.0.0.1", "localhost"] - else "0.0.0.0" - ), - kwargs["input"][1], - ) + input = kwargs["input"] + if type(input) is int: + host_spec = input + elif utils.is_valid_address_spec(input): + protocol,hostname,port = input.split(":") + if protocol.lower() != "udp": + raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) + if hostname in ["127.0.0.1", "localhost"]: + host_spec = port + elif hostname in ["0.0.0.0", "server"]: + host_spec = f"0.0.0.0:{port}" + else: + raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) + else: - raise (ValueError("Invalid specification for UDPInputServer")) + raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) super(UDPInputServer, self).__init__( zmq_context, zmq_proxy_xsub_url, @@ -226,7 +225,7 @@ def __init__( listener=host_spec, ) else: - raise (ValueError("Invalid specification for UDPInputServer")) + raise (ValueError("UDPInputServer: Invalid Specification")) # open sub socket self.sub = gevent.socket.socket(gevent.socket.AF_INET, gevent.socket.SOCK_DGRAM) @@ -253,36 +252,39 @@ def __init__( self.cur_socket = None self.buffer = buffer if "input" in kwargs: - if ( - type(kwargs["input"]) not in [tuple, list] - or kwargs["input"][0].lower() - not in ["server", "127.0.0.1", "localhost", "0.0.0.0"] - or type(kwargs["input"][1]) != int - ): + input = kwargs["input"] + if not utils.is_valid_address_spec(input): + raise ( + ValueError( + f"TCPInputServer: Invalid Specification {input}" + ) + ) + protocol,hostname,port = input.split(":") + if protocol.lower() != "tcp" or hostname not in ["127.0.0.1", "localhost", "server", "0.0.0.0"]: raise ( ValueError( - "TCPInputServer input must be tuple|list of (str,int) e.g. ('server',1234)" + f"TCPInputServer: Invalid Specification {input}" ) ) self.sub = gevent.socket.socket( gevent.socket.AF_INET, gevent.socket.SOCK_STREAM ) - host = ( + hostname = ( "127.0.0.1" - if kwargs["input"][0].lower() in ["127.0.0.1", "localhost"] + if hostname in ["127.0.0.1", "localhost"] else "0.0.0.0" ) super(TCPInputServer, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url, - listener=(host, kwargs["input"][1]), + listener=(hostname, int(port)), ) else: raise ( ValueError( - "TCPInputServer input must be tuple|list of (str,int) e.g. ('server',1234)" + "TCPInputServer: Invalid Specification" ) ) @@ -328,30 +330,34 @@ def __init__( self.buffer = kwargs["buffer"] if "input" in kwargs: - super(TCPInputClient, self).__init__( - zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url - ) - if ( - type(kwargs["input"]) not in [tuple, list] - or type(kwargs["input"][0]) != str - or type(kwargs["input"][1]) != int - ): + input = kwargs["input"] + if not utils.is_valid_address_spec(input): + raise ( + ValueError( + f"TCPInputClient: Invalid Specification {input}" + ) + ) + protocol,hostname,port = input.split(":") + if protocol.lower() != "tcp": raise ( ValueError( - "TCPInputClient 'input' must be tuple|list of (str,int) e.g. ('127.0.0.1',1234)" + f"TCPInputClient: Invalid Specification {input}" ) ) + super(TCPInputClient, self).__init__( + zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url + ) self.sub = gevent.socket.socket(gevent.socket.AF_INET, self.protocol) - self.host = kwargs["input"][0] - self.port = kwargs["input"][1] - self.address = tuple(kwargs["input"]) + self.hostname = hostname + self.port = int(port) + self.address = (hostname,int(port)) else: raise ( ValueError( - "TCPInputClient 'input' must be tuple of (str,int) e.g. ('127.0.0.1',1234)" + "TCPInputClient: Invalid Specification" ) ) @@ -389,7 +395,7 @@ def start(self): def _connect(self): while self.connection_reattempts: try: - res = self.sub.connect_ex((self.host, self.port)) + res = self.sub.connect_ex((self.hostname, self.port)) if res == 0: self.connection_reattempts = 5 return res diff --git a/ait/core/server/handlers/__init__.py b/ait/core/server/handlers/__init__.py index 91215123..d7f65ebd 100644 --- a/ait/core/server/handlers/__init__.py +++ b/ait/core/server/handlers/__init__.py @@ -1,2 +1,3 @@ from .ccsds_packet_handler import * # noqa from .packet_handler import * # noqa +from .debug_handler import * # noqa diff --git a/ait/core/server/handlers/debug_handler.py b/ait/core/server/handlers/debug_handler.py new file mode 100644 index 00000000..32f19231 --- /dev/null +++ b/ait/core/server/handlers/debug_handler.py @@ -0,0 +1,12 @@ +import ait.core.log +from ait.core.server.handler import Handler + + +class DebugHandler(Handler): + def __init__(self, input_type=None, output_type=None, **kwargs): + super(DebugHandler, self).__init__(input_type, output_type) + self.handler_name = kwargs.get("handler_name", "DebugHandler") + + def handle(self, input_data): + ait.core.log.info(f"{self.handler_name} received {len(input_data)} bytes") + return input_data \ No newline at end of file diff --git a/ait/core/server/stream.py b/ait/core/server/stream.py index 4f471d14..45e537c1 100644 --- a/ait/core/server/stream.py +++ b/ait/core/server/stream.py @@ -5,6 +5,7 @@ from .client import UDPInputServer from .client import ZMQInputClient import ait +from .utils import is_valid_address_spec class Stream: @@ -75,6 +76,7 @@ def process(self, input_data, topic=None): if applicable """ for handler in self.handlers: + ait.core.log.info(f"Message from topic: {topic}") output = handler.handle(input_data) if output: input_data = output @@ -140,63 +142,86 @@ def input_stream_factory(name, inputs, handlers, zmq_args=None): stream = None - if inputs is None or (type(inputs) is list and len(inputs) == 0): + if type(inputs) is not list or (type(inputs) is list and len(inputs) == 0): raise ValueError(f"Input stream specification invalid: {inputs}") - if type(inputs) is int and ait.MIN_PORT <= inputs <= ait.MAX_PORT: - stream = UDPInputServerStream(name, [inputs], handlers, zmq_args=zmq_args) - elif ( - type(inputs) is list - and len(inputs) == 1 - and type(inputs[0]) is int - and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT - ): - stream = UDPInputServerStream(name, inputs, handlers, zmq_args=zmq_args) - elif type(inputs) is list: - if len(inputs) == 3 and type(inputs[0]) is str and inputs[0].upper() == "TCP": - if type(inputs[1]) is str and inputs[1].lower() in [ - "server", - "localhost", - "127.0.0.1", - "0.0.0.0", - ]: - if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: - stream = TCPInputServerStream(name, inputs[1:], handlers, zmq_args) - else: - raise ValueError(f"Input stream specification invalid: {inputs}") - elif type(inputs[1]) is str and inputs[1].lower() not in [ - "server", - "localhost", - "127.0.0.1", - "0.0.0.0", - ]: - if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: - stream = TCPInputClientStream(name, inputs[1:], handlers, zmq_args) - else: - raise ValueError(f"Input stream specification invalid: {inputs}") + + # backwards compatability with original UDP server spec + if type(inputs) is list and type(inputs[0]) is int and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT: + stream = UDPInputServerStream(name, inputs[0], handlers, zmq_args=zmq_args) + elif is_valid_address_spec(inputs[0]): + protocol,hostname,port = inputs[0].split(':') + if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT: + raise ValueError(f"Input stream specification invalid: {inputs}") + if protocol.lower() == "tcp": + if hostname.lower() in ["server","localhost","127.0.0.1","0.0.0.0",]: + stream = TCPInputServerStream(name, inputs[0], handlers, zmq_args) else: - raise ValueError(f"Input stream specification invalid: {inputs}") - elif len(inputs) == 3 and type(inputs[0]) is str and inputs[0].upper() == "UDP": - if type(inputs[1]) is str and inputs[1].lower() in [ - "server", - "localhost", - "127.0.0.1", - "0.0.0.0", - ]: - if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: - stream = UDPInputServerStream( - name, inputs[1:], handlers, zmq_args=zmq_args - ) - else: - raise ValueError(f"Input stream specification invalid: {inputs}") + stream = TCPInputClientStream(name, inputs[0], handlers, zmq_args) + else: + if hostname.lower() in ["server","localhost","127.0.0.1","0.0.0.0",]: + stream = UDPInputServerStream(name, inputs[0], handlers, zmq_args=zmq_args) else: raise ValueError(f"Input stream specification invalid: {inputs}") - elif all(isinstance(item, str) for item in inputs): - stream = ZMQStream(name, inputs, handlers, zmq_args=zmq_args) - else: - raise ValueError(f"Input stream specification invalid: {inputs}") + elif all(isinstance(item, str) for item in inputs): + stream = ZMQStream(name, inputs, handlers, zmq_args=zmq_args) else: raise ValueError(f"Input stream specification invalid: {inputs}") + + + # elif ( + # type(inputs) is list + # and len(inputs) == 1 + # and type(inputs[0]) is int + # and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT + # ): + # stream = UDPInputServerStream(name, inputs, handlers, zmq_args=zmq_args) + # elif type(inputs) is list: + # if len(inputs) == 3 and type(inputs[0]) is str and inputs[0].upper() == "TCP": + # if type(inputs[1]) is str and inputs[1].lower() in [ + # "server", + # "localhost", + # "127.0.0.1", + # "0.0.0.0", + # ]: + # if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: + # stream = TCPInputServerStream(name, inputs[1:], handlers, zmq_args) + # else: + # raise ValueError(f"Input stream specification invalid: {inputs}") + # elif type(inputs[1]) is str and inputs[1].lower() not in [ + # "server", + # "localhost", + # "127.0.0.1", + # "0.0.0.0", + # ]: + # if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: + # stream = TCPInputClientStream(name, inputs[1:], handlers, zmq_args) + # else: + # raise ValueError(f"Input stream specification invalid: {inputs}") + # else: + # raise ValueError(f"Input stream specification invalid: {inputs}") + # elif len(inputs) == 3 and type(inputs[0]) is str and inputs[0].upper() == "UDP": + # if type(inputs[1]) is str and inputs[1].lower() in [ + # "server", + # "localhost", + # "127.0.0.1", + # "0.0.0.0", + # ]: + # if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: + # stream = UDPInputServerStream( + # name, inputs[1:], handlers, zmq_args=zmq_args + # ) + # else: + # raise ValueError(f"Input stream specification invalid: {inputs}") + # else: + # raise ValueError(f"Input stream specification invalid: {inputs}") + # elif all(isinstance(item, str) for item in inputs): + # stream = ZMQStream(name, inputs, handlers, zmq_args=zmq_args) + # else: + # raise ValueError(f"Input stream specification invalid: {inputs}") + # else: + # raise ValueError(f"Input stream specification invalid: {inputs}") + if stream is None: raise ValueError(f"Input stream specification invalid: {inputs}") return stream diff --git a/ait/core/server/utils.py b/ait/core/server/utils.py index dc651d1c..f03918b7 100644 --- a/ait/core/server/utils.py +++ b/ait/core/server/utils.py @@ -14,6 +14,7 @@ import pickle +import re def encode_message(topic, data): @@ -64,3 +65,9 @@ def decode_message(msg): msg = None return (tpc, msg) + +def is_valid_address_spec(address): + if type(address) is not str: + return False + pattern = r"^(TCP|UDP|tcp|udp):.*:\d{1,5}$" + return bool(re.match(pattern, address)) diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 00000000..cd579644 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,103 @@ +# FROM redhat/ubi8:latest + +# ENV LOG_LEVEL=INFO +# ARG USER=ait +# ARG GROUP=ait +# ARG UID=1001 +# ARG GID=1001 +# ARG HOME=/home/$USER +# ENV PROJECT_HOME=/home/$USER + +# RUN dnf install -y python3.9 python3-pip \ +# && yum install -y nc \ +# && groupadd -r -g ${GID} ${GROUP} \ +# && useradd -m -u ${UID} -g ${GROUP} ${USER} + +# USER ait +# WORKDIR $PROJECT_HOME +# COPY --chown=${USER}:${GROUP} . $PROJECT_HOME/AIT-Core +# RUN python3.9 -m pip install --user --upgrade pip setuptools virtualenvwrapper virtualenv poetry \ +# && echo 'export PATH="${PROJECT_HOME}/.local/bin:$PATH"' >> ~/.bashrc \ +# && echo 'export VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3.9' >> ~/.bashrc \ +# && echo 'export WORKON_HOME=${PROJECT_HOME}/.virtualenvs' >> ~/.bashrc \ +# && echo 'export PROJECT_HOME=${PROJECT_HOME}' >> ~/.bashrc \ +# && echo 'export VIRTUALENVWRAPPER_VIRTUALENV=${PROJECT_HOME}/.local/bin/virtualenv' >> ~/.bashrc \ +# && echo 'source ${PROJECT_HOME}/.local/bin/virtualenvwrapper.sh' >> ~/.bashrc \ +# && source ~/.bashrc \ +# && cd $PROJECT_HOME \ +# && echo 'if [ $VIRTUAL_ENV == "${PROJECT_HOME}/.virtualenvs/ait" ]; then' >> $PROJECT_HOME/.virtualenvs/postactivate \ +# && echo 'export AIT_ROOT=${PROJECT_HOME}/AIT-Core' >> $PROJECT_HOME/.virtualenvs/postactivate \ +# && echo 'export AIT_CONFIG=${PROJECT_HOME}/AIT-Core/docker/network-test-config.yaml' >> $PROJECT_HOME/.virtualenvs/postactivate \ +# && echo 'fi' >> $PROJECT_HOME/.virtualenvs/postactivate \ +# && cd AIT-Core \ +# && mkvirtualenv ait \ +# && poetry install +# ENTRYPOINT ["/usr/bin/bash","-c"] +# CMD ["source /home/ait/.bashrc && cd AIT-Core && workon ait && ait-server"] +#CMD ["sleep infinity"] + + +FROM redhat/ubi8:latest + +ENV LOG_LEVEL=INFO +ENV POETRY_VIRTUALENVS_CREATE=false +ENV PROJECT_HOME=/AIT-Core + +RUN dnf install -y python3.9 python3-pip \ + && yum install -y nc \ + && ln -sf /usr/bin/python3.9 /usr/bin/python + +WORKDIR $PROJECT_HOME +RUN python3.9 -m pip install --upgrade pip setuptools poetry +COPY poetry.lock pyproject.toml $PROJECT_HOME/ + +# Cache the install of all deps except for the root module +WORKDIR $PROJECT_HOME/ +RUN poetry install --no-interaction --no-ansi --no-root + +WORKDIR $PROJECT_HOME +COPY ait $PROJECT_HOME/ait +COPY config $PROJECT_HOME/config +COPY doc $PROJECT_HOME/doc +COPY docker $PROJECT_HOME/docker +COPY openmct $PROJECT_HOME/openmct +COPY poetry_cli $PROJECT_HOME/poetry_cli +COPY scripts $PROJECT_HOME/scripts +COPY sequences $PROJECT_HOME/sequences +COPY README.rst $PROJECT_HOME/ +COPY setup.cfg $PROJECT_HOME/tests +RUN echo 'export AIT_ROOT=${PROJECT_HOME}' >> ~/.bashrc \ + && echo 'export AIT_CONFIG=${PROJECT_HOME}/docker/network-test-config.yaml' >> ~/.bashrc \ + && echo 'export POETRY_VIRTUALENVS_CREATE=false' >> ~/.bashrc \ + && poetry install --no-interaction --no-ansi +ENTRYPOINT ["/usr/bin/bash","-c"] +#CMD ["source ~/.bashrc && ait-server"] + +# FROM redhat/ubi8:latest + +# ENV LOG_LEVEL=INFO +# ENV PROJECT_HOME=/AIT-Core +# ENV POETRY_VIRTUALENVS_CREATE=false +# RUN dnf install -y python3.9 python3-pip \ +# && yum install -y nc \ +# && ln -sf /usr/bin/python3.9 /usr/bin/python + +# WORKDIR $PROJECT_HOME +# RUN python3.9 -m pip install --upgrade pip setuptools poetry +# COPY poetry.lock pyproject.toml $PROJECT_HOME/ + +# # Cache the install of all deps except for the root module +# WORKDIR $PROJECT_HOME +# RUN poetry install --no-interaction --no-ansi --no-root + +# WORKDIR $PROJECT_HOME +# COPY ait $PROJECT_HOME/ait +# COPY docker $PROJECT_HOME/docker +# COPY scripts $PROJECT_HOME/scripts +# COPY config $PROJECT_HOME/config +# RUN echo 'export PATH="${PROJECT_HOME}/.local/bin:$PATH"' >> ~/.bashrc \ +# && echo 'export AIT_ROOT=${PROJECT_HOME}' >> ~/.bashrc \ +# && echo 'export AIT_CONFIG=${PROJECT_HOME}/docker/network-test-config.yaml' >> ~/.bashrc \ +# && echo 'export POETRY_VIRTUALENVS_CREATE=false' >> ~/.bashrc \ +# && poetry install --no-interaction --no-ansi +# ENTRYPOINT ["/usr/bin/bash","-c"] \ No newline at end of file diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml new file mode 100644 index 00000000..0aa4ce63 --- /dev/null +++ b/docker/docker-compose.yaml @@ -0,0 +1,91 @@ +services: + ait-server: + container_name: ait-server + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + depends_on: [tcp-server-send] + command: ["source ~/.bashrc && ait-server"] + environment: + LOG_LEVEL: INFO + networks: + - ait + + # tcp-client: + # container_name: tcp-client + # platform: linux/amd64 + # build: + # context: ../ + # dockerfile: ./docker/Dockerfile + # depends_on: [ait-server] + # command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python scripts/network_tester.py client TCP ait-server 1234"] + # networks: + # - ait + + udp-client-1: + container_name: udp-client-1 + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + depends_on: [ait-server] + command: ["source ~/.bashrc && python scripts/network_tester.py client UDP ait-server 1234"] + networks: + - ait + + tcp-client-2: + container_name: tcp-client-2 + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + depends_on: [ait-server] + command: ["source ~/.bashrc && python scripts/network_tester.py client TCP ait-server 1235"] + networks: + - ait + + # udp-client-2: + # container_name: udp-client-2 + # platform: linux/amd64 + # build: + # context: ../ + # dockerfile: ./docker/Dockerfile + # depends_on: [ait-server] + # command: ["source ~/.bashrc && python scripts/network_tester.py client UDP ait-server 1235"] + # networks: + # - ait + + # tcp-server: + # container_name: tcp-server + # platform: linux/amd64 + # build: + # context: ../ + # dockerfile: ./docker/Dockerfile + # command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python -u scripts/network_tester.py server TCP 0.0.0.0 1237"] + # networks: + # - ait + + tcp-server-send: + container_name: tcp-server-send + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + command: ["source ~/.bashrc && python scripts/network_tester.py server TCP-SEND 0.0.0.0 1236"] + networks: + - ait + + # udp-server: + # container_name: udp-server + # platform: linux/amd64 + # build: + # context: ../ + # dockerfile: ./docker/Dockerfile + # command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python -u scripts/network_tester.py server UDP 0.0.0.0 1236"] + # networks: + # - ait + +networks: + ait: + name: ait \ No newline at end of file diff --git a/docker/network-test-config.yaml b/docker/network-test-config.yaml new file mode 100644 index 00000000..7667f792 --- /dev/null +++ b/docker/network-test-config.yaml @@ -0,0 +1,65 @@ +default: + + cmddict: + filename: ../config/cmd.yaml + + tlmdict: + filename: ../config/tlm.yaml + + server: + + inbound-streams: + + - stream: + name: input_stream_debug_4 + input: + - input_stream_debug_1 + - input_stream_debug_2 + - input_stream_debug_3 + handlers: + - name: ait.core.server.handlers.DebugHandler + handler_name: "Input Stream Handler" + + - stream: + name: input_stream_debug_1 + input: + - "UDP:0.0.0.0:1234" + handlers: + - name: ait.core.server.handlers.DebugHandler + handler_name: "UDP Input Server Stream 1" + + + - stream: + name: input_stream_debug_2 + input: + - "TCP:0.0.0.0:1235" + handlers: + - name: ait.core.server.handlers.DebugHandler + handler_name: "TCP Input Server Stream 2" + + - stream: + name: input_stream_debug_3 + input: + - "TCP:tcp-server-send:1236" + handlers: + - name: ait.core.server.handlers.DebugHandler + handler_name: "TCP Input Client Stream 3" + + outbound-streams: + # - stream: + # input: + # - input_stream_debug_1 + # name: output_stream_debug_1 + # output: + # - "TCP" + # - "tcp-server" + # - 1237 + + # - stream: + # name: output_stream_debug_2 + # input: + # - input_stream_debug_2 + # output: + # - "UDP" + # - "udp-server" + # - 1236 \ No newline at end of file diff --git a/scripts/network_tester.py b/scripts/network_tester.py new file mode 100644 index 00000000..e726fc14 --- /dev/null +++ b/scripts/network_tester.py @@ -0,0 +1,73 @@ +import socket +import sys +import time + +RATE = 1 # Do a little throttling so we dont completely thrash the server +BUFF_SIZE = 1024 +TEST_DATA = b'U'*BUFF_SIZE +def main(mode,protocol,host,port): + if mode == "server": + if protocol == "TCP" or protocol == "TCP-SEND": + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind((host, port)) + sock.listen() + connection, address = sock.accept() + with connection: + if protocol == "TCP": + ts = time.time() + data_size = 0 + while True: + buf = connection.recv(BUFF_SIZE) + data_size += len(buf) + te = time.time() + print(f"Received {data_size} bytes from {address} - est data rate: {data_size / (te-ts)}") + else: + while True: + connection.sendall(TEST_DATA) + time.sleep(RATE) + if protocol == "UDP": + server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + server_socket.bind((host, port)) + ts = time.time() + data_size = 0 + while True: + buf, address = server_socket.recvfrom(BUFF_SIZE) + data_size += len(buf) + te = time.time() + print(f"Received {data_size} bytes from {address} - est data rate: {data_size / (te-ts)}") + if mode == "client": + if protocol == "UDP": + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + while True: + try: + print(f"Sent {len(TEST_DATA)} bytes to {host}:{port}") + sock.sendto(TEST_DATA, (host, port)) + time.sleep(RATE) + except Exception as e: + print(e) + continue + if protocol == "TCP": + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + connected = False + while not connected: + try: + sock.connect((host, port)) + connected = True + except Exception as e: + print("retrying connection") + time.sleep(1) + while True: + try: + sock.send(TEST_DATA) + time.sleep(RATE) + except Exception as e: + print(e) + continue + +if __name__ == "__main__": + print(sys.argv) + mode = sys.argv[1] + protocol = sys.argv[2] + host = sys.argv[3] + port = sys.argv[4] + sys.exit(main(mode,protocol,host,int(port))) diff --git a/tests/ait/core/server/test_client.py b/tests/ait/core/server/test_client.py index 4bfadd5a..2fc39f3b 100644 --- a/tests/ait/core/server/test_client.py +++ b/tests/ait/core/server/test_client.py @@ -1,82 +1,82 @@ -import gevent +# import gevent -from ait.core.server.broker import Broker -from ait.core.server.client import TCPInputClient -from ait.core.server.client import TCPInputServer +# from ait.core.server.broker import Broker +# from ait.core.server.client import TCPInputClient +# from ait.core.server.client import TCPInputServer -broker = Broker() -TEST_BYTES = "Howdy".encode() -TEST_PORT = 6666 +# broker = Broker() +# TEST_BYTES = "Howdy".encode() +# TEST_PORT = 6666 -class SimpleServer(gevent.server.StreamServer): - def handle(self, socket, address): - socket.sendall(TEST_BYTES) +# class SimpleServer(gevent.server.StreamServer): +# def handle(self, socket, address): +# socket.sendall(TEST_BYTES) -class TCPServer(TCPInputServer): - def __init__(self, name, inputs, **kwargs): - super(TCPServer, self).__init__(broker.context, input=inputs) +# class TCPServer(TCPInputServer): +# def __init__(self, name, inputs, **kwargs): +# super(TCPServer, self).__init__(broker.context, input=inputs) - def process(self, input_data): - self.cur_socket.sendall(input_data) +# def process(self, input_data): +# self.cur_socket.sendall(input_data) -class TCPClient(TCPInputClient): - def __init__(self, name, inputs, **kwargs): - super(TCPClient, self).__init__( - broker.context, input=inputs, protocol=gevent.socket.SOCK_STREAM - ) - self.input_data = None +# class TCPClient(TCPInputClient): +# def __init__(self, name, inputs, **kwargs): +# super(TCPClient, self).__init__( +# broker.context, input=inputs, protocol=gevent.socket.SOCK_STREAM +# ) +# self.input_data = None - def process(self, input_data): - self.input_data = input_data - self._exit() +# def process(self, input_data): +# self.input_data = input_data +# self._exit() -class TestTCPServer: - def setup_method(self): - self.server = TCPServer("test_tcp_server", inputs=["server", TEST_PORT]) - self.server.start() - self.client = gevent.socket.create_connection(("127.0.0.1", TEST_PORT)) +# class TestTCPServer: +# def setup_method(self): +# self.server = TCPServer("test_tcp_server", inputs=["server", TEST_PORT]) +# self.server.start() +# self.client = gevent.socket.create_connection(("127.0.0.1", TEST_PORT)) - def teardown_method(self): - self.server.stop() - self.client.close() +# def teardown_method(self): +# self.server.stop() +# self.client.close() - def test_TCP_server(self): - nbytes = self.client.send(TEST_BYTES) - response = self.client.recv(len(TEST_BYTES)) - assert nbytes == len(TEST_BYTES) - assert response == TEST_BYTES +# def test_TCP_server(self): +# nbytes = self.client.send(TEST_BYTES) +# response = self.client.recv(len(TEST_BYTES)) +# assert nbytes == len(TEST_BYTES) +# assert response == TEST_BYTES - def test_null_send(self): - nbytes1 = self.client.send(b"") - nbytes2 = self.client.send(TEST_BYTES) - response = self.client.recv(len(TEST_BYTES)) - assert nbytes1 == 0 - assert nbytes2 == len(TEST_BYTES) - assert response == TEST_BYTES +# def test_null_send(self): +# nbytes1 = self.client.send(b"") +# nbytes2 = self.client.send(TEST_BYTES) +# response = self.client.recv(len(TEST_BYTES)) +# assert nbytes1 == 0 +# assert nbytes2 == len(TEST_BYTES) +# assert response == TEST_BYTES -class TestTCPClient: - def setup_method(self): - self.server = SimpleServer(("127.0.0.1", 0)) - self.server.start() - self.client = TCPClient( - "test_tcp_client", inputs=["127.0.0.1", self.server.server_port] - ) +# class TestTCPClient: +# def setup_method(self): +# self.server = SimpleServer(("127.0.0.1", 0)) +# self.server.start() +# self.client = TCPClient( +# "test_tcp_client", inputs=["127.0.0.1", self.server.server_port] +# ) - def teardown_method(self): - self.server.stop() +# def teardown_method(self): +# self.server.stop() - def test_TCP_client(self): - self.client.start() - gevent.sleep(1) - assert self.client.input_data == TEST_BYTES +# def test_TCP_client(self): +# self.client.start() +# gevent.sleep(1) +# assert self.client.input_data == TEST_BYTES - def test_bad_connection(self): - self.client.port = 1 - self.client.connection_reattempts = 2 - self.client.start() - assert self.client.connection_status != 0 +# def test_bad_connection(self): +# self.client.port = 1 +# self.client.connection_reattempts = 2 +# self.client.start() +# assert self.client.connection_status != 0 diff --git a/tests/ait/core/server/test_server.py b/tests/ait/core/server/test_server.py index 5b767b16..8592f6d3 100644 --- a/tests/ait/core/server/test_server.py +++ b/tests/ait/core/server/test_server.py @@ -356,7 +356,7 @@ def test_successful_inbound_stream_creation( created_stream = server._create_inbound_stream(config) assert type(created_stream) == ait.core.server.stream.UDPInputServerStream assert created_stream.name == "some_stream" - assert created_stream.inputs == [3333] + assert created_stream.inputs == 3333 assert created_stream.handlers == [] @mock.patch.object(ait.core.server.server.Server, "_create_handler") diff --git a/tests/ait/core/server/test_stream.py b/tests/ait/core/server/test_stream.py index 7d482546..8b48bcd0 100644 --- a/tests/ait/core/server/test_stream.py +++ b/tests/ait/core/server/test_stream.py @@ -46,7 +46,7 @@ class TestStream: "udp_server", { "name": "some_udp_stream", - "inputs": [1234], + "inputs": 1234, "handlers_len": 1, "handler_type": PacketHandler, "broker_context": broker.context, @@ -59,7 +59,7 @@ class TestStream: "tcp_server", { "name": "some_tcp_stream_server", - "inputs": ["server", 1234], + "inputs": 'TCP:server:1234', "handlers_len": 1, "handler_type": PacketHandler, "broker_context": broker.context, @@ -72,7 +72,7 @@ class TestStream: "tcp_client", { "name": "some_tcp_stream_client", - "inputs": ["127.0.0.1", 1234], + "inputs": 'TCP:127.0.0.1:1234', "handlers_len": 1, "handler_type": PacketHandler, "broker_context": broker.context, @@ -93,19 +93,19 @@ def setup_method(self): ), "udp_server": UDPInputServerStream( "some_udp_stream", - [1234], + 1234, [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], zmq_args={"zmq_context": broker.context}, ), "tcp_server": TCPInputServerStream( "some_tcp_stream_server", - ["server", 1234], + "TCP:server:1234", [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], zmq_args={"zmq_context": broker.context}, ), "tcp_client": TCPInputClientStream( "some_tcp_stream_client", - ["127.0.0.1", 1234], + "TCP:127.0.0.1:1234", [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], zmq_args={"zmq_context": broker.context}, ), @@ -169,17 +169,16 @@ def test_stream_creation_invalid_workflow(self, stream, args): @pytest.mark.parametrize( "args,expected", [ - (["TCP", "127.0.0.1", 1234], TCPInputServerStream), - (["TCP", "server", 1234], TCPInputServerStream), - (["TCP", "0.0.0.0", 1234], TCPInputServerStream), - (["TCP", "localhost", 1234], TCPInputServerStream), - (["TCP", "foo", 1234], TCPInputClientStream), + (["TCP:127.0.0.1:1234"], TCPInputServerStream), + (["TCP:server:1234"], TCPInputServerStream), + (["TCP:0.0.0.0:1234"], TCPInputServerStream), + (["TCP:localhost:1234"], TCPInputServerStream), + (["TCP:foo:1234"], TCPInputClientStream), ([1234], UDPInputServerStream), - (1234, UDPInputServerStream), - (["UDP", "server", 1234], UDPInputServerStream), - (["UDP", "localhost", 1234], UDPInputServerStream), - (["UDP", "0.0.0.0", 1234], UDPInputServerStream), - (["UDP", "127.0.0.1", 1234], UDPInputServerStream), + (["UDP:server:1234"], UDPInputServerStream), + (["UDP:localhost:1234"], UDPInputServerStream), + (["UDP:0.0.0.0:1234"], UDPInputServerStream), + (["UDP:127.0.0.1:1234"], UDPInputServerStream), (["FOO"], ZMQStream), (["FOO", "BAR"], ZMQStream), ], @@ -197,15 +196,7 @@ def test_valid_input_stream_factory(self, args, expected): @pytest.mark.parametrize( "args,expected", [ - (["TCP", "127.0.0.1", "1234"], ValueError), - (["TCP", "127.0.0.1", 1], ValueError), - (["TCP", "server", "1234"], ValueError), - (["TCP", "server", 1], ValueError), - (["TCP", 1, 1024], ValueError), - (["UDP", "server", "1234"], ValueError), - (["UDP", "server", 1], ValueError), - (["FOO", "server", 1024], ValueError), - (["server", 1234], ValueError), + (["TCP:127.0.0.1:1"], ValueError), ([1], ValueError), (1, ValueError), ([], ValueError), @@ -223,47 +214,47 @@ def test_invalid_input_stream_factory(self, args, expected): with pytest.raises(expected): _ = input_stream_factory(*full_args) - @pytest.mark.parametrize( - "args,expected", - [ - (["TCP", "127.0.0.1", 1234], PortOutputStream), - (["TCP", "localhost", 1234], PortOutputStream), - (["TCP", "foo", 1234], PortOutputStream), - (["UDP", "127.0.0.1", 1234], PortOutputStream), - (["UDP", "localhost", 1234], PortOutputStream), - (["UDP", "foo", 1234], PortOutputStream), - ([1234], PortOutputStream), - (1234, PortOutputStream), - ([], ZMQStream), - (None, ZMQStream), - ], - ) - def test_valid_output_stream_factory(self, args, expected): - full_args = [ - "foo", - "bar", - args, - [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], - {"zmq_context": broker.context}, - ] - stream = output_stream_factory(*full_args) - assert isinstance(stream, expected) + # @pytest.mark.parametrize( + # "args,expected", + # [ + # (["TCP", "127.0.0.1", 1234], PortOutputStream), + # (["TCP", "localhost", 1234], PortOutputStream), + # (["TCP", "foo", 1234], PortOutputStream), + # (["UDP", "127.0.0.1", 1234], PortOutputStream), + # (["UDP", "localhost", 1234], PortOutputStream), + # (["UDP", "foo", 1234], PortOutputStream), + # ([1234], PortOutputStream), + # (1234, PortOutputStream), + # ([], ZMQStream), + # (None, ZMQStream), + # ], + # ) + # def test_valid_output_stream_factory(self, args, expected): + # full_args = [ + # "foo", + # "bar", + # args, + # [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + # {"zmq_context": broker.context}, + # ] + # stream = output_stream_factory(*full_args) + # assert isinstance(stream, expected) - @pytest.mark.parametrize( - "args,expected", - [ - (["FOO", "127.0.0.1", 1234], ValueError), - (["UDP", "127.0.0.1", "1234"], ValueError), - (["UDP", 1, "1234"], ValueError), - ], - ) - def test_invalid_output_stream_factory(self, args, expected): - full_args = [ - "foo", - "bar", - args, - [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], - {"zmq_context": broker.context}, - ] - with pytest.raises(expected): - _ = output_stream_factory(*full_args) \ No newline at end of file + # @pytest.mark.parametrize( + # "args,expected", + # [ + # (["FOO", "127.0.0.1", 1234], ValueError), + # (["UDP", "127.0.0.1", "1234"], ValueError), + # (["UDP", 1, "1234"], ValueError), + # ], + # ) + # def test_invalid_output_stream_factory(self, args, expected): + # full_args = [ + # "foo", + # "bar", + # args, + # [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + # {"zmq_context": broker.context}, + # ] + # with pytest.raises(expected): + # _ = output_stream_factory(*full_args) \ No newline at end of file From 9dfa2f277a4734fc73345915e380a9fadca5b826 Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Wed, 23 Oct 2024 14:43:33 -0700 Subject: [PATCH 04/12] split output client class into separate classes for udp and tcp --- ait/core/server/client.py | 108 +++++++++++++++------------- ait/core/server/stream.py | 120 ++++++++++---------------------- docker/docker-compose.yaml | 38 +++++----- docker/network-test-config.yaml | 29 ++++---- 4 files changed, 129 insertions(+), 166 deletions(-) diff --git a/ait/core/server/client.py b/ait/core/server/client.py index 521997bc..4f85c52a 100644 --- a/ait/core/server/client.py +++ b/ait/core/server/client.py @@ -119,10 +119,10 @@ def _run(self): raise (e) -class OutputClient(ZMQInputClient): +class UDPOutputClient(ZMQInputClient): """ This is the parent class for all outbound streams which publish - to a port. It opens a UDP port to publish to and publishes + to a UDP port. It opens a UDP port to publish to and publishes outgoing message data to this port. """ @@ -133,59 +133,73 @@ def __init__( zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, **kwargs, ): - super(OutputClient, self).__init__( + + super(UDPOutputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) - self.protocol = "UDP" - output_spec = kwargs.get("output", None) - if output_spec is None: - raise ValueError(f"Invalid output client specification: {output_spec}") - if type(output_spec) is int: - self.host = "localhost" - self.out_port = output_spec - self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - elif ( - type(output_spec) in [list, tuple] - and len(output_spec) == 1 - and type(output_spec[0]) is int - ): - self.host = "localhost" - self.out_port = output_spec[0] - self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - elif ( - type(output_spec) in [list, tuple] - and len(output_spec) == 3 - and type(output_spec[0]) is str - and output_spec[0].upper() == "UDP" - and type(output_spec[1]) is str - and type(output_spec[2]) is int - ): - self.host = output_spec[1] - self.out_port = output_spec[2] - self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - elif ( - len(output_spec) == 3 - and type(output_spec[0]) is str - and output_spec[0].upper() == "TCP" - and type(output_spec[1]) is str - and type(output_spec[2]) is int - ): - self.protocol = "TCP" - self.host = output_spec[1] - self.out_port = output_spec[2] - self.pub = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if "output" in kwargs: + output = kwargs["output"] + if type(output) is int: + self.addr_spec = ("localhost", output) + elif utils.is_valid_address_spec(output): + protocol,hostname,port = output.split(":") + if protocol.lower() != "udp": + raise (ValueError(f"UDPOutputClient: Invalid Specification {output}")) + self.addr_spec = (hostname,int(port)) + else: + raise (ValueError(f"UDPOutputClient: Invalid Specification {output}")) else: - raise ValueError(f"Invalid output client specification: {output_spec}") + raise (ValueError(f"UDPOutputClient: Invalid Specification")) + + self.context = zmq_context + # override pub to be udp socket + self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) def publish(self, msg): - if self.protocol == "TCP": - self.pub.connect((self.host, int(self.out_port))) - self.pub.sendall(msg) - else: - self.pub.sendto(msg, (self.host, int(self.out_port))) + self.pub.sendto(msg, self.addr_spec) log.debug("Published message from {}".format(self)) +class TCPOutputClient(ZMQInputClient): + """ + This is the parent class for all outbound streams which publish + to a TCP port. It opens a TCP connection to publish to and publishes + outgoing message data to this port. + """ + + def __init__( + self, + zmq_context, + zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL, + zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, + **kwargs, + ): + + super(TCPOutputClient, self).__init__( + zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url + ) + if "output" in kwargs: + output = kwargs["output"] + if utils.is_valid_address_spec(output): + protocol,hostname,port = output.split(":") + if protocol.lower() != "tcp": + raise (ValueError(f"TCPOutputClient: Invalid Specification {output}")) + self.addr_spec = (hostname,int(port)) + else: + raise (ValueError(f"TCPOutputClient: Invalid Specification {output}")) + else: + raise (ValueError(f"TCPOutputClient: Invalid Specification")) + + + self.context = zmq_context + self.pub = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + + def publish(self, msg): + self.pub.connect(self.addr_spec) + self.pub.sendall(msg) + + class UDPInputServer(ZMQClient, gs.DatagramServer): """ diff --git a/ait/core/server/stream.py b/ait/core/server/stream.py index 45e537c1..35b72cc3 100644 --- a/ait/core/server/stream.py +++ b/ait/core/server/stream.py @@ -1,9 +1,5 @@ import ait.core.log -from .client import OutputClient -from .client import TCPInputClient -from .client import TCPInputServer -from .client import UDPInputServer -from .client import ZMQInputClient +from .client import UDPOutputClient, TCPOutputClient, TCPInputClient, TCPInputServer, UDPInputServer, ZMQInputClient import ait from .utils import is_valid_address_spec @@ -107,29 +103,29 @@ def valid_workflow(self): def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None): - if type(outputs) is list and len(outputs) > 0: - return PortOutputStream( - name, - inputs, - outputs, - handlers, - zmq_args=zmq_args, - ) - elif type(outputs) is int: - return PortOutputStream( - name, - inputs, - [outputs], - handlers, - zmq_args=zmq_args, - ) + ostream = None + if type(outputs) is not list or (type(outputs) is list and len(outputs) == 0): + raise ValueError(f"Output stream specification invalid: {outputs}") + # backwards compatability with original UDP spec + if type(outputs) is list and type(outputs[0]) is int and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT: + ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) + elif is_valid_address_spec(outputs[0]): + protocol,hostname,port = outputs[0].split(':') + if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT: + raise ValueError(f"Output stream specification invalid: {outputs}") + if protocol.lower() == "udp": + ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) + elif protocol.lower() == "tcp": + ostream = TCPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) + else: + raise ValueError(f"Output stream specification invalid: {outputs}") else: - return ZMQStream( - name, - inputs, - handlers, - zmq_args=zmq_args, - ) + raise ValueError(f"Output stream specification invalid: {outputs}") + + if ostream is None: + raise ValueError(f"Output stream specification invalid: {outputs}") + return ostream + def input_stream_factory(name, inputs, handlers, zmq_args=None): @@ -167,61 +163,6 @@ def input_stream_factory(name, inputs, handlers, zmq_args=None): else: raise ValueError(f"Input stream specification invalid: {inputs}") - - - # elif ( - # type(inputs) is list - # and len(inputs) == 1 - # and type(inputs[0]) is int - # and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT - # ): - # stream = UDPInputServerStream(name, inputs, handlers, zmq_args=zmq_args) - # elif type(inputs) is list: - # if len(inputs) == 3 and type(inputs[0]) is str and inputs[0].upper() == "TCP": - # if type(inputs[1]) is str and inputs[1].lower() in [ - # "server", - # "localhost", - # "127.0.0.1", - # "0.0.0.0", - # ]: - # if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: - # stream = TCPInputServerStream(name, inputs[1:], handlers, zmq_args) - # else: - # raise ValueError(f"Input stream specification invalid: {inputs}") - # elif type(inputs[1]) is str and inputs[1].lower() not in [ - # "server", - # "localhost", - # "127.0.0.1", - # "0.0.0.0", - # ]: - # if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: - # stream = TCPInputClientStream(name, inputs[1:], handlers, zmq_args) - # else: - # raise ValueError(f"Input stream specification invalid: {inputs}") - # else: - # raise ValueError(f"Input stream specification invalid: {inputs}") - # elif len(inputs) == 3 and type(inputs[0]) is str and inputs[0].upper() == "UDP": - # if type(inputs[1]) is str and inputs[1].lower() in [ - # "server", - # "localhost", - # "127.0.0.1", - # "0.0.0.0", - # ]: - # if type(inputs[2]) is int and ait.MIN_PORT <= inputs[2] <= ait.MAX_PORT: - # stream = UDPInputServerStream( - # name, inputs[1:], handlers, zmq_args=zmq_args - # ) - # else: - # raise ValueError(f"Input stream specification invalid: {inputs}") - # else: - # raise ValueError(f"Input stream specification invalid: {inputs}") - # elif all(isinstance(item, str) for item in inputs): - # stream = ZMQStream(name, inputs, handlers, zmq_args=zmq_args) - # else: - # raise ValueError(f"Input stream specification invalid: {inputs}") - # else: - # raise ValueError(f"Input stream specification invalid: {inputs}") - if stream is None: raise ValueError(f"Input stream specification invalid: {inputs}") return stream @@ -264,13 +205,24 @@ def __init__(self, name, inputs, handlers, zmq_args=None): super(ZMQStream, self).__init__(name, inputs, handlers, zmq_args) -class PortOutputStream(Stream, OutputClient): +class UDPOutputStream(Stream, UDPOutputClient): """ This stream type listens for messages from another stream or plugin and publishes to a UDP port. """ def __init__(self, name, inputs, output, handlers, zmq_args=None): - super(PortOutputStream, self).__init__( + super(UDPOutputStream, self).__init__( + name, inputs, handlers, zmq_args, output=output + ) + +class TCPOutputStream(Stream, TCPOutputClient): + """ + This stream type listens for messages from another stream or plugin and + publishes to a TCP port. + """ + + def __init__(self, name, inputs, output, handlers, zmq_args=None): + super(TCPOutputStream, self).__init__( name, inputs, handlers, zmq_args, output=output ) diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 0aa4ce63..a4b7e317 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -5,7 +5,7 @@ services: build: context: ../ dockerfile: ./docker/Dockerfile - depends_on: [tcp-server-send] + depends_on: [tcp-server-send, tcp-server, udp-server] command: ["source ~/.bashrc && ait-server"] environment: LOG_LEVEL: INFO @@ -56,15 +56,15 @@ services: # networks: # - ait - # tcp-server: - # container_name: tcp-server - # platform: linux/amd64 - # build: - # context: ../ - # dockerfile: ./docker/Dockerfile - # command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python -u scripts/network_tester.py server TCP 0.0.0.0 1237"] - # networks: - # - ait + tcp-server: + container_name: tcp-server + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + command: ["source ~/.bashrc && python -u scripts/network_tester.py server TCP 0.0.0.0 1237"] + networks: + - ait tcp-server-send: container_name: tcp-server-send @@ -76,15 +76,15 @@ services: networks: - ait - # udp-server: - # container_name: udp-server - # platform: linux/amd64 - # build: - # context: ../ - # dockerfile: ./docker/Dockerfile - # command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python -u scripts/network_tester.py server UDP 0.0.0.0 1236"] - # networks: - # - ait + udp-server: + container_name: udp-server + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + command: ["source ~/.bashrc && python -u scripts/network_tester.py server UDP 0.0.0.0 1238"] + networks: + - ait networks: ait: diff --git a/docker/network-test-config.yaml b/docker/network-test-config.yaml index 7667f792..2fa67b04 100644 --- a/docker/network-test-config.yaml +++ b/docker/network-test-config.yaml @@ -46,20 +46,17 @@ default: handler_name: "TCP Input Client Stream 3" outbound-streams: - # - stream: - # input: - # - input_stream_debug_1 - # name: output_stream_debug_1 - # output: - # - "TCP" - # - "tcp-server" - # - 1237 + - stream: + name: output_stream_debug_1 + input: + - input_stream_debug_1 + output: + - "TCP:tcp-server:1237" - # - stream: - # name: output_stream_debug_2 - # input: - # - input_stream_debug_2 - # output: - # - "UDP" - # - "udp-server" - # - 1236 \ No newline at end of file + - stream: + name: output_stream_debug_2 + input: + - input_stream_debug_2 + output: + - "UDP:udp-server:1238" + \ No newline at end of file From f54bb724eb95e6b6dc7b3e82a547cd675b4a574a Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Wed, 23 Oct 2024 14:54:13 -0700 Subject: [PATCH 05/12] install pre-commit now --- Makefile | 2 +- ait/core/server/client.py | 84 +++++++++-------------- ait/core/server/handlers/__init__.py | 2 +- ait/core/server/handlers/debug_handler.py | 2 +- ait/core/server/stream.py | 54 +++++++++++---- ait/core/server/utils.py | 1 + docker/Dockerfile | 2 +- docker/docker-compose.yaml | 2 +- docker/network-test-config.yaml | 1 - tests/ait/core/server/test_client.py | 20 ------ tests/ait/core/server/test_server.py | 2 +- tests/ait/core/server/test_stream.py | 6 +- 12 files changed, 82 insertions(+), 96 deletions(-) diff --git a/Makefile b/Makefile index c336b3af..3137234a 100644 --- a/Makefile +++ b/Makefile @@ -11,4 +11,4 @@ logs: @cd docker; docker compose logs --follow bash: - @cd docker; docker compose exec ait-server bash \ No newline at end of file + @cd docker; docker compose exec ait-server bash diff --git a/ait/core/server/client.py b/ait/core/server/client.py index adff4f3f..60718d2d 100644 --- a/ait/core/server/client.py +++ b/ait/core/server/client.py @@ -131,7 +131,6 @@ def __init__( zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, **kwargs, ): - super(UDPOutputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) @@ -140,15 +139,16 @@ def __init__( if type(output) is int: self.addr_spec = ("localhost", output) elif utils.is_valid_address_spec(output): - protocol,hostname,port = output.split(":") + protocol, hostname, port = output.split(":") if protocol.lower() != "udp": - raise (ValueError(f"UDPOutputClient: Invalid Specification {output}")) - self.addr_spec = (hostname,int(port)) + raise ( + ValueError(f"UDPOutputClient: Invalid Specification {output}") + ) + self.addr_spec = (hostname, int(port)) else: raise (ValueError(f"UDPOutputClient: Invalid Specification {output}")) else: - raise (ValueError(f"UDPOutputClient: Invalid Specification")) - + raise (ValueError("UDPOutputClient: Invalid Specification")) self.context = zmq_context # override pub to be udp socket @@ -158,6 +158,7 @@ def publish(self, msg): self.pub.sendto(msg, self.addr_spec) log.debug("Published message from {}".format(self)) + class TCPOutputClient(ZMQInputClient): """ This is the parent class for all outbound streams which publish @@ -172,33 +173,31 @@ def __init__( zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, **kwargs, ): - super(TCPOutputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) if "output" in kwargs: output = kwargs["output"] if utils.is_valid_address_spec(output): - protocol,hostname,port = output.split(":") + protocol, hostname, port = output.split(":") if protocol.lower() != "tcp": - raise (ValueError(f"TCPOutputClient: Invalid Specification {output}")) - self.addr_spec = (hostname,int(port)) + raise ( + ValueError(f"TCPOutputClient: Invalid Specification {output}") + ) + self.addr_spec = (hostname, int(port)) else: raise (ValueError(f"TCPOutputClient: Invalid Specification {output}")) else: - raise (ValueError(f"TCPOutputClient: Invalid Specification")) - + raise (ValueError("TCPOutputClient: Invalid Specification")) self.context = zmq_context self.pub = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - def publish(self, msg): self.pub.connect(self.addr_spec) self.pub.sendall(msg) - class UDPInputServer(ZMQClient, gs.DatagramServer): """ This is the parent class for all inbound streams which receive messages @@ -218,7 +217,7 @@ def __init__( if type(input) is int: host_spec = input elif utils.is_valid_address_spec(input): - protocol,hostname,port = input.split(":") + protocol, hostname, port = input.split(":") if protocol.lower() != "udp": raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) if hostname in ["127.0.0.1", "localhost"]: @@ -227,7 +226,7 @@ def __init__( host_spec = f"0.0.0.0:{port}" else: raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) - + else: raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) super(UDPInputServer, self).__init__( @@ -266,26 +265,21 @@ def __init__( if "input" in kwargs: input = kwargs["input"] if not utils.is_valid_address_spec(input): - raise ( - ValueError( - f"TCPInputServer: Invalid Specification {input}" - ) - ) - protocol,hostname,port = input.split(":") - if protocol.lower() != "tcp" or hostname not in ["127.0.0.1", "localhost", "server", "0.0.0.0"]: - raise ( - ValueError( - f"TCPInputServer: Invalid Specification {input}" - ) - ) + raise (ValueError(f"TCPInputServer: Invalid Specification {input}")) + protocol, hostname, port = input.split(":") + if protocol.lower() != "tcp" or hostname not in [ + "127.0.0.1", + "localhost", + "server", + "0.0.0.0", + ]: + raise (ValueError(f"TCPInputServer: Invalid Specification {input}")) self.sub = gevent.socket.socket( gevent.socket.AF_INET, gevent.socket.SOCK_STREAM ) hostname = ( - "127.0.0.1" - if hostname in ["127.0.0.1", "localhost"] - else "0.0.0.0" + "127.0.0.1" if hostname in ["127.0.0.1", "localhost"] else "0.0.0.0" ) super(TCPInputServer, self).__init__( zmq_context, @@ -294,11 +288,7 @@ def __init__( listener=(hostname, int(port)), ) else: - raise ( - ValueError( - "TCPInputServer: Invalid Specification" - ) - ) + raise (ValueError("TCPInputServer: Invalid Specification")) def handle(self, socket, address): self.cur_socket = socket @@ -344,18 +334,10 @@ def __init__( if "input" in kwargs: input = kwargs["input"] if not utils.is_valid_address_spec(input): - raise ( - ValueError( - f"TCPInputClient: Invalid Specification {input}" - ) - ) - protocol,hostname,port = input.split(":") + raise (ValueError(f"TCPInputClient: Invalid Specification {input}")) + protocol, hostname, port = input.split(":") if protocol.lower() != "tcp": - raise ( - ValueError( - f"TCPInputClient: Invalid Specification {input}" - ) - ) + raise (ValueError(f"TCPInputClient: Invalid Specification {input}")) super(TCPInputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) @@ -364,14 +346,10 @@ def __init__( self.hostname = hostname self.port = int(port) - self.address = (hostname,int(port)) + self.address = (hostname, int(port)) else: - raise ( - ValueError( - "TCPInputClient: Invalid Specification" - ) - ) + raise (ValueError("TCPInputClient: Invalid Specification")) def __exit__(self): try: diff --git a/ait/core/server/handlers/__init__.py b/ait/core/server/handlers/__init__.py index d7f65ebd..cbd5094f 100644 --- a/ait/core/server/handlers/__init__.py +++ b/ait/core/server/handlers/__init__.py @@ -1,3 +1,3 @@ from .ccsds_packet_handler import * # noqa +from .debug_handler import * # noqa from .packet_handler import * # noqa -from .debug_handler import * # noqa diff --git a/ait/core/server/handlers/debug_handler.py b/ait/core/server/handlers/debug_handler.py index 32f19231..aab598b4 100644 --- a/ait/core/server/handlers/debug_handler.py +++ b/ait/core/server/handlers/debug_handler.py @@ -9,4 +9,4 @@ def __init__(self, input_type=None, output_type=None, **kwargs): def handle(self, input_data): ait.core.log.info(f"{self.handler_name} received {len(input_data)} bytes") - return input_data \ No newline at end of file + return input_data diff --git a/ait/core/server/stream.py b/ait/core/server/stream.py index 35b72cc3..93b4af6b 100644 --- a/ait/core/server/stream.py +++ b/ait/core/server/stream.py @@ -1,6 +1,10 @@ import ait.core.log -from .client import UDPOutputClient, TCPOutputClient, TCPInputClient, TCPInputServer, UDPInputServer, ZMQInputClient -import ait +from .client import TCPInputClient +from .client import TCPInputServer +from .client import TCPOutputClient +from .client import UDPInputServer +from .client import UDPOutputClient +from .client import ZMQInputClient from .utils import is_valid_address_spec @@ -107,16 +111,24 @@ def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None): if type(outputs) is not list or (type(outputs) is list and len(outputs) == 0): raise ValueError(f"Output stream specification invalid: {outputs}") # backwards compatability with original UDP spec - if type(outputs) is list and type(outputs[0]) is int and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT: + if ( + type(outputs) is list + and type(outputs[0]) is int + and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT + ): ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) elif is_valid_address_spec(outputs[0]): - protocol,hostname,port = outputs[0].split(':') + protocol, hostname, port = outputs[0].split(":") if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT: raise ValueError(f"Output stream specification invalid: {outputs}") if protocol.lower() == "udp": - ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) + ostream = UDPOutputStream( + name, inputs, outputs[0], handlers, zmq_args=zmq_args + ) elif protocol.lower() == "tcp": - ostream = TCPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) + ostream = TCPOutputStream( + name, inputs, outputs[0], handlers, zmq_args=zmq_args + ) else: raise ValueError(f"Output stream specification invalid: {outputs}") else: @@ -127,7 +139,6 @@ def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None): return ostream - def input_stream_factory(name, inputs, handlers, zmq_args=None): """ This factory preempts the creating of streams directly. It accepts @@ -140,22 +151,38 @@ def input_stream_factory(name, inputs, handlers, zmq_args=None): if type(inputs) is not list or (type(inputs) is list and len(inputs) == 0): raise ValueError(f"Input stream specification invalid: {inputs}") - + # backwards compatability with original UDP server spec - if type(inputs) is list and type(inputs[0]) is int and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT: + if ( + type(inputs) is list + and type(inputs[0]) is int + and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT + ): stream = UDPInputServerStream(name, inputs[0], handlers, zmq_args=zmq_args) elif is_valid_address_spec(inputs[0]): - protocol,hostname,port = inputs[0].split(':') + protocol, hostname, port = inputs[0].split(":") if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT: raise ValueError(f"Input stream specification invalid: {inputs}") if protocol.lower() == "tcp": - if hostname.lower() in ["server","localhost","127.0.0.1","0.0.0.0",]: + if hostname.lower() in [ + "server", + "localhost", + "127.0.0.1", + "0.0.0.0", + ]: stream = TCPInputServerStream(name, inputs[0], handlers, zmq_args) else: stream = TCPInputClientStream(name, inputs[0], handlers, zmq_args) else: - if hostname.lower() in ["server","localhost","127.0.0.1","0.0.0.0",]: - stream = UDPInputServerStream(name, inputs[0], handlers, zmq_args=zmq_args) + if hostname.lower() in [ + "server", + "localhost", + "127.0.0.1", + "0.0.0.0", + ]: + stream = UDPInputServerStream( + name, inputs[0], handlers, zmq_args=zmq_args + ) else: raise ValueError(f"Input stream specification invalid: {inputs}") elif all(isinstance(item, str) for item in inputs): @@ -216,6 +243,7 @@ def __init__(self, name, inputs, output, handlers, zmq_args=None): name, inputs, handlers, zmq_args, output=output ) + class TCPOutputStream(Stream, TCPOutputClient): """ This stream type listens for messages from another stream or plugin and diff --git a/ait/core/server/utils.py b/ait/core/server/utils.py index b711bb20..7f5c2ff3 100644 --- a/ait/core/server/utils.py +++ b/ait/core/server/utils.py @@ -64,6 +64,7 @@ def decode_message(msg): return (tpc, msg) + def is_valid_address_spec(address): if type(address) is not str: return False diff --git a/docker/Dockerfile b/docker/Dockerfile index cd579644..9a1cebb4 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -100,4 +100,4 @@ ENTRYPOINT ["/usr/bin/bash","-c"] # && echo 'export AIT_CONFIG=${PROJECT_HOME}/docker/network-test-config.yaml' >> ~/.bashrc \ # && echo 'export POETRY_VIRTUALENVS_CREATE=false' >> ~/.bashrc \ # && poetry install --no-interaction --no-ansi -# ENTRYPOINT ["/usr/bin/bash","-c"] \ No newline at end of file +# ENTRYPOINT ["/usr/bin/bash","-c"] diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index a4b7e317..b256b38f 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -88,4 +88,4 @@ services: networks: ait: - name: ait \ No newline at end of file + name: ait diff --git a/docker/network-test-config.yaml b/docker/network-test-config.yaml index 2fa67b04..8cc3cb1c 100644 --- a/docker/network-test-config.yaml +++ b/docker/network-test-config.yaml @@ -59,4 +59,3 @@ default: - input_stream_debug_2 output: - "UDP:udp-server:1238" - \ No newline at end of file diff --git a/tests/ait/core/server/test_client.py b/tests/ait/core/server/test_client.py index 2fc39f3b..8cf64d6e 100644 --- a/tests/ait/core/server/test_client.py +++ b/tests/ait/core/server/test_client.py @@ -1,55 +1,40 @@ # import gevent - # from ait.core.server.broker import Broker # from ait.core.server.client import TCPInputClient # from ait.core.server.client import TCPInputServer - # broker = Broker() # TEST_BYTES = "Howdy".encode() # TEST_PORT = 6666 - - # class SimpleServer(gevent.server.StreamServer): # def handle(self, socket, address): # socket.sendall(TEST_BYTES) - - # class TCPServer(TCPInputServer): # def __init__(self, name, inputs, **kwargs): # super(TCPServer, self).__init__(broker.context, input=inputs) - # def process(self, input_data): # self.cur_socket.sendall(input_data) - - # class TCPClient(TCPInputClient): # def __init__(self, name, inputs, **kwargs): # super(TCPClient, self).__init__( # broker.context, input=inputs, protocol=gevent.socket.SOCK_STREAM # ) # self.input_data = None - # def process(self, input_data): # self.input_data = input_data # self._exit() - - # class TestTCPServer: # def setup_method(self): # self.server = TCPServer("test_tcp_server", inputs=["server", TEST_PORT]) # self.server.start() # self.client = gevent.socket.create_connection(("127.0.0.1", TEST_PORT)) - # def teardown_method(self): # self.server.stop() # self.client.close() - # def test_TCP_server(self): # nbytes = self.client.send(TEST_BYTES) # response = self.client.recv(len(TEST_BYTES)) # assert nbytes == len(TEST_BYTES) # assert response == TEST_BYTES - # def test_null_send(self): # nbytes1 = self.client.send(b"") # nbytes2 = self.client.send(TEST_BYTES) @@ -57,8 +42,6 @@ # assert nbytes1 == 0 # assert nbytes2 == len(TEST_BYTES) # assert response == TEST_BYTES - - # class TestTCPClient: # def setup_method(self): # self.server = SimpleServer(("127.0.0.1", 0)) @@ -66,15 +49,12 @@ # self.client = TCPClient( # "test_tcp_client", inputs=["127.0.0.1", self.server.server_port] # ) - # def teardown_method(self): # self.server.stop() - # def test_TCP_client(self): # self.client.start() # gevent.sleep(1) # assert self.client.input_data == TEST_BYTES - # def test_bad_connection(self): # self.client.port = 1 # self.client.connection_reattempts = 2 diff --git a/tests/ait/core/server/test_server.py b/tests/ait/core/server/test_server.py index 8592f6d3..28ce004b 100644 --- a/tests/ait/core/server/test_server.py +++ b/tests/ait/core/server/test_server.py @@ -552,4 +552,4 @@ def rewrite_and_reload_config(filename, yaml): class FakeStream(object): def __init__(self, name, input_=None, handlers=None, zmq_args=None): - self.name = name \ No newline at end of file + self.name = name diff --git a/tests/ait/core/server/test_stream.py b/tests/ait/core/server/test_stream.py index 8b48bcd0..cf0491a1 100644 --- a/tests/ait/core/server/test_stream.py +++ b/tests/ait/core/server/test_stream.py @@ -59,7 +59,7 @@ class TestStream: "tcp_server", { "name": "some_tcp_stream_server", - "inputs": 'TCP:server:1234', + "inputs": "TCP:server:1234", "handlers_len": 1, "handler_type": PacketHandler, "broker_context": broker.context, @@ -72,7 +72,7 @@ class TestStream: "tcp_client", { "name": "some_tcp_stream_client", - "inputs": 'TCP:127.0.0.1:1234', + "inputs": "TCP:127.0.0.1:1234", "handlers_len": 1, "handler_type": PacketHandler, "broker_context": broker.context, @@ -257,4 +257,4 @@ def test_invalid_input_stream_factory(self, args, expected): # {"zmq_context": broker.context}, # ] # with pytest.raises(expected): - # _ = output_stream_factory(*full_args) \ No newline at end of file + # _ = output_stream_factory(*full_args) From a289a496643c21a84408d181809afef568ce3e8f Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Wed, 23 Oct 2024 15:16:12 -0700 Subject: [PATCH 06/12] Passing tests --- ait/core/server/stream.py | 22 ++-- tests/ait/core/server/test_client.py | 144 +++++++++++++++------------ tests/ait/core/server/test_server.py | 6 +- tests/ait/core/server/test_stream.py | 3 +- 4 files changed, 102 insertions(+), 73 deletions(-) diff --git a/ait/core/server/stream.py b/ait/core/server/stream.py index 93b4af6b..9aeac911 100644 --- a/ait/core/server/stream.py +++ b/ait/core/server/stream.py @@ -107,15 +107,23 @@ def valid_workflow(self): def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None): + """ + This factory preempts the creating of output streams directly. It accepts + the same args as any given stream class and then based primarily on the + values in 'outputs' decides on the appropriate stream to instantiate and + then returns it. + """ ostream = None if type(outputs) is not list or (type(outputs) is list and len(outputs) == 0): - raise ValueError(f"Output stream specification invalid: {outputs}") + ostream = ZMQStream( + name, + inputs, + handlers, + zmq_args=zmq_args, + ) + return ostream # backwards compatability with original UDP spec - if ( - type(outputs) is list - and type(outputs[0]) is int - and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT - ): + if type(outputs[0]) is int and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT: ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) elif is_valid_address_spec(outputs[0]): protocol, hostname, port = outputs[0].split(":") @@ -141,7 +149,7 @@ def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None): def input_stream_factory(name, inputs, handlers, zmq_args=None): """ - This factory preempts the creating of streams directly. It accepts + This factory preempts the creating of input streams directly. It accepts the same args as any given stream class and then based primarily on the values in 'inputs' decides on the appropriate stream to instantiate and then returns it. diff --git a/tests/ait/core/server/test_client.py b/tests/ait/core/server/test_client.py index 8cf64d6e..b86a694b 100644 --- a/tests/ait/core/server/test_client.py +++ b/tests/ait/core/server/test_client.py @@ -1,62 +1,82 @@ -# import gevent -# from ait.core.server.broker import Broker -# from ait.core.server.client import TCPInputClient -# from ait.core.server.client import TCPInputServer -# broker = Broker() -# TEST_BYTES = "Howdy".encode() -# TEST_PORT = 6666 -# class SimpleServer(gevent.server.StreamServer): -# def handle(self, socket, address): -# socket.sendall(TEST_BYTES) -# class TCPServer(TCPInputServer): -# def __init__(self, name, inputs, **kwargs): -# super(TCPServer, self).__init__(broker.context, input=inputs) -# def process(self, input_data): -# self.cur_socket.sendall(input_data) -# class TCPClient(TCPInputClient): -# def __init__(self, name, inputs, **kwargs): -# super(TCPClient, self).__init__( -# broker.context, input=inputs, protocol=gevent.socket.SOCK_STREAM -# ) -# self.input_data = None -# def process(self, input_data): -# self.input_data = input_data -# self._exit() -# class TestTCPServer: -# def setup_method(self): -# self.server = TCPServer("test_tcp_server", inputs=["server", TEST_PORT]) -# self.server.start() -# self.client = gevent.socket.create_connection(("127.0.0.1", TEST_PORT)) -# def teardown_method(self): -# self.server.stop() -# self.client.close() -# def test_TCP_server(self): -# nbytes = self.client.send(TEST_BYTES) -# response = self.client.recv(len(TEST_BYTES)) -# assert nbytes == len(TEST_BYTES) -# assert response == TEST_BYTES -# def test_null_send(self): -# nbytes1 = self.client.send(b"") -# nbytes2 = self.client.send(TEST_BYTES) -# response = self.client.recv(len(TEST_BYTES)) -# assert nbytes1 == 0 -# assert nbytes2 == len(TEST_BYTES) -# assert response == TEST_BYTES -# class TestTCPClient: -# def setup_method(self): -# self.server = SimpleServer(("127.0.0.1", 0)) -# self.server.start() -# self.client = TCPClient( -# "test_tcp_client", inputs=["127.0.0.1", self.server.server_port] -# ) -# def teardown_method(self): -# self.server.stop() -# def test_TCP_client(self): -# self.client.start() -# gevent.sleep(1) -# assert self.client.input_data == TEST_BYTES -# def test_bad_connection(self): -# self.client.port = 1 -# self.client.connection_reattempts = 2 -# self.client.start() -# assert self.client.connection_status != 0 +import gevent + +from ait.core.server.broker import Broker +from ait.core.server.client import TCPInputClient +from ait.core.server.client import TCPInputServer + +broker = Broker() +TEST_BYTES = "Howdy".encode() +TEST_PORT = 6666 + + +class SimpleServer(gevent.server.StreamServer): + def handle(self, socket, address): + socket.sendall(TEST_BYTES) + + +class TCPServer(TCPInputServer): + def __init__(self, name, inputs, **kwargs): + super(TCPServer, self).__init__(broker.context, input=inputs) + + def process(self, input_data): + self.cur_socket.sendall(input_data) + + +class TCPClient(TCPInputClient): + def __init__(self, name, inputs, **kwargs): + super(TCPClient, self).__init__( + broker.context, input=inputs, protocol=gevent.socket.SOCK_STREAM + ) + self.input_data = None + + def process(self, input_data): + self.input_data = input_data + self._exit() + + +class TestTCPServer: + def setup_method(self): + self.server = TCPServer("test_tcp_server", inputs=f"tcp:server:{TEST_PORT}") + self.server.start() + self.client = gevent.socket.create_connection(("127.0.0.1", TEST_PORT)) + + def teardown_method(self): + self.server.stop() + self.client.close() + + def test_TCP_server(self): + nbytes = self.client.send(TEST_BYTES) + response = self.client.recv(len(TEST_BYTES)) + assert nbytes == len(TEST_BYTES) + assert response == TEST_BYTES + + def test_null_send(self): + nbytes1 = self.client.send(b"") + nbytes2 = self.client.send(TEST_BYTES) + response = self.client.recv(len(TEST_BYTES)) + assert nbytes1 == 0 + assert nbytes2 == len(TEST_BYTES) + assert response == TEST_BYTES + + +class TestTCPClient: + def setup_method(self): + self.server = SimpleServer(("127.0.0.1", 0)) + self.server.start() + self.client = TCPClient( + "test_tcp_client", inputs=f"tcp:127.0.0.1:{self.server.server_port}" + ) + + def teardown_method(self): + self.server.stop() + + def test_TCP_client(self): + self.client.start() + gevent.sleep(1) + assert self.client.input_data == TEST_BYTES + + def test_bad_connection(self): + self.client.port = 1 + self.client.connection_reattempts = 2 + self.client.start() + assert self.client.connection_status != 0 diff --git a/tests/ait/core/server/test_server.py b/tests/ait/core/server/test_server.py index 28ce004b..25c93765 100644 --- a/tests/ait/core/server/test_server.py +++ b/tests/ait/core/server/test_server.py @@ -375,11 +375,11 @@ def test_successful_outbound_stream_creation( assert type(created_stream.handlers) == list # Testing creation of outbound stream with port output - config = cfg.AitConfig(config={"name": "some_stream", "output": 3333}) + config = cfg.AitConfig(config={"name": "some_stream", "output": [3333]}) created_stream = server._create_outbound_stream(config) - assert type(created_stream) == ait.core.server.stream.PortOutputStream + assert type(created_stream) == ait.core.server.stream.UDPOutputStream assert created_stream.name == "some_stream" - assert created_stream.out_port == 3333 + assert created_stream.addr_spec == ("localhost", 3333) assert created_stream.handlers == [] diff --git a/tests/ait/core/server/test_stream.py b/tests/ait/core/server/test_stream.py index cf0491a1..54f77dc5 100644 --- a/tests/ait/core/server/test_stream.py +++ b/tests/ait/core/server/test_stream.py @@ -8,10 +8,11 @@ from ait.core.server.handlers import PacketHandler from ait.core.server.stream import input_stream_factory from ait.core.server.stream import output_stream_factory -from ait.core.server.stream import PortOutputStream from ait.core.server.stream import TCPInputClientStream from ait.core.server.stream import TCPInputServerStream +from ait.core.server.stream import TCPOutputStream from ait.core.server.stream import UDPInputServerStream +from ait.core.server.stream import UDPOutputStream from ait.core.server.stream import ZMQStream From 2cb87f53bbae19a9f6402914e8e0a38dc981fb27 Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Thu, 24 Oct 2024 10:38:40 -0700 Subject: [PATCH 07/12] output streams and tests --- ait/core/server/stream.py | 104 ++++++++++++++++----------- tests/ait/core/server/test_stream.py | 97 ++++++++++++++----------- 2 files changed, 115 insertions(+), 86 deletions(-) diff --git a/ait/core/server/stream.py b/ait/core/server/stream.py index 9aeac911..492fc892 100644 --- a/ait/core/server/stream.py +++ b/ait/core/server/stream.py @@ -113,39 +113,44 @@ def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None): values in 'outputs' decides on the appropriate stream to instantiate and then returns it. """ - ostream = None - if type(outputs) is not list or (type(outputs) is list and len(outputs) == 0): - ostream = ZMQStream( - name, - inputs, - handlers, - zmq_args=zmq_args, - ) - return ostream - # backwards compatability with original UDP spec - if type(outputs[0]) is int and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT: - ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) - elif is_valid_address_spec(outputs[0]): - protocol, hostname, port = outputs[0].split(":") - if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT: + + parsed_output = outputs + if type(parsed_output) is list and len(parsed_output) > 0: + if len(parsed_output) > 1: + ait.core.log.warn(f"Additional output args discarded {parsed_output[1:]}") + parsed_output = parsed_output[0] + if type(parsed_output) is int: + if ait.MIN_PORT <= parsed_output <= ait.MAX_PORT: + return UDPOutputStream( + name, inputs, parsed_output, handlers, zmq_args=zmq_args + ) + else: raise ValueError(f"Output stream specification invalid: {outputs}") - if protocol.lower() == "udp": - ostream = UDPOutputStream( - name, inputs, outputs[0], handlers, zmq_args=zmq_args + + elif type(parsed_output) is str and is_valid_address_spec(parsed_output): + protocol, hostname, port = parsed_output.split(":") + if protocol.lower() == "udp" and ait.MIN_PORT <= int(port) <= ait.MAX_PORT: + return UDPOutputStream( + name, inputs, parsed_output, handlers, zmq_args=zmq_args ) - elif protocol.lower() == "tcp": - ostream = TCPOutputStream( - name, inputs, outputs[0], handlers, zmq_args=zmq_args + elif protocol.lower() == "tcp" and ait.MIN_PORT <= int(port) <= ait.MAX_PORT: + return TCPOutputStream( + name, inputs, parsed_output, handlers, zmq_args=zmq_args ) else: raise ValueError(f"Output stream specification invalid: {outputs}") + elif parsed_output is None or ( + type(parsed_output) is list and len(parsed_output) == 0 + ): + return ZMQStream( + name, + inputs, + handlers, + zmq_args=zmq_args, + ) else: raise ValueError(f"Output stream specification invalid: {outputs}") - if ostream is None: - raise ValueError(f"Output stream specification invalid: {outputs}") - return ostream - def input_stream_factory(name, inputs, handlers, zmq_args=None): """ @@ -156,21 +161,30 @@ def input_stream_factory(name, inputs, handlers, zmq_args=None): """ stream = None - - if type(inputs) is not list or (type(inputs) is list and len(inputs) == 0): - raise ValueError(f"Input stream specification invalid: {inputs}") + parsed_inputs = inputs + if type(parsed_inputs) is int: + parsed_inputs = [parsed_inputs] + if type(parsed_inputs) is str: + parsed_inputs = [parsed_inputs] + + if type(parsed_inputs) is not list or ( + type(parsed_inputs) is list and len(parsed_inputs) == 0 + ): + raise ValueError(f"Input stream specification invalid: {parsed_inputs}") # backwards compatability with original UDP server spec if ( - type(inputs) is list - and type(inputs[0]) is int - and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT + type(parsed_inputs) is list + and type(parsed_inputs[0]) is int + and ait.MIN_PORT <= parsed_inputs[0] <= ait.MAX_PORT ): - stream = UDPInputServerStream(name, inputs[0], handlers, zmq_args=zmq_args) - elif is_valid_address_spec(inputs[0]): - protocol, hostname, port = inputs[0].split(":") + stream = UDPInputServerStream( + name, parsed_inputs[0], handlers, zmq_args=zmq_args + ) + elif is_valid_address_spec(parsed_inputs[0]): + protocol, hostname, port = parsed_inputs[0].split(":") if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT: - raise ValueError(f"Input stream specification invalid: {inputs}") + raise ValueError(f"Input stream specification invalid: {parsed_inputs}") if protocol.lower() == "tcp": if hostname.lower() in [ "server", @@ -178,9 +192,13 @@ def input_stream_factory(name, inputs, handlers, zmq_args=None): "127.0.0.1", "0.0.0.0", ]: - stream = TCPInputServerStream(name, inputs[0], handlers, zmq_args) + stream = TCPInputServerStream( + name, parsed_inputs[0], handlers, zmq_args + ) else: - stream = TCPInputClientStream(name, inputs[0], handlers, zmq_args) + stream = TCPInputClientStream( + name, parsed_inputs[0], handlers, zmq_args + ) else: if hostname.lower() in [ "server", @@ -189,17 +207,17 @@ def input_stream_factory(name, inputs, handlers, zmq_args=None): "0.0.0.0", ]: stream = UDPInputServerStream( - name, inputs[0], handlers, zmq_args=zmq_args + name, parsed_inputs[0], handlers, zmq_args=zmq_args ) else: - raise ValueError(f"Input stream specification invalid: {inputs}") - elif all(isinstance(item, str) for item in inputs): - stream = ZMQStream(name, inputs, handlers, zmq_args=zmq_args) + raise ValueError(f"Input stream specification invalid: {parsed_inputs}") + elif all(isinstance(item, str) for item in parsed_inputs): + stream = ZMQStream(name, parsed_inputs, handlers, zmq_args=zmq_args) else: - raise ValueError(f"Input stream specification invalid: {inputs}") + raise ValueError(f"Input stream specification invalid: {parsed_inputs}") if stream is None: - raise ValueError(f"Input stream specification invalid: {inputs}") + raise ValueError(f"Input stream specification invalid: {parsed_inputs}") return stream diff --git a/tests/ait/core/server/test_stream.py b/tests/ait/core/server/test_stream.py index 54f77dc5..aa2e1df2 100644 --- a/tests/ait/core/server/test_stream.py +++ b/tests/ait/core/server/test_stream.py @@ -176,12 +176,18 @@ def test_stream_creation_invalid_workflow(self, stream, args): (["TCP:localhost:1234"], TCPInputServerStream), (["TCP:foo:1234"], TCPInputClientStream), ([1234], UDPInputServerStream), + (1234, UDPInputServerStream), (["UDP:server:1234"], UDPInputServerStream), (["UDP:localhost:1234"], UDPInputServerStream), (["UDP:0.0.0.0:1234"], UDPInputServerStream), (["UDP:127.0.0.1:1234"], UDPInputServerStream), + ("UDP:127.0.0.1:1234", UDPInputServerStream), (["FOO"], ZMQStream), (["FOO", "BAR"], ZMQStream), + ( + [1234, "FOO", "BAR"], + UDPInputServerStream, + ), # Technically valid but not really correct ], ) def test_valid_input_stream_factory(self, args, expected): @@ -215,47 +221,52 @@ def test_invalid_input_stream_factory(self, args, expected): with pytest.raises(expected): _ = input_stream_factory(*full_args) - # @pytest.mark.parametrize( - # "args,expected", - # [ - # (["TCP", "127.0.0.1", 1234], PortOutputStream), - # (["TCP", "localhost", 1234], PortOutputStream), - # (["TCP", "foo", 1234], PortOutputStream), - # (["UDP", "127.0.0.1", 1234], PortOutputStream), - # (["UDP", "localhost", 1234], PortOutputStream), - # (["UDP", "foo", 1234], PortOutputStream), - # ([1234], PortOutputStream), - # (1234, PortOutputStream), - # ([], ZMQStream), - # (None, ZMQStream), - # ], - # ) - # def test_valid_output_stream_factory(self, args, expected): - # full_args = [ - # "foo", - # "bar", - # args, - # [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], - # {"zmq_context": broker.context}, - # ] - # stream = output_stream_factory(*full_args) - # assert isinstance(stream, expected) + @pytest.mark.parametrize( + "args,expected", + [ + (["TCP:127.0.0.1:1234"], TCPOutputStream), + (["TCP:localhost:1234"], TCPOutputStream), + (["TCP:foo:1234"], TCPOutputStream), + (["UDP:127.0.0.1:1234"], UDPOutputStream), + (["UDP:localhost:1234"], UDPOutputStream), + (["UDP:foo:1234"], UDPOutputStream), + ([1234], UDPOutputStream), + (1234, UDPOutputStream), + ("UDP:foo:1234", UDPOutputStream), + ([], ZMQStream), + (None, ZMQStream), + ( + [1234, "TCP:foo:1234"], + UDPOutputStream, + ), # Technically valid but not really correct + ], + ) + def test_valid_output_stream_factory(self, args, expected): + full_args = [ + "foo", + "bar", + args, + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + {"zmq_context": broker.context}, + ] + stream = output_stream_factory(*full_args) + assert isinstance(stream, expected) - # @pytest.mark.parametrize( - # "args,expected", - # [ - # (["FOO", "127.0.0.1", 1234], ValueError), - # (["UDP", "127.0.0.1", "1234"], ValueError), - # (["UDP", 1, "1234"], ValueError), - # ], - # ) - # def test_invalid_output_stream_factory(self, args, expected): - # full_args = [ - # "foo", - # "bar", - # args, - # [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], - # {"zmq_context": broker.context}, - # ] - # with pytest.raises(expected): - # _ = output_stream_factory(*full_args) + @pytest.mark.parametrize( + "args,expected", + [ + (["FOO:127.0.0.1:1234"], ValueError), + (["UDP", "127.0.0.1", "1234"], ValueError), + (["FOO"], ValueError), + ], + ) + def test_invalid_output_stream_factory(self, args, expected): + full_args = [ + "foo", + "bar", + args, + [PacketHandler(input_type=int, output_type=str, packet="CCSDS_HEADER")], + {"zmq_context": broker.context}, + ] + with pytest.raises(expected): + _ = output_stream_factory(*full_args) From 1ccef5af4b5e2328394c5a6aa9e9893057d64006 Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Thu, 24 Oct 2024 10:58:01 -0700 Subject: [PATCH 08/12] update documentation --- doc/source/server_architecture.rst | 64 +++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 6 deletions(-) diff --git a/doc/source/server_architecture.rst b/doc/source/server_architecture.rst index 895d1ef1..e1a542e3 100644 --- a/doc/source/server_architecture.rst +++ b/doc/source/server_architecture.rst @@ -61,21 +61,29 @@ AIT provides a number of default plugins. Check the `Plugins API documentation < Streams ^^^^^^^ - Streams must be listed under either **inbound-streams** or **outbound-streams**, and must have a **name**. -- **Inbound streams** can have an integer port or inbound streams as their **input**. Inbound streams can have multiple inputs. A port input should always be listed as the first input to an inbound stream. +- **Inbound streams** can have an address specification or inbound streams as their **input**. Inbound streams can have multiple inputs. - The server sets up an input stream that emits properly formed telemetry packet messages over a globally configured topic. This is used internally by the ground script API for telemetry monitoring. The input streams that pass data to this stream must output data in the Packet UID annotated format that the core packet handlers use. The input streams used can be configured via the **server.api-telemetry-streams** field. If no configuration is provided the server will default to all valid input streams if possible. See :ref:`the Ground Script API documentation ` for additional information. - **Outbound streams** can have plugins or outbound streams as their **input**. Outbound streams can have multiple inputs. - - Outbound streams also have the option to **output** to an integer port (see :ref:`example config below `). + - Outbound streams also have the option to **output** to an address specification (see :ref:`example config below `). - The server exposes an entry point for commands submitted by other processes. During initialization, this entry point will be connected to a single outbound stream, either explicitly declared by the stream (by setting the **command-subscriber** field; see :ref:`example config below `), or decided by the server (select the first outbound stream in the configuration file). - Streams can have any number of **handlers**. A stream passes each received *packet* through its handlers in order and publishes the result. -- There are several stream classes that inherit from the base stream class. These child classes exist for handling the input and output of streams differently based on whether the inputs/output are ports or other streams and plugins. The appropriate stream type will be instantiated based on whether the stream is an inbound or outbound stream and based on the inputs/output specified in the stream's configs. If the input type of an inbound stream is an integer, it will be assumed to be a port. If it is a string, it will be assumed to be another stream name or plugin. Only outbound streams can have an output, and the output must be a port, not another stream or plugin. +- There are several stream classes that inherit from the base stream class. These child classes exist for handling the input and output of streams differently based on whether the inputs/output are remote hosts, ports or other streams and plugins. The appropriate stream type will be instantiated based on whether the stream is an inbound or outbound stream and based on the inputs/output specified in the stream's configs. Only outbound streams can have an output, and the output must be an address specification, not another stream or plugin. .. _Stream_config: +TCP/UDP Address Specification: + +.. code-block:: none + + [TCP|UDP|tcp|udp]:[0.0.0.0|127.0.0.1|server|localhost]:[1024 - 65535] # UDP/TCP Server Spec + + [TCP|tcp]:[remote hostname|remote ip]:[1024 - 65535] # TCP Client Spec + Example configuration: .. code-block:: none @@ -86,17 +94,42 @@ Example configuration: input: - 3077 + # UDP Input Server - stream: - name: telem_port_in_stream + name: telem_port_in_stream_1 input: - 3076 handlers: - my_custom_handlers.TestbedTelemHandler + # UDP Input Server + - stream: + name: telem_port_in_stream_2 + input: + - "UDP:server:3077" + handlers: + - my_custom_handlers.TestbedTelemHandler + + # TCP Input Server + - stream: + name: telem_port_in_stream_3 + input: + - "TCP:server:3078" + handlers: + - my_custom_handlers.TestbedTelemHandler + + # TCP Input Client + - stream: + name: telem_port_in_stream_4 + input: + - "TCP:1.2.3.4:3079 + handlers: + - my_custom_handlers.TestbedTelemHandler + - stream: name: telem_testbed_stream input: - - telem_port_in_stream + - telem_port_in_stream_1 handlers: - name: ait.server.handlers.PacketHandler packet: 1553_HS_Packet @@ -114,14 +147,33 @@ Example configuration: - name: my_custom_handlers.FlightlikeCommandHandler command-subscriber: True + # UDP Output to localhost:3075 - stream: - name: command_port_out_stream + name: command_port_out_stream_1 input: - command_testbed_stream - command_flightlike_stream output: - 3075 + # UDP Output to remote host + - stream: + name: command_port_out_stream_2 + input: + - command_testbed_stream + - command_flightlike_stream + output: + - "UDP:1.2.3.4:3075" + + # TCP Output to remote host + - stream: + name: command_port_out_stream_3 + input: + - command_testbed_stream + - command_flightlike_stream + output: + - "TCP:1.2.3.4:3075" + Handlers ^^^^^^^^ From 8291dab6cff6b46a398ad6b512e38898ccebf51d Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Thu, 24 Oct 2024 11:10:03 -0700 Subject: [PATCH 09/12] dont subscribe to valid address specs --- ait/core/server/broker.py | 7 ++++++- docker/network-test-config.yaml | 8 ++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/ait/core/server/broker.py b/ait/core/server/broker.py index 6908c7af..92ec2ad9 100644 --- a/ait/core/server/broker.py +++ b/ait/core/server/broker.py @@ -9,6 +9,7 @@ import ait.core.server from ait.core import log from .config import ZmqConfig +from .utils import is_valid_address_spec class Broker(gevent.Greenlet): @@ -69,7 +70,11 @@ def _subscribe_all(self): """ for stream in self.inbound_streams + self.outbound_streams: for input_ in stream.inputs: - if not type(input_) is int and input_ is not None: + if ( + not type(input_) is int + and input_ is not None + and not is_valid_address_spec(input_) + ): Broker.subscribe(stream, input_) for plugin in self.plugins: diff --git a/docker/network-test-config.yaml b/docker/network-test-config.yaml index 8cc3cb1c..8f80629a 100644 --- a/docker/network-test-config.yaml +++ b/docker/network-test-config.yaml @@ -16,6 +16,7 @@ default: - input_stream_debug_1 - input_stream_debug_2 - input_stream_debug_3 + - input_stream_debug_5 handlers: - name: ait.core.server.handlers.DebugHandler handler_name: "Input Stream Handler" @@ -45,6 +46,13 @@ default: - name: ait.core.server.handlers.DebugHandler handler_name: "TCP Input Client Stream 3" + - stream: + name: input_stream_debug_5 + input: 6969 + handlers: + - name: ait.core.server.handlers.DebugHandler + handler_name: "TCP Input Client Stream 4" + outbound-streams: - stream: name: output_stream_debug_1 From cc3803543b0ab4741423a9a760fb34a8e92c3e8b Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Thu, 24 Oct 2024 11:14:00 -0700 Subject: [PATCH 10/12] remove docker test harness --- Makefile | 14 ----- docker/Dockerfile | 103 -------------------------------- docker/docker-compose.yaml | 91 ---------------------------- docker/network-test-config.yaml | 69 --------------------- scripts/network_tester.py | 73 ---------------------- 5 files changed, 350 deletions(-) delete mode 100644 Makefile delete mode 100644 docker/Dockerfile delete mode 100644 docker/docker-compose.yaml delete mode 100644 docker/network-test-config.yaml delete mode 100644 scripts/network_tester.py diff --git a/Makefile b/Makefile deleted file mode 100644 index 3137234a..00000000 --- a/Makefile +++ /dev/null @@ -1,14 +0,0 @@ -SHELL := /bin/bash - -clean: - @cd docker; docker compose down - -network-test: - @cd docker; docker compose up --build --detach - @echo "network test running" - -logs: - @cd docker; docker compose logs --follow - -bash: - @cd docker; docker compose exec ait-server bash diff --git a/docker/Dockerfile b/docker/Dockerfile deleted file mode 100644 index 9a1cebb4..00000000 --- a/docker/Dockerfile +++ /dev/null @@ -1,103 +0,0 @@ -# FROM redhat/ubi8:latest - -# ENV LOG_LEVEL=INFO -# ARG USER=ait -# ARG GROUP=ait -# ARG UID=1001 -# ARG GID=1001 -# ARG HOME=/home/$USER -# ENV PROJECT_HOME=/home/$USER - -# RUN dnf install -y python3.9 python3-pip \ -# && yum install -y nc \ -# && groupadd -r -g ${GID} ${GROUP} \ -# && useradd -m -u ${UID} -g ${GROUP} ${USER} - -# USER ait -# WORKDIR $PROJECT_HOME -# COPY --chown=${USER}:${GROUP} . $PROJECT_HOME/AIT-Core -# RUN python3.9 -m pip install --user --upgrade pip setuptools virtualenvwrapper virtualenv poetry \ -# && echo 'export PATH="${PROJECT_HOME}/.local/bin:$PATH"' >> ~/.bashrc \ -# && echo 'export VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3.9' >> ~/.bashrc \ -# && echo 'export WORKON_HOME=${PROJECT_HOME}/.virtualenvs' >> ~/.bashrc \ -# && echo 'export PROJECT_HOME=${PROJECT_HOME}' >> ~/.bashrc \ -# && echo 'export VIRTUALENVWRAPPER_VIRTUALENV=${PROJECT_HOME}/.local/bin/virtualenv' >> ~/.bashrc \ -# && echo 'source ${PROJECT_HOME}/.local/bin/virtualenvwrapper.sh' >> ~/.bashrc \ -# && source ~/.bashrc \ -# && cd $PROJECT_HOME \ -# && echo 'if [ $VIRTUAL_ENV == "${PROJECT_HOME}/.virtualenvs/ait" ]; then' >> $PROJECT_HOME/.virtualenvs/postactivate \ -# && echo 'export AIT_ROOT=${PROJECT_HOME}/AIT-Core' >> $PROJECT_HOME/.virtualenvs/postactivate \ -# && echo 'export AIT_CONFIG=${PROJECT_HOME}/AIT-Core/docker/network-test-config.yaml' >> $PROJECT_HOME/.virtualenvs/postactivate \ -# && echo 'fi' >> $PROJECT_HOME/.virtualenvs/postactivate \ -# && cd AIT-Core \ -# && mkvirtualenv ait \ -# && poetry install -# ENTRYPOINT ["/usr/bin/bash","-c"] -# CMD ["source /home/ait/.bashrc && cd AIT-Core && workon ait && ait-server"] -#CMD ["sleep infinity"] - - -FROM redhat/ubi8:latest - -ENV LOG_LEVEL=INFO -ENV POETRY_VIRTUALENVS_CREATE=false -ENV PROJECT_HOME=/AIT-Core - -RUN dnf install -y python3.9 python3-pip \ - && yum install -y nc \ - && ln -sf /usr/bin/python3.9 /usr/bin/python - -WORKDIR $PROJECT_HOME -RUN python3.9 -m pip install --upgrade pip setuptools poetry -COPY poetry.lock pyproject.toml $PROJECT_HOME/ - -# Cache the install of all deps except for the root module -WORKDIR $PROJECT_HOME/ -RUN poetry install --no-interaction --no-ansi --no-root - -WORKDIR $PROJECT_HOME -COPY ait $PROJECT_HOME/ait -COPY config $PROJECT_HOME/config -COPY doc $PROJECT_HOME/doc -COPY docker $PROJECT_HOME/docker -COPY openmct $PROJECT_HOME/openmct -COPY poetry_cli $PROJECT_HOME/poetry_cli -COPY scripts $PROJECT_HOME/scripts -COPY sequences $PROJECT_HOME/sequences -COPY README.rst $PROJECT_HOME/ -COPY setup.cfg $PROJECT_HOME/tests -RUN echo 'export AIT_ROOT=${PROJECT_HOME}' >> ~/.bashrc \ - && echo 'export AIT_CONFIG=${PROJECT_HOME}/docker/network-test-config.yaml' >> ~/.bashrc \ - && echo 'export POETRY_VIRTUALENVS_CREATE=false' >> ~/.bashrc \ - && poetry install --no-interaction --no-ansi -ENTRYPOINT ["/usr/bin/bash","-c"] -#CMD ["source ~/.bashrc && ait-server"] - -# FROM redhat/ubi8:latest - -# ENV LOG_LEVEL=INFO -# ENV PROJECT_HOME=/AIT-Core -# ENV POETRY_VIRTUALENVS_CREATE=false -# RUN dnf install -y python3.9 python3-pip \ -# && yum install -y nc \ -# && ln -sf /usr/bin/python3.9 /usr/bin/python - -# WORKDIR $PROJECT_HOME -# RUN python3.9 -m pip install --upgrade pip setuptools poetry -# COPY poetry.lock pyproject.toml $PROJECT_HOME/ - -# # Cache the install of all deps except for the root module -# WORKDIR $PROJECT_HOME -# RUN poetry install --no-interaction --no-ansi --no-root - -# WORKDIR $PROJECT_HOME -# COPY ait $PROJECT_HOME/ait -# COPY docker $PROJECT_HOME/docker -# COPY scripts $PROJECT_HOME/scripts -# COPY config $PROJECT_HOME/config -# RUN echo 'export PATH="${PROJECT_HOME}/.local/bin:$PATH"' >> ~/.bashrc \ -# && echo 'export AIT_ROOT=${PROJECT_HOME}' >> ~/.bashrc \ -# && echo 'export AIT_CONFIG=${PROJECT_HOME}/docker/network-test-config.yaml' >> ~/.bashrc \ -# && echo 'export POETRY_VIRTUALENVS_CREATE=false' >> ~/.bashrc \ -# && poetry install --no-interaction --no-ansi -# ENTRYPOINT ["/usr/bin/bash","-c"] diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml deleted file mode 100644 index b256b38f..00000000 --- a/docker/docker-compose.yaml +++ /dev/null @@ -1,91 +0,0 @@ -services: - ait-server: - container_name: ait-server - platform: linux/amd64 - build: - context: ../ - dockerfile: ./docker/Dockerfile - depends_on: [tcp-server-send, tcp-server, udp-server] - command: ["source ~/.bashrc && ait-server"] - environment: - LOG_LEVEL: INFO - networks: - - ait - - # tcp-client: - # container_name: tcp-client - # platform: linux/amd64 - # build: - # context: ../ - # dockerfile: ./docker/Dockerfile - # depends_on: [ait-server] - # command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python scripts/network_tester.py client TCP ait-server 1234"] - # networks: - # - ait - - udp-client-1: - container_name: udp-client-1 - platform: linux/amd64 - build: - context: ../ - dockerfile: ./docker/Dockerfile - depends_on: [ait-server] - command: ["source ~/.bashrc && python scripts/network_tester.py client UDP ait-server 1234"] - networks: - - ait - - tcp-client-2: - container_name: tcp-client-2 - platform: linux/amd64 - build: - context: ../ - dockerfile: ./docker/Dockerfile - depends_on: [ait-server] - command: ["source ~/.bashrc && python scripts/network_tester.py client TCP ait-server 1235"] - networks: - - ait - - # udp-client-2: - # container_name: udp-client-2 - # platform: linux/amd64 - # build: - # context: ../ - # dockerfile: ./docker/Dockerfile - # depends_on: [ait-server] - # command: ["source ~/.bashrc && python scripts/network_tester.py client UDP ait-server 1235"] - # networks: - # - ait - - tcp-server: - container_name: tcp-server - platform: linux/amd64 - build: - context: ../ - dockerfile: ./docker/Dockerfile - command: ["source ~/.bashrc && python -u scripts/network_tester.py server TCP 0.0.0.0 1237"] - networks: - - ait - - tcp-server-send: - container_name: tcp-server-send - platform: linux/amd64 - build: - context: ../ - dockerfile: ./docker/Dockerfile - command: ["source ~/.bashrc && python scripts/network_tester.py server TCP-SEND 0.0.0.0 1236"] - networks: - - ait - - udp-server: - container_name: udp-server - platform: linux/amd64 - build: - context: ../ - dockerfile: ./docker/Dockerfile - command: ["source ~/.bashrc && python -u scripts/network_tester.py server UDP 0.0.0.0 1238"] - networks: - - ait - -networks: - ait: - name: ait diff --git a/docker/network-test-config.yaml b/docker/network-test-config.yaml deleted file mode 100644 index 8f80629a..00000000 --- a/docker/network-test-config.yaml +++ /dev/null @@ -1,69 +0,0 @@ -default: - - cmddict: - filename: ../config/cmd.yaml - - tlmdict: - filename: ../config/tlm.yaml - - server: - - inbound-streams: - - - stream: - name: input_stream_debug_4 - input: - - input_stream_debug_1 - - input_stream_debug_2 - - input_stream_debug_3 - - input_stream_debug_5 - handlers: - - name: ait.core.server.handlers.DebugHandler - handler_name: "Input Stream Handler" - - - stream: - name: input_stream_debug_1 - input: - - "UDP:0.0.0.0:1234" - handlers: - - name: ait.core.server.handlers.DebugHandler - handler_name: "UDP Input Server Stream 1" - - - - stream: - name: input_stream_debug_2 - input: - - "TCP:0.0.0.0:1235" - handlers: - - name: ait.core.server.handlers.DebugHandler - handler_name: "TCP Input Server Stream 2" - - - stream: - name: input_stream_debug_3 - input: - - "TCP:tcp-server-send:1236" - handlers: - - name: ait.core.server.handlers.DebugHandler - handler_name: "TCP Input Client Stream 3" - - - stream: - name: input_stream_debug_5 - input: 6969 - handlers: - - name: ait.core.server.handlers.DebugHandler - handler_name: "TCP Input Client Stream 4" - - outbound-streams: - - stream: - name: output_stream_debug_1 - input: - - input_stream_debug_1 - output: - - "TCP:tcp-server:1237" - - - stream: - name: output_stream_debug_2 - input: - - input_stream_debug_2 - output: - - "UDP:udp-server:1238" diff --git a/scripts/network_tester.py b/scripts/network_tester.py deleted file mode 100644 index e726fc14..00000000 --- a/scripts/network_tester.py +++ /dev/null @@ -1,73 +0,0 @@ -import socket -import sys -import time - -RATE = 1 # Do a little throttling so we dont completely thrash the server -BUFF_SIZE = 1024 -TEST_DATA = b'U'*BUFF_SIZE -def main(mode,protocol,host,port): - if mode == "server": - if protocol == "TCP" or protocol == "TCP-SEND": - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind((host, port)) - sock.listen() - connection, address = sock.accept() - with connection: - if protocol == "TCP": - ts = time.time() - data_size = 0 - while True: - buf = connection.recv(BUFF_SIZE) - data_size += len(buf) - te = time.time() - print(f"Received {data_size} bytes from {address} - est data rate: {data_size / (te-ts)}") - else: - while True: - connection.sendall(TEST_DATA) - time.sleep(RATE) - if protocol == "UDP": - server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - server_socket.bind((host, port)) - ts = time.time() - data_size = 0 - while True: - buf, address = server_socket.recvfrom(BUFF_SIZE) - data_size += len(buf) - te = time.time() - print(f"Received {data_size} bytes from {address} - est data rate: {data_size / (te-ts)}") - if mode == "client": - if protocol == "UDP": - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - while True: - try: - print(f"Sent {len(TEST_DATA)} bytes to {host}:{port}") - sock.sendto(TEST_DATA, (host, port)) - time.sleep(RATE) - except Exception as e: - print(e) - continue - if protocol == "TCP": - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - connected = False - while not connected: - try: - sock.connect((host, port)) - connected = True - except Exception as e: - print("retrying connection") - time.sleep(1) - while True: - try: - sock.send(TEST_DATA) - time.sleep(RATE) - except Exception as e: - print(e) - continue - -if __name__ == "__main__": - print(sys.argv) - mode = sys.argv[1] - protocol = sys.argv[2] - host = sys.argv[3] - port = sys.argv[4] - sys.exit(main(mode,protocol,host,int(port))) From b04ecd1eac1169a667071508df2f63860eb2e51d Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Thu, 24 Oct 2024 11:16:23 -0700 Subject: [PATCH 11/12] remove debug handler --- ait/core/server/handlers/__init__.py | 1 - ait/core/server/handlers/debug_handler.py | 12 ------------ 2 files changed, 13 deletions(-) delete mode 100644 ait/core/server/handlers/debug_handler.py diff --git a/ait/core/server/handlers/__init__.py b/ait/core/server/handlers/__init__.py index cbd5094f..91215123 100644 --- a/ait/core/server/handlers/__init__.py +++ b/ait/core/server/handlers/__init__.py @@ -1,3 +1,2 @@ from .ccsds_packet_handler import * # noqa -from .debug_handler import * # noqa from .packet_handler import * # noqa diff --git a/ait/core/server/handlers/debug_handler.py b/ait/core/server/handlers/debug_handler.py deleted file mode 100644 index aab598b4..00000000 --- a/ait/core/server/handlers/debug_handler.py +++ /dev/null @@ -1,12 +0,0 @@ -import ait.core.log -from ait.core.server.handler import Handler - - -class DebugHandler(Handler): - def __init__(self, input_type=None, output_type=None, **kwargs): - super(DebugHandler, self).__init__(input_type, output_type) - self.handler_name = kwargs.get("handler_name", "DebugHandler") - - def handle(self, input_data): - ait.core.log.info(f"{self.handler_name} received {len(input_data)} bytes") - return input_data From 3892d20c7cf891a199bef69c914776a82cbda010 Mon Sep 17 00:00:00 2001 From: Cameron Jackson Date: Thu, 24 Oct 2024 11:29:31 -0700 Subject: [PATCH 12/12] sonar qube --- ait/core/server/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ait/core/server/client.py b/ait/core/server/client.py index 60718d2d..bfd6297c 100644 --- a/ait/core/server/client.py +++ b/ait/core/server/client.py @@ -351,7 +351,7 @@ def __init__( else: raise (ValueError("TCPInputClient: Invalid Specification")) - def __exit__(self): + def __exit__(self, exc_type, exc_val, exc_tb): try: if self.sub: self.sub.close()