From 08151282c8bb45bc081823066d61384bef6f79cb Mon Sep 17 00:00:00 2001 From: Raymond Douglass Date: Thu, 15 Jul 2021 17:18:58 -0400 Subject: [PATCH 01/11] DOC Update to v0.22 From 006886d44b89392f3a0c938fa5bfc2dc7dfff3ac Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 20 Aug 2021 14:30:39 +0200 Subject: [PATCH 02/11] Cancel waiting `am_recv` if endpoint closes (#772) * Cancel waiting am_recv if endpoint closes * Add AM in test_shutdown * Skip test_shutdown AM tests if not supported by UCX * Directly access `worker._am_recv_wait` in endpoint finalizer * Add missing AM support check in endpoint finalizer * Fix test_listener_close --- tests/test_shutdown.py | 96 ++++++++++++++++++++++++-------------- ucp/_libs/transfer_am.pyx | 1 - ucp/_libs/ucx_endpoint.pyx | 26 +++++++++-- 3 files changed, 82 insertions(+), 41 deletions(-) diff --git a/tests/test_shutdown.py b/tests/test_shutdown.py index 23e414af6..f80f40f92 100644 --- a/tests/test_shutdown.py +++ b/tests/test_shutdown.py @@ -25,23 +25,45 @@ def event_loop(scope="function"): loop.close() +def _skip_if_not_supported(message_type): + if message_type == "am" and not ucp._libs.ucx_api.is_am_supported(): + pytest.skip("AM only supported in UCX >= 1.11") + + +async def _shutdown_send(ep, message_type): + msg = np.arange(10 ** 6) + if message_type == "tag": + await ep.send(msg) + else: + await ep.am_send(msg) + + +async def _shutdown_recv(ep, message_type): + if message_type == "tag": + msg = np.empty(10 ** 6) + await ep.recv(msg) + else: + await ep.am_recv() + + @pytest.mark.asyncio -async def test_server_shutdown(): +@pytest.mark.parametrize("message_type", ["tag", "am"]) +async def test_server_shutdown(message_type): """The server calls shutdown""" endpoint_error_handling = ucp.get_ucx_version() >= (1, 10, 0) + _skip_if_not_supported(message_type) + async def server_node(ep): - msg = np.empty(10 ** 6) with pytest.raises(ucp.exceptions.UCXCanceled): - await asyncio.gather(ep.recv(msg), ep.close()) + await asyncio.gather(_shutdown_recv(ep, message_type), ep.close()) async def client_node(port): ep = await ucp.create_endpoint( ucp.get_address(), port, endpoint_error_handling=endpoint_error_handling ) - msg = np.empty(10 ** 6) with pytest.raises(ucp.exceptions.UCXCanceled): - await ep.recv(msg) + await _shutdown_recv(ep, message_type) listener = ucp.create_listener( server_node, endpoint_error_handling=endpoint_error_handling @@ -53,22 +75,23 @@ async def client_node(port): sys.version_info < (3, 7), reason="test currently fails for python3.6" ) @pytest.mark.asyncio -async def test_client_shutdown(): +@pytest.mark.parametrize("message_type", ["tag", "am"]) +async def test_client_shutdown(message_type): """The client calls shutdown""" endpoint_error_handling = ucp.get_ucx_version() >= (1, 10, 0) + _skip_if_not_supported(message_type) + async def client_node(port): ep = await ucp.create_endpoint( ucp.get_address(), port, endpoint_error_handling=endpoint_error_handling ) - msg = np.empty(10 ** 6) with pytest.raises(ucp.exceptions.UCXCanceled): - await asyncio.gather(ep.recv(msg), ep.close()) + await asyncio.gather(_shutdown_recv(ep, message_type), ep.close()) async def server_node(ep): - msg = np.empty(10 ** 6) with pytest.raises(ucp.exceptions.UCXCanceled): - await ep.recv(msg) + await _shutdown_recv(ep, message_type) listener = ucp.create_listener( server_node, endpoint_error_handling=endpoint_error_handling @@ -77,26 +100,28 @@ async def server_node(ep): @pytest.mark.asyncio -async def test_listener_close(): +@pytest.mark.parametrize("message_type", ["tag", "am"]) +async def test_listener_close(message_type): """The server close the listener""" endpoint_error_handling = ucp.get_ucx_version() >= (1, 10, 0) + _skip_if_not_supported(message_type) + async def client_node(listener): ep = await ucp.create_endpoint( ucp.get_address(), listener.port, endpoint_error_handling=endpoint_error_handling, ) - msg = np.empty(100, dtype=np.int64) - await ep.recv(msg) - await ep.recv(msg) + await _shutdown_recv(ep, message_type) + await _shutdown_recv(ep, message_type) assert listener.closed() is False listener.close() assert listener.closed() is True async def server_node(ep): - await ep.send(np.arange(100, dtype=np.int64)) - await ep.send(np.arange(100, dtype=np.int64)) + await _shutdown_send(ep, message_type) + await _shutdown_send(ep, message_type) listener = ucp.create_listener( server_node, endpoint_error_handling=endpoint_error_handling @@ -105,13 +130,16 @@ async def server_node(ep): @pytest.mark.asyncio -async def test_listener_del(): +@pytest.mark.parametrize("message_type", ["tag", "am"]) +async def test_listener_del(message_type): """The client delete the listener""" endpoint_error_handling = ucp.get_ucx_version() >= (1, 10, 0) + _skip_if_not_supported(message_type) + async def server_node(ep): - await ep.send(np.arange(100, dtype=np.int64)) - await ep.send(np.arange(100, dtype=np.int64)) + await _shutdown_send(ep, message_type) + await _shutdown_send(ep, message_type) listener = ucp.create_listener( server_node, endpoint_error_handling=endpoint_error_handling @@ -121,21 +149,23 @@ async def server_node(ep): listener.port, endpoint_error_handling=endpoint_error_handling, ) - msg = np.empty(100, dtype=np.int64) - await ep.recv(msg) + await _shutdown_recv(ep, message_type) assert listener.closed() is False del listener - await ep.recv(msg) + await _shutdown_recv(ep, message_type) @pytest.mark.asyncio -async def test_close_after_n_recv(): +@pytest.mark.parametrize("message_type", ["tag", "am"]) +async def test_close_after_n_recv(message_type): """The Endpoint.close_after_n_recv()""" endpoint_error_handling = ucp.get_ucx_version() >= (1, 10, 0) + _skip_if_not_supported(message_type) + async def server_node(ep): for _ in range(10): - await ep.send(np.arange(10)) + await _shutdown_send(ep, message_type) async def client_node(port): ep = await ucp.create_endpoint( @@ -143,40 +173,34 @@ async def client_node(port): ) ep.close_after_n_recv(10) for _ in range(10): - msg = np.empty(10) - await ep.recv(msg) + await _shutdown_recv(ep, message_type) assert ep.closed() ep = await ucp.create_endpoint( ucp.get_address(), port, endpoint_error_handling=endpoint_error_handling ) for _ in range(5): - msg = np.empty(10) - await ep.recv(msg) + await _shutdown_recv(ep, message_type) ep.close_after_n_recv(5) for _ in range(5): - msg = np.empty(10) - await ep.recv(msg) + await _shutdown_recv(ep, message_type) assert ep.closed() ep = await ucp.create_endpoint( ucp.get_address(), port, endpoint_error_handling=endpoint_error_handling ) for _ in range(5): - msg = np.empty(10) - await ep.recv(msg) + await _shutdown_recv(ep, message_type) ep.close_after_n_recv(10, count_from_ep_creation=True) for _ in range(5): - msg = np.empty(10) - await ep.recv(msg) + await _shutdown_recv(ep, message_type) assert ep.closed() ep = await ucp.create_endpoint( ucp.get_address(), port, endpoint_error_handling=endpoint_error_handling ) for _ in range(10): - msg = np.empty(10) - await ep.recv(msg) + await _shutdown_recv(ep, message_type) with pytest.raises( ucp.exceptions.UCXError, match="`n` cannot be less than current recv_count", diff --git a/ucp/_libs/transfer_am.pyx b/ucp/_libs/transfer_am.pyx index 914c6ce39..1beb4ffe0 100644 --- a/ucp/_libs/transfer_am.pyx +++ b/ucp/_libs/transfer_am.pyx @@ -216,7 +216,6 @@ def am_recv_nb( cb_kwargs = {} if Feature.AM not in worker._context._feature_flags: raise ValueError("UCXContext must be created with `Feature.AM`") - cdef bint cuda_support am_recv_pool = worker._am_recv_pool ep_as_int = int(ep._handle) diff --git a/ucp/_libs/ucx_endpoint.pyx b/ucp/_libs/ucx_endpoint.pyx index 3a00e5487..2be83462b 100644 --- a/ucp/_libs/ucx_endpoint.pyx +++ b/ucp/_libs/ucx_endpoint.pyx @@ -12,7 +12,7 @@ from libc.stdio cimport FILE from .ucx_api_dep cimport * -from ..exceptions import UCXConnectionReset, UCXError +from ..exceptions import UCXCanceled, UCXConnectionReset, UCXError logger = logging.getLogger("ucx") @@ -57,8 +57,8 @@ def _ucx_endpoint_finalizer( uintptr_t handle_as_int, uintptr_t status_handle_as_int, bint endpoint_error_handling, - worker, - set inflight_msgs + UCXWorker worker, + set inflight_msgs, ): assert worker.initialized cdef ucp_ep_h handle = handle_as_int @@ -83,6 +83,24 @@ def _ucx_endpoint_finalizer( # Notice, `request_cancel()` evoke the send/recv callback functions worker.request_cancel(req) + # Cancel waiting `am_recv` calls + cdef dict recv_wait + if is_am_supported() and handle_as_int in worker._am_recv_wait: + while len(worker._am_recv_wait[handle_as_int]) > 0: + recv_wait = worker._am_recv_wait[handle_as_int].pop(0) + cb_func = recv_wait["cb_func"] + cb_args = recv_wait["cb_args"] + cb_kwargs = recv_wait["cb_kwargs"] + + logger.debug("Cancelling am_recv wait on ep %s" % hex(int(handle_as_int))) + + cb_func( + None, + UCXCanceled("While waiting for am_recv the endpoint was closed"), + *cb_args, + **cb_kwargs + ) + # Close the endpoint cdef str msg cdef unsigned close_mode = UCP_EP_CLOSE_MODE_FLUSH @@ -150,7 +168,7 @@ cdef class UCXEndpoint(UCXObject): int(ep_status), endpoint_error_handling, worker, - self._inflight_msgs + self._inflight_msgs, ) worker.add_child(self) From 0be5cfa186da0219d9e5496d0abc1afd6c2911cb Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 20 Aug 2021 17:48:34 +0200 Subject: [PATCH 03/11] Fix UCXAddress finalizer (#773) * Fix UCXAddress finalizer * Simplify UCXAddress __cinit__ type-casting * Update typedef for UCXAddress _length attribute Co-authored-by: jakirkham Co-authored-by: jakirkham --- ucp/_libs/ucx_address.pyx | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/ucp/_libs/ucx_address.pyx b/ucp/_libs/ucx_address.pyx index aeb16f8aa..4cad20607 100644 --- a/ucp/_libs/ucx_address.pyx +++ b/ucp/_libs/ucx_address.pyx @@ -12,20 +12,42 @@ from .arr cimport Array from .ucx_api_dep cimport * -cdef class UCXAddress: +def _ucx_address_finalizer( + uintptr_t handle_as_int, + uintptr_t worker_handle_as_int, +): + cdef ucp_address_t *address = handle_as_int + cdef ucp_worker_h worker = worker_handle_as_int + if worker_handle_as_int != 0: + ucp_worker_release_address(worker, address) + else: + free(address) + + +cdef class UCXAddress(UCXObject): """Python representation of ucp_address_t""" cdef ucp_address_t *_address - cdef Py_ssize_t _length + cdef size_t _length - def __cinit__(self, uintptr_t address_as_int, Py_ssize_t length): + def __cinit__( + self, + uintptr_t address_as_int, + size_t length, + UCXWorker worker=None, + ): address = address_as_int # Copy address to `self._address` self._address = malloc(length) self._length = length memcpy(self._address, address, length) - def __dealloc__(self): - free(self._address) + self.add_handle_finalizer( + _ucx_address_finalizer, + int(self._address), + 0 if worker is None else worker.handle, + ) + if worker is not None: + worker.add_child(self) @classmethod def from_buffer(cls, buffer): @@ -41,10 +63,7 @@ cdef class UCXAddress: cdef size_t length status = ucp_worker_get_address(ucp_worker, &address, &length) assert_ucs_status(status) - try: - return UCXAddress(int(address), length) - finally: - ucp_worker_release_address(ucp_worker, address) + return UCXAddress(address, length, worker=worker) @property def address(self): From 194260b3d21cd143dbc88e42750dc6a2304fe7b5 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 24 Aug 2021 10:50:26 +0200 Subject: [PATCH 04/11] Update RAPIDS version in CI (#776) --- ci/gpu/build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index ad297a802..71291f396 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -26,7 +26,7 @@ export HOME=$WORKSPACE cd $WORKSPACE export GIT_DESCRIBE_TAG=`git describe --tags` export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'` -export RAPIDS_VERSION="21.08" +export RAPIDS_VERSION="21.10" export UCX_PATH=$CONDA_PREFIX ################################################################################ From 8152bec788d6c9f24a15a1e3266eae57fa36c1a7 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 24 Aug 2021 16:38:48 +0200 Subject: [PATCH 05/11] Add missing check for enabled AM in endpoint finalizer (#774) --- ucp/_libs/ucx_endpoint.pyx | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ucp/_libs/ucx_endpoint.pyx b/ucp/_libs/ucx_endpoint.pyx index 2be83462b..373002226 100644 --- a/ucp/_libs/ucx_endpoint.pyx +++ b/ucp/_libs/ucx_endpoint.pyx @@ -64,6 +64,9 @@ def _ucx_endpoint_finalizer( cdef ucp_ep_h handle = handle_as_int cdef ucs_status_ptr_t status cdef ucs_status_t ep_status + cdef bint am_enabled = ( + is_am_supported() and Feature.AM in worker._context._feature_flags + ) if status_handle_as_int == NULL: ep_status = UCS_OK @@ -85,7 +88,7 @@ def _ucx_endpoint_finalizer( # Cancel waiting `am_recv` calls cdef dict recv_wait - if is_am_supported() and handle_as_int in worker._am_recv_wait: + if am_enabled and handle_as_int in worker._am_recv_wait: while len(worker._am_recv_wait[handle_as_int]) > 0: recv_wait = worker._am_recv_wait[handle_as_int].pop(0) cb_func = recv_wait["cb_func"] From 237068c2aa374ba16b278f0d71c8ed6153761abe Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 24 Aug 2021 16:39:14 +0200 Subject: [PATCH 06/11] Remove `guarantee_msg_order` (#775) * Remove `guarantee_msg_order` --- tests/test_guarantee_msg_order.py | 34 --------- ucp/core.py | 115 ++++++------------------------ 2 files changed, 21 insertions(+), 128 deletions(-) delete mode 100644 tests/test_guarantee_msg_order.py diff --git a/tests/test_guarantee_msg_order.py b/tests/test_guarantee_msg_order.py deleted file mode 100644 index a653e1dfc..000000000 --- a/tests/test_guarantee_msg_order.py +++ /dev/null @@ -1,34 +0,0 @@ -import asyncio - -import pytest - -import ucp - - -@pytest.mark.parametrize("server_guarantee_msg_order", [True, False]) -def test_mismatch(server_guarantee_msg_order): - - # We use an exception handle to catch errors raised by the server - def handle_exception(loop, context): - msg = str(context.get("exception", context["message"])) - loop.test_failed = msg.find(loop.error_msg_expected) == -1 - - loop = asyncio.get_event_loop() - loop.set_exception_handler(handle_exception) - loop.test_failed = False - loop.error_msg_expected = "Both peers must set guarantee_msg_order identically" - - with pytest.raises(ValueError, match=loop.error_msg_expected): - lt = ucp.create_listener( - lambda x: x, guarantee_msg_order=server_guarantee_msg_order - ) - loop.run_until_complete( - ucp.create_endpoint( - ucp.get_address(), - lt.port, - guarantee_msg_order=(not server_guarantee_msg_order), - ) - ) - loop.run_until_complete(asyncio.sleep(0.1)) # Give the server time to finish - - assert not loop.test_failed, "expected error message not raised by the server" diff --git a/ucp/core.py b/ucp/core.py index 4c68f7d4a..41f4b063a 100644 --- a/ucp/core.py +++ b/ucp/core.py @@ -35,20 +35,12 @@ def _get_ctx(): return _ctx -async def exchange_peer_info( - endpoint, msg_tag, ctrl_tag, guarantee_msg_order, listener -): +async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener): """Help function that exchange endpoint information""" # Pack peer information incl. a checksum - fmt = "QQ?Q" - my_info = struct.pack( - fmt, - msg_tag, - ctrl_tag, - guarantee_msg_order, - hash64bits(msg_tag, ctrl_tag, guarantee_msg_order), - ) + fmt = "QQQ" + my_info = struct.pack(fmt, msg_tag, ctrl_tag, hash64bits(msg_tag, ctrl_tag)) peer_info = bytearray(len(my_info)) my_info_arr = Array(my_info) peer_info_arr = Array(peer_info) @@ -64,25 +56,15 @@ async def exchange_peer_info( # Unpacking and sanity check of the peer information ret = {} - ( - ret["msg_tag"], - ret["ctrl_tag"], - ret["guarantee_msg_order"], - ret["checksum"], - ) = struct.unpack(fmt, peer_info) - - expected_checksum = hash64bits( - ret["msg_tag"], ret["ctrl_tag"], ret["guarantee_msg_order"] - ) + (ret["msg_tag"], ret["ctrl_tag"], ret["checksum"]) = struct.unpack(fmt, peer_info) + + expected_checksum = hash64bits(ret["msg_tag"], ret["ctrl_tag"]) if expected_checksum != ret["checksum"]: raise RuntimeError( f'Checksum invalid! {hex(expected_checksum)} != {hex(ret["checksum"])}' ) - if ret["guarantee_msg_order"] != guarantee_msg_order: - raise ValueError("Both peers must set guarantee_msg_order identically") - return ret @@ -143,9 +125,7 @@ def setup_ctrl_recv(ep): ) -async def _listener_handler_coroutine( - conn_request, ctx, func, guarantee_msg_order, endpoint_error_handling -): +async def _listener_handler_coroutine(conn_request, ctx, func, endpoint_error_handling): # We create the Endpoint in five steps: # 1) Create endpoint from conn_request # 2) Generate unique IDs to use as tags @@ -161,11 +141,7 @@ async def _listener_handler_coroutine( ctrl_tag = hash64bits("ctrl_tag", seed, endpoint.handle) peer_info = await exchange_peer_info( - endpoint=endpoint, - msg_tag=msg_tag, - ctrl_tag=ctrl_tag, - guarantee_msg_order=guarantee_msg_order, - listener=True, + endpoint=endpoint, msg_tag=msg_tag, ctrl_tag=ctrl_tag, listener=True, ) tags = { "msg_send": peer_info["msg_tag"], @@ -173,9 +149,7 @@ async def _listener_handler_coroutine( "ctrl_send": peer_info["ctrl_tag"], "ctrl_recv": ctrl_tag, } - ep = Endpoint( - endpoint=endpoint, ctx=ctx, guarantee_msg_order=guarantee_msg_order, tags=tags - ) + ep = Endpoint(endpoint=endpoint, ctx=ctx, tags=tags) logger.debug( "_listener_handler() server: %s, error handling: %s, msg-tag-send: %s, " @@ -203,16 +177,10 @@ async def _listener_handler_coroutine( func(ep) -def _listener_handler( - conn_request, callback_func, ctx, guarantee_msg_order, endpoint_error_handling -): +def _listener_handler(conn_request, callback_func, ctx, endpoint_error_handling): asyncio.ensure_future( _listener_handler_coroutine( - conn_request, - ctx, - callback_func, - guarantee_msg_order, - endpoint_error_handling, + conn_request, ctx, callback_func, endpoint_error_handling, ) ) @@ -251,11 +219,7 @@ def __init__(self, config_dict={}, blocking_progress_mode=None): ) def create_listener( - self, - callback_func, - port=0, - guarantee_msg_order=False, - endpoint_error_handling=None, + self, callback_func, port=0, endpoint_error_handling=None, ): """Create and start a listener to accept incoming connections @@ -273,9 +237,6 @@ def create_listener( port: int, optional An unused port number for listening, or `0` to let UCX assign an unused port. - guarantee_msg_order: boolean, optional - Whether to guarantee message order or not. Remember, both peers - of the endpoint must set guarantee_msg_order to the same value. endpoint_error_handling: None or boolean, optional Enable endpoint error handling raising exceptions when an error occurs, may incur in performance penalties but prevents a process @@ -303,19 +264,12 @@ def create_listener( worker=self.worker, port=port, cb_func=_listener_handler, - cb_args=( - callback_func, - self, - guarantee_msg_order, - endpoint_error_handling, - ), + cb_args=(callback_func, self, endpoint_error_handling), ) ) return ret - async def create_endpoint( - self, ip_address, port, guarantee_msg_order, endpoint_error_handling=None - ): + async def create_endpoint(self, ip_address, port, endpoint_error_handling=None): """Create a new endpoint to a server Parameters @@ -324,9 +278,6 @@ async def create_endpoint( IP address of the server the endpoint should connect to port: int IP address of the server the endpoint should connect to - guarantee_msg_order: boolean, optional - Whether to guarantee message order or not. Remember, both peers - of the endpoint must set guarantee_msg_order to the same value. endpoint_error_handling: None or boolean, optional Enable endpoint error handling raising exceptions when an error occurs, may incur in performance penalties but prevents a process @@ -359,11 +310,7 @@ async def create_endpoint( msg_tag = hash64bits("msg_tag", seed, ucx_ep.handle) ctrl_tag = hash64bits("ctrl_tag", seed, ucx_ep.handle) peer_info = await exchange_peer_info( - endpoint=ucx_ep, - msg_tag=msg_tag, - ctrl_tag=ctrl_tag, - guarantee_msg_order=guarantee_msg_order, - listener=False, + endpoint=ucx_ep, msg_tag=msg_tag, ctrl_tag=ctrl_tag, listener=False, ) tags = { "msg_send": peer_info["msg_tag"], @@ -371,12 +318,7 @@ async def create_endpoint( "ctrl_send": peer_info["ctrl_tag"], "ctrl_recv": ctrl_tag, } - ep = Endpoint( - endpoint=ucx_ep, - ctx=self, - guarantee_msg_order=guarantee_msg_order, - tags=tags, - ) + ep = Endpoint(endpoint=ucx_ep, ctx=self, tags=tags) logger.debug( "create_endpoint() client: %s, error handling: %s, msg-tag-send: %s, " @@ -520,10 +462,9 @@ class Endpoint: to create an Endpoint. """ - def __init__(self, endpoint, ctx, guarantee_msg_order, tags=None): + def __init__(self, endpoint, ctx, tags=None): self._ep = endpoint self._ctx = ctx - self._guarantee_msg_order = guarantee_msg_order self._send_count = 0 # Number of calls to self.send() self._recv_count = 0 # Number of calls to self.recv() self._finished_recv_count = 0 # Number of returned (finished) self.recv() calls @@ -624,8 +565,6 @@ async def send(self, buffer, tag=None): tag = self._tags["msg_send"] else: tag = hash64bits(self._tags["msg_send"], hash(tag)) - if self._guarantee_msg_order: - tag += self._send_count try: return await comm.tag_send(self._ep, buffer, nbytes, tag, name=log) @@ -697,8 +636,6 @@ async def recv(self, buffer, tag=None): ) logger.debug(log) self._recv_count += 1 - if self._guarantee_msg_order: - tag += self._recv_count try: ret = await comm.tag_recv(self._ep, buffer, nbytes, tag, name=log) @@ -943,25 +880,15 @@ def register_am_allocator(allocator, allocator_type): return _get_ctx().register_am_allocator(allocator, allocator_type) -def create_listener( - callback_func, port=None, guarantee_msg_order=False, endpoint_error_handling=None -): +def create_listener(callback_func, port=None, endpoint_error_handling=None): return _get_ctx().create_listener( - callback_func, - port, - guarantee_msg_order, - endpoint_error_handling=endpoint_error_handling, + callback_func, port, endpoint_error_handling=endpoint_error_handling, ) -async def create_endpoint( - ip_address, port, guarantee_msg_order=False, endpoint_error_handling=None -): +async def create_endpoint(ip_address, port, endpoint_error_handling=None): return await _get_ctx().create_endpoint( - ip_address, - port, - guarantee_msg_order, - endpoint_error_handling=endpoint_error_handling, + ip_address, port, endpoint_error_handling=endpoint_error_handling, ) From 9c115813571d47b20e263faab17f644553e2d30e Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 7 Sep 2021 18:30:26 +0200 Subject: [PATCH 07/11] Add support for endpoints from worker address on async API (#777) * Add `create_endpoint_from_worker_address` function to async API * Ensure progress even before endpoints are created on local process * Add `recv` function to receive from worker on async API * Add `force_tag` argument to `send`/`recv` Endpoint async functions * Support worker on `comm.tag_recv` function * Add simple `create_endpoint_from_worker_address` test * Add multinode test for `create_endpoint_from_worker_address` * Add wrapper for `UCXAddress.from_buffer` to async API --- tests/test_from_worker_address.py | 233 ++++++++++++++++++++++++++++++ ucp/comm.py | 15 +- ucp/core.py | 127 ++++++++++++++-- 3 files changed, 353 insertions(+), 22 deletions(-) create mode 100644 tests/test_from_worker_address.py diff --git a/tests/test_from_worker_address.py b/tests/test_from_worker_address.py new file mode 100644 index 000000000..19c7c2f42 --- /dev/null +++ b/tests/test_from_worker_address.py @@ -0,0 +1,233 @@ +import asyncio +import multiprocessing as mp +import os +import struct + +import numpy as np +import pytest + +import ucp + +mp = mp.get_context("spawn") + + +def _test_from_worker_address_server(queue): + async def run(): + # Send worker address to client process via multiprocessing.Queue + address = ucp.get_worker_address() + queue.put(address) + + # Receive address size + address_size = np.empty(1, dtype=np.int64) + await ucp.recv(address_size, tag=0) + + # Receive address buffer on tag 0 and create UCXAddress from it + remote_address = bytearray(address_size[0]) + await ucp.recv(remote_address, tag=0) + remote_address = ucp.get_ucx_address_from_buffer(remote_address) + + # Create endpoint to remote worker using the received address + ep = await ucp.create_endpoint_from_worker_address(remote_address) + + # Send data to client's endpoint + send_msg = np.arange(10, dtype=np.int64) + await ep.send(send_msg, tag=1, force_tag=True) + + asyncio.get_event_loop().run_until_complete(run()) + + +def _test_from_worker_address_client(queue): + async def run(): + # Read local worker address + address = ucp.get_worker_address() + + # Receive worker address from server via multiprocessing.Queue, create + # endpoint to server + remote_address = queue.get() + ep = await ucp.create_endpoint_from_worker_address(remote_address) + + # Send local address to server on tag 0 + await ep.send(np.array(address.length, np.int64), tag=0, force_tag=True) + await ep.send(address, tag=0, force_tag=True) + + # Receive message from server + recv_msg = np.empty(10, dtype=np.int64) + await ep.recv(recv_msg, tag=1, force_tag=True) + + np.testing.assert_array_equal(recv_msg, np.arange(10, dtype=np.int64)) + + asyncio.get_event_loop().run_until_complete(run()) + + +def test_from_worker_address(): + queue = mp.Queue() + + server = mp.Process(target=_test_from_worker_address_server, args=(queue,),) + server.start() + + client = mp.Process(target=_test_from_worker_address_client, args=(queue,),) + client.start() + + client.join() + server.join() + + assert not server.exitcode + assert not client.exitcode + + +def _get_address_info(address=None): + # Fixed frame size + frame_size = 10000 + + # Header format: Recv Tag (Q) + Send Tag (Q) + UCXAddress.length (Q) + header_fmt = "QQQ" + + # Data length + data_length = frame_size - struct.calcsize(header_fmt) + + # Padding length + padding_length = None if address is None else (data_length - address.length) + + # Header + UCXAddress string + padding + fixed_size_address_buffer_fmt = header_fmt + str(data_length) + "s" + + assert struct.calcsize(fixed_size_address_buffer_fmt) == frame_size + + return { + "frame_size": frame_size, + "data_length": data_length, + "padding_length": padding_length, + "fixed_size_address_buffer_fmt": fixed_size_address_buffer_fmt, + } + + +def _pack_address_and_tag(address, recv_tag, send_tag): + address_info = _get_address_info(address) + + fixed_size_address_packed = struct.pack( + address_info["fixed_size_address_buffer_fmt"], + recv_tag, # Recv Tag + send_tag, # Send Tag + address.length, # Address buffer length + ( + bytearray(address) + bytearray(address_info["padding_length"]) + ), # Address buffer + padding + ) + + assert len(fixed_size_address_packed) == address_info["frame_size"] + + return fixed_size_address_packed + + +def _unpack_address_and_tag(address_packed): + address_info = _get_address_info() + + recv_tag, send_tag, address_length, address_padded = struct.unpack( + address_info["fixed_size_address_buffer_fmt"], address_packed, + ) + + # Swap send and recv tags, as they are used by the remote process in the + # opposite direction. + return { + "address": address_padded[:address_length], + "recv_tag": send_tag, + "send_tag": recv_tag, + } + + +def _test_from_worker_address_server_fixedsize(num_nodes, queue): + async def run(): + async def _handle_client(packed_remote_address): + # Unpack the fixed-size address+tag buffer + unpacked = _unpack_address_and_tag(packed_remote_address) + remote_address = ucp.get_ucx_address_from_buffer(unpacked["address"]) + + # Create endpoint to remote worker using the received address + ep = await ucp.create_endpoint_from_worker_address(remote_address) + + # Send data to client's endpoint + send_msg = np.arange(10, dtype=np.int64) + await ep.send(send_msg, tag=unpacked["send_tag"], force_tag=True) + + # Receive data from client's endpoint + recv_msg = np.empty(20, dtype=np.int64) + await ep.recv(recv_msg, tag=unpacked["recv_tag"], force_tag=True) + + np.testing.assert_array_equal(recv_msg, np.arange(20, dtype=np.int64)) + + # Send worker address to client processes via multiprocessing.Queue, + # one entry for each client. + address = ucp.get_worker_address() + for i in range(num_nodes): + queue.put(address) + + address_info = _get_address_info() + + server_tasks = [] + for i in range(num_nodes): + # Receive fixed-size address+tag buffer on tag 0 + packed_remote_address = bytearray(address_info["frame_size"]) + await ucp.recv(packed_remote_address, tag=0) + + # Create an async task for client + server_tasks.append(_handle_client(packed_remote_address)) + + # Await handling each client request + await asyncio.gather(*server_tasks) + + asyncio.get_event_loop().run_until_complete(run()) + + +def _test_from_worker_address_client_fixedsize(queue): + async def run(): + # Read local worker address + address = ucp.get_worker_address() + recv_tag = ucp.utils.hash64bits(os.urandom(16)) + send_tag = ucp.utils.hash64bits(os.urandom(16)) + packed_address = _pack_address_and_tag(address, recv_tag, send_tag) + + # Receive worker address from server via multiprocessing.Queue, create + # endpoint to server + remote_address = queue.get() + ep = await ucp.create_endpoint_from_worker_address(remote_address) + + # Send local address to server on tag 0 + await ep.send(packed_address, tag=0, force_tag=True) + + # Receive message from server + recv_msg = np.empty(10, dtype=np.int64) + await ep.recv(recv_msg, tag=recv_tag, force_tag=True) + + np.testing.assert_array_equal(recv_msg, np.arange(10, dtype=np.int64)) + + # Send message to server + send_msg = np.arange(20, dtype=np.int64) + await ep.send(send_msg, tag=send_tag, force_tag=True) + + asyncio.get_event_loop().run_until_complete(run()) + + +@pytest.mark.parametrize("num_nodes", [1, 2, 4, 8]) +def test_from_worker_address_multinode(num_nodes): + queue = mp.Queue() + + server = mp.Process( + target=_test_from_worker_address_server_fixedsize, args=(num_nodes, queue), + ) + server.start() + + clients = [] + for i in range(num_nodes): + client = mp.Process( + target=_test_from_worker_address_client_fixedsize, args=(queue,), + ) + client.start() + clients.append(client) + + for client in clients: + client.join() + + server.join() + + assert not server.exitcode + assert not client.exitcode diff --git a/ucp/comm.py b/ucp/comm.py index bf97f76a5..707b44b02 100644 --- a/ucp/comm.py +++ b/ucp/comm.py @@ -3,6 +3,7 @@ # See file LICENSE for terms. import asyncio +from typing import Union from ._libs import arr, ucx_api @@ -80,7 +81,7 @@ def stream_send( def tag_recv( - ep: ucx_api.UCXEndpoint, + obj: Union[ucx_api.UCXEndpoint, ucx_api.UCXWorker], buffer: arr.Array, nbytes: int, tag: int, @@ -88,15 +89,11 @@ def tag_recv( event_loop=None, ) -> asyncio.Future: + worker = obj if isinstance(obj, ucx_api.UCXWorker) else obj.worker + ep = obj if isinstance(obj, ucx_api.UCXEndpoint) else None + return _call_ucx_api( - event_loop, - ucx_api.tag_recv_nb, - ep.worker, - buffer, - nbytes, - tag, - name=name, - ep=ep, + event_loop, ucx_api.tag_recv_nb, worker, buffer, nbytes, tag, name=name, ep=ep, ) diff --git a/ucp/core.py b/ucp/core.py index 41f4b063a..f820ed377 100644 --- a/ucp/core.py +++ b/ucp/core.py @@ -218,6 +218,11 @@ def __init__(self, config_dict={}, blocking_progress_mode=None): self, _epoll_fd_finalizer, self.epoll_fd, self.progress_tasks ) + # Ensure progress even before Endpoints get created, for example to + # receive messages directly on a worker after a remote endpoint + # connected with `create_endpoint_from_worker_address`. + self.continuous_ucx_progress() + def create_listener( self, callback_func, port=0, endpoint_error_handling=None, ): @@ -337,6 +342,47 @@ async def create_endpoint(self, ip_address, port, endpoint_error_handling=None): CtrlMsg.setup_ctrl_recv(ep) return ep + async def create_endpoint_from_worker_address( + self, address, endpoint_error_handling=None + ): + """Create a new endpoint to a server + + Parameters + ---------- + address: UCXAddress + endpoint_error_handling: None or boolean, optional + Enable endpoint error handling raising exceptions when an error + occurs, may incur in performance penalties but prevents a process + from terminating unexpectedly that may happen when disabled. + None (default) will enable endpoint error handling based on the + UCX version, enabling for UCX >= 1.11.0 and disabled for any + versions prior to that. This is done to prevent CUDA IPC to be + quietly disabled due to lack of support in older UCX versions. + Explicitly specifying True/False will override the default. + + Returns + ------- + Endpoint + The new endpoint + """ + self.continuous_ucx_progress() + if endpoint_error_handling is None: + endpoint_error_handling = get_ucx_version() >= (1, 11, 0) + + ucx_ep = ucx_api.UCXEndpoint.create_from_worker_address( + self.worker, address, endpoint_error_handling, + ) + self.worker.progress() + + ep = Endpoint(endpoint=ucx_ep, ctx=self, tags=None) + + logger.debug( + "create_endpoint() client: %s, error handling: %s" + % (hex(ep._ep.handle), endpoint_error_handling) + ) + + return ep + def continuous_ucx_progress(self, event_loop=None): """Guarantees continuous UCX progress @@ -424,6 +470,31 @@ def register_am_allocator(self, allocator, allocator_type): allocator_type = ucx_api.AllocatorType.UNSUPPORTED self.worker.register_am_allocator(allocator, allocator_type) + @nvtx_annotate("UCXPY_WORKER_RECV", color="red", domain="ucxpy") + async def recv(self, buffer, tag): + """Receive directly on worker without a local Endpoint into `buffer`. + + Parameters + ---------- + buffer: exposing the buffer protocol or array/cuda interface + The buffer to receive into. Raise ValueError if buffer + is smaller than nbytes or read-only. + tag: hashable, optional + Set a tag that must match the received message. + """ + if not isinstance(buffer, Array): + buffer = Array(buffer) + nbytes = buffer.nbytes + log = "[Worker Recv] worker: %s, tag: %s, nbytes: %d, type: %s" % ( + hex(self.worker.handle), + hex(tag), + nbytes, + type(buffer.obj), + ) + logger.debug(log) + + return await comm.tag_recv(self.worker, buffer, nbytes, tag, name=log) + class Listener: """A handle to the listening service started by `create_listener()` @@ -535,7 +606,7 @@ async def close(self): self.abort() @nvtx_annotate("UCXPY_SEND", color="green", domain="ucxpy") - async def send(self, buffer, tag=None): + async def send(self, buffer, tag=None, force_tag=False): """Send `buffer` to connected peer. Parameters @@ -544,27 +615,35 @@ async def send(self, buffer, tag=None): The buffer to send. Raise ValueError if buffer is smaller than nbytes. tag: hashable, optional - Set a tag that the receiver must match. + tag: hashable, optional + Set a tag that the receiver must match. Currently the tag + is hashed together with the internal Endpoint tag that is + agreed with the remote end at connection time. To enforce + using the user tag, make sure to specify `force_tag=True`. + force_tag: bool + If true, force using `tag` as is, otherwise the value + specified with `tag` (if any) will be hashed with the + internal Endpoint tag. """ self._ep.raise_on_error() if self.closed(): raise UCXCloseError("Endpoint closed") if not isinstance(buffer, Array): buffer = Array(buffer) + if tag is None: + tag = self._tags["msg_send"] + elif not force_tag: + tag = hash64bits(self._tags["msg_send"], hash(tag)) nbytes = buffer.nbytes log = "[Send #%03d] ep: %s, tag: %s, nbytes: %d, type: %s" % ( self._send_count, hex(self.uid), - hex(self._tags["msg_send"]), + hex(tag), nbytes, type(buffer.obj), ) logger.debug(log) self._send_count += 1 - if tag is None: - tag = self._tags["msg_send"] - else: - tag = hash64bits(self._tags["msg_send"], hash(tag)) try: return await comm.tag_send(self._ep, buffer, nbytes, tag, name=log) @@ -601,7 +680,7 @@ async def am_send(self, buffer): return await comm.am_send(self._ep, buffer, nbytes, name=log) @nvtx_annotate("UCXPY_RECV", color="red", domain="ucxpy") - async def recv(self, buffer, tag=None): + async def recv(self, buffer, tag=None, force_tag=False): """Receive from connected peer into `buffer`. Parameters @@ -610,13 +689,19 @@ async def recv(self, buffer, tag=None): The buffer to receive into. Raise ValueError if buffer is smaller than nbytes or read-only. tag: hashable, optional - Set a tag that must match the received message. Notice, currently - UCX-Py doesn't support a "any tag" thus `tag=None` only matches a - send that also sets `tag=None`. + Set a tag that must match the received message. Currently + the tag is hashed together with the internal Endpoint tag + that is agreed with the remote end at connection time. + To enforce using the user tag, make sure to specify + `force_tag=True`. + force_tag: bool + If true, force using `tag` as is, otherwise the value + specified with `tag` (if any) will be hashed with the + internal Endpoint tag. """ if tag is None: tag = self._tags["msg_recv"] - else: + elif not force_tag: tag = hash64bits(self._tags["msg_recv"], hash(tag)) if not self._ctx.worker.tag_probe(tag): @@ -630,7 +715,7 @@ async def recv(self, buffer, tag=None): log = "[Recv #%03d] ep: %s, tag: %s, nbytes: %d, type: %s" % ( self._recv_count, hex(self.uid), - hex(self._tags["msg_recv"]), + hex(tag), nbytes, type(buffer.obj), ) @@ -892,6 +977,14 @@ async def create_endpoint(ip_address, port, endpoint_error_handling=None): ) +async def create_endpoint_from_worker_address( + address, endpoint_error_handling=None, +): + return await _get_ctx().create_endpoint_from_worker_address( + address, endpoint_error_handling=endpoint_error_handling, + ) + + def continuous_ucx_progress(event_loop=None): _get_ctx().continuous_ucx_progress(event_loop=event_loop) @@ -904,6 +997,14 @@ def get_worker_address(): return _get_ctx().get_worker_address() +def get_ucx_address_from_buffer(buffer): + return ucx_api.UCXAddress.from_buffer(buffer) + + +async def recv(buffer, tag): + return await _get_ctx().recv(buffer, tag=tag) + + def get_ucp_context_info(): """Gets information on the current UCX context, obtained from `ucp_context_print_info`. From fb2b7cb2ca5969a736ac2ee77ef96e825e49d539 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 9 Sep 2021 09:43:27 -0700 Subject: [PATCH 08/11] Update build instructions to UCX 1.11.1 --- docs/source/install.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/install.rst b/docs/source/install.rst index 3a22b0085..778f48407 100644 --- a/docs/source/install.rst +++ b/docs/source/install.rst @@ -70,17 +70,17 @@ Test Dependencies distributed -UCX-1.11 (Development) -~~~~~~~~~~~~~~~~~~~~~~ +UCX-1.11.1 +~~~~~~~~~~ -Instructions for building UCX 1.11 (current development version): +Instructions for building UCX 1.11.1: :: conda activate ucx git clone https://github.com/openucx/ucx cd ucx - git checkout v1.11.x + git checkout v1.11.1 ./autogen.sh mkdir build cd build From 989056f6b78a715ba0461a765e16f606afd8afae Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 17 Sep 2021 17:44:58 +0200 Subject: [PATCH 09/11] Add compile-time variable UCXPY_DISABLE_HWLOC (#782) * Add compile-time variable UCXPY_DISABLE_HWLOC For UCX 1.9 and older we needed hwloc to determine the closest InfiniBand device to a GPU. With UCX 1.10+ this isn't necessary anymore, so it's a deprecated feature. Since building with hwloc can be problematic for some use cases, add a new compile-time environment variable to allow disabling build of topological distance code. * Document how to disable hwloc at compile-time --- docs/source/install.rst | 9 ++++++++- setup.py | 43 +++++++++++++++++++++++++---------------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/docs/source/install.rst b/docs/source/install.rst index 778f48407..705d58455 100644 --- a/docs/source/install.rst +++ b/docs/source/install.rst @@ -55,10 +55,12 @@ Build Dependencies conda create -n ucx -c conda-forge \ automake make libtool pkg-config \ - libhwloc psutil \ + psutil \ "python=3.7" setuptools "cython>=0.29.14,<3.0.0a0" +If you are using UCX 1.9 and older and using both CUDA and InfiniBand support, ensure ``libhwloc`` is also on the list above. + Test Dependencies ~~~~~~~~~~~~~~~~~ @@ -157,3 +159,8 @@ UCX-Py pip install . # or for develop build pip install -v -e . + +In UCX 1.10 and above, or for builds that don't need CUDA and InfiniBand support, users can disable building with hwloc support: + +:: + UCXPY_DISABLE_HWLOC=1 pip install . diff --git a/setup.py b/setup.py index d27a4d713..264f64957 100644 --- a/setup.py +++ b/setup.py @@ -21,10 +21,33 @@ include_dirs = [os.path.dirname(get_python_inc())] library_dirs = [get_config_var("LIBDIR")] -libraries = ["ucp", "uct", "ucm", "ucs", "hwloc"] +libraries = ["ucp", "uct", "ucm", "ucs"] extra_compile_args = ["-std=c99", "-Werror"] +DISABLE_HWLOC = int(os.environ.get("UCXPY_DISABLE_HWLOC", "0")) +topological_distance_ext = [] +if DISABLE_HWLOC == 0: + libraries.append("hwloc") + topological_distance_ext = [ + Extension( + "ucp._libs.topological_distance", + sources=[ + "ucp/_libs/topological_distance.pyx", + "ucp/_libs/src/topological_distance.c", + ], + depends=[ + "ucp/_libs/src/topological_distance.h", + "ucp/_libs/topological_distance_dep.pxd", + ], + include_dirs=include_dirs, + library_dirs=library_dirs, + libraries=libraries, + extra_compile_args=extra_compile_args, + ), + ] + + def get_ucp_version(): with open(include_dirs[0] + "/ucp/api/ucp_version.h") as f: ftext = f.read() @@ -59,22 +82,8 @@ def get_ucp_version(): libraries=libraries, extra_compile_args=extra_compile_args, ), - Extension( - "ucp._libs.topological_distance", - sources=[ - "ucp/_libs/topological_distance.pyx", - "ucp/_libs/src/topological_distance.c", - ], - depends=[ - "ucp/_libs/src/topological_distance.h", - "ucp/_libs/topological_distance_dep.pxd", - ], - include_dirs=include_dirs, - library_dirs=library_dirs, - libraries=libraries, - extra_compile_args=extra_compile_args, - ), - ], + ] + + topological_distance_ext, compile_time_env={"CY_UCP_AM_SUPPORTED": _am_supported}, ) From 108dce38f625b2a9ae0f0e58619919d5bcf94cb9 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 22 Sep 2021 20:04:52 +0200 Subject: [PATCH 10/11] Add reference Docker container (#781) --- docker/Dockerfile | 24 ++++++++++++++++ docker/README.md | 24 ++++++++++++++++ docker/run.sh | 70 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 docker/Dockerfile create mode 100644 docker/README.md create mode 100755 docker/run.sh diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 000000000..6b1ac5f13 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,24 @@ +FROM python:3 + +RUN apt-get update && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends tzdata && \ + apt-get install -y \ + automake \ + dh-make \ + g++ \ + git \ + libcap2 \ + libhwloc-dev \ + libnuma-dev \ + libtool \ + make \ + udev \ + wget \ + && apt-get remove -y openjdk-11-* || apt-get autoremove -y \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +COPY run.sh /root + +WORKDIR /root + +CMD [ "/root/run.sh" ] diff --git a/docker/README.md b/docker/README.md new file mode 100644 index 000000000..dd38c05b4 --- /dev/null +++ b/docker/README.md @@ -0,0 +1,24 @@ +# Docker container + +## Summary + +Contains reference dockerfile and build script to run UCX-Py tests and benchmarks. This is a minimal setup, without support for CUDA, MOFED or rdma-core. + +## Building Docker image + +To begin, it's necessary to build the image, this is done as follows: + +```bash +cd docker +docker build -t ucx-py -f Dockerfile . +``` + +## Running + +Once building the Docker image is complete, the container can be started with the following command: + +```bash +docker run ucx-py +``` + +The container above will run UCX-Py tests and benchmarks. diff --git a/docker/run.sh b/docker/run.sh new file mode 100755 index 000000000..d7e0d3ecd --- /dev/null +++ b/docker/run.sh @@ -0,0 +1,70 @@ +#!/bin/bash +# Copyright (c) 2021, NVIDIA CORPORATION. +set -e + +function logger { + echo -e "\n$@\n" +} + +PYTHON_PREFIX=$(python -c "import distutils.sysconfig; print(distutils.sysconfig.PREFIX)") + +################################################################################ +# SETUP - Install python packages and check environment +################################################################################ + +pip install \ + "pytest" "pytest-asyncio" \ + "dask" "distributed" \ + "cython" + +logger "Check versions" +python --version +pip list + +################################################################################ +# BUILD - Build UCX master, UCX-Py and run tests +################################################################################ +logger "Build UCX master" +cd $HOME +git clone https://github.com/openucx/ucx +cd ucx +./autogen.sh +./contrib/configure-devel \ + --prefix=$PYTHON_PREFIX \ + --enable-gtest=no \ + --with-valgrind=no +make -j install + +echo $PYTHON_PREFIX >> /etc/ld.so.conf.d/python.conf +ldconfig + +logger "UCX Version and Build Information" +ucx_info -v + + +################################################################################ +# TEST - Run pytests for ucx-py +################################################################################ +logger "Clone and Build UCX-Py" +cd $HOME +git clone https://github.com/rapidsai/ucx-py +cd ucx-py +python setup.py build_ext --inplace +python -m pip install -e . + +for tls in "tcp" "all"; do + export UCX_TLS=$tls + + logger "Python pytest for ucx-py" + + # Test with TCP/Sockets + logger "Tests (UCX_TLS=$UCX_TLS)" + pytest --cache-clear -vs ucp/_libs/tests + pytest --cache-clear -vs tests/ + + logger "Benchmarks (UCX_TLS=$UCX_TLS)" + python benchmarks/send-recv.py -o numpy \ + --server-dev 0 --client-dev 0 --reuse-alloc + python benchmarks/send-recv-core.py -o numpy \ + --server-dev 0 --client-dev 0 --reuse-alloc +done From de30c95c4aad04fac8d92f8f62781944b376bd13 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 23 Sep 2021 09:58:52 -0400 Subject: [PATCH 11/11] Make ucx-py installation blocks consistent (#784) * Make ucx-py installation examples consistent --- docs/source/install.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/install.rst b/docs/source/install.rst index 705d58455..5568d8934 100644 --- a/docs/source/install.rst +++ b/docs/source/install.rst @@ -155,12 +155,12 @@ UCX-Py conda activate ucx git clone https://github.com/rapidsai/ucx-py.git cd ucx-py - python setup.py build_ext --inplace - pip install . + pip install -v . # or for develop build pip install -v -e . In UCX 1.10 and above, or for builds that don't need CUDA and InfiniBand support, users can disable building with hwloc support: :: - UCXPY_DISABLE_HWLOC=1 pip install . + + UCXPY_DISABLE_HWLOC=1 pip install -v .