Merge pull request #960 from vinser52/svinogra_ipc_api
Update umfOpenIPCHandle API to use IPC handler instead of pool
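In short, consumers now retrieve an IPC handler from the pool and pass it to umfOpenIPCHandle. A minimal sketch of the changed call sequence (not taken verbatim from the patch; `pool` and `ipc_handle` stand in for an existing pool handle and a received IPC handle, error handling abbreviated):

```c
// Previous API: the pool handle was passed directly.
// umf_result = umfOpenIPCHandle(pool, ipc_handle, &ptr);

// New API: obtain the IPC handler from the pool first, then open the handle.
umf_ipc_handler_handle_t ipc_handler = NULL;
umf_result_t umf_result = umfPoolGetIPCHandler(pool, &ipc_handler);
if (umf_result != UMF_RESULT_SUCCESS) {
    // handle the error
}

void *ptr = NULL;
umf_result = umfOpenIPCHandle(ipc_handler, ipc_handle, &ptr);
```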
lukaszstolarczuk authored Dec 5, 2024
2 parents 52680b5 + 55cf4ab commit 66b161a
Showing 10 changed files with 130 additions and 30 deletions.
9 changes: 8 additions & 1 deletion examples/ipc_ipcapi/ipc_ipcapi_consumer.c
@@ -142,6 +142,13 @@ int main(int argc, char *argv[]) {
goto err_destroy_OS_memory_provider;
}

umf_ipc_handler_handle_t ipc_handler;
umf_result = umfPoolGetIPCHandler(scalable_pool, &ipc_handler);
if (umf_result != UMF_RESULT_SUCCESS) {
fprintf(stderr, "[producer] ERROR: get IPC handler failed\n");
goto err_destroy_scalable_pool;
}

// connect to the producer
producer_socket = consumer_connect_to_producer(port);
if (producer_socket < 0) {
@@ -209,7 +216,7 @@ int main(int argc, char *argv[]) {
len);

void *SHM_ptr;
umf_result = umfOpenIPCHandle(scalable_pool, IPC_handle, &SHM_ptr);
umf_result = umfOpenIPCHandle(ipc_handler, IPC_handle, &SHM_ptr);
if (umf_result == UMF_RESULT_ERROR_NOT_SUPPORTED) {
fprintf(stderr,
"[consumer] SKIP: opening the IPC handle is not supported\n");
11 changes: 9 additions & 2 deletions examples/ipc_level_zero/ipc_level_zero.c
@@ -180,14 +180,21 @@ int main(void) {

fprintf(stdout, "Consumer pool created.\n");

umf_ipc_handler_handle_t ipc_handler = 0;
umf_result = umfPoolGetIPCHandler(consumer_pool, &ipc_handler);
if (umf_result != UMF_RESULT_SUCCESS) {
fprintf(stderr, "ERROR: Failed to get IPC handler!\n");
return -1;
}

void *mapped_buf = NULL;
umf_result = umfOpenIPCHandle(consumer_pool, ipc_handle, &mapped_buf);
umf_result = umfOpenIPCHandle(ipc_handler, ipc_handle, &mapped_buf);
if (umf_result != UMF_RESULT_SUCCESS) {
fprintf(stderr, "ERROR: Failed to open IPC handle!\n");
return -1;
}

fprintf(stdout, "IPC handle opened in the consumer pool.\n");
fprintf(stdout, "IPC handle opened.\n");

size_t *tmp_buf = malloc(BUFFER_SIZE);
ret = level_zero_copy(consumer_context, device, tmp_buf, mapped_buf,
13 changes: 11 additions & 2 deletions include/umf/ipc.h
@@ -19,6 +19,8 @@ extern "C" {

typedef struct umf_ipc_data_t *umf_ipc_handle_t;

typedef void *umf_ipc_handler_handle_t;

///
/// @brief Returns the size of IPC handles for the specified pool.
/// @param hPool [in] Pool handle
@@ -44,11 +46,11 @@ umf_result_t umfPutIPCHandle(umf_ipc_handle_t ipcHandle);

///
/// @brief Open IPC handle retrieved by umfGetIPCHandle.
/// @param hPool [in] Pool handle where to open the the IPC handle.
/// @param hIPCHandler [in] IPC Handler handle used to open the IPC handle.
/// @param ipcHandle [in] IPC handle.
/// @param ptr [out] pointer to the memory in the current process.
/// @return UMF_RESULT_SUCCESS on success or appropriate error code on failure.
umf_result_t umfOpenIPCHandle(umf_memory_pool_handle_t hPool,
umf_result_t umfOpenIPCHandle(umf_ipc_handler_handle_t hIPCHandler,
umf_ipc_handle_t ipcHandle, void **ptr);

///
@@ -57,6 +59,13 @@ umf_result_t umfOpenIPCHandle(umf_memory_pool_handle_t hPool,
/// @return UMF_RESULT_SUCCESS on success or appropriate error code on failure.
umf_result_t umfCloseIPCHandle(void *ptr);

/// @brief Get handle to the IPC handler from existing pool.
/// @param hPool [in] Pool handle
/// @param hIPCHandler [out] handle to the IPC handler
/// @return UMF_RESULT_SUCCESS on success or appropriate error code on failure.
umf_result_t umfPoolGetIPCHandler(umf_memory_pool_handle_t hPool,
umf_ipc_handler_handle_t *hIPCHandler);

#ifdef __cplusplus
}
#endif
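Taken together, the new declarations above are exercised by the following single-process round-trip. This is a hedged sketch (not part of this patch), modeled on the AllocFreeAllocTest flow in test/ipcFixtures.hpp; it assumes a valid, IPC-capable pool created elsewhere and the usual umf/ipc.h and umf/memory_pool.h include paths:

```c
#include <umf/ipc.h>
#include <umf/memory_pool.h>

// Sketch: 'pool' is assumed to be a valid umf_memory_pool_handle_t whose
// provider supports IPC; pool creation and destruction are omitted.
static int ipc_round_trip(umf_memory_pool_handle_t pool) {
    void *ptr = umfPoolMalloc(pool, 64 * 1024);
    if (ptr == NULL) {
        return -1;
    }

    // Get an IPC handle for the allocation.
    umf_ipc_handle_t ipc_handle = NULL;
    size_t handle_size = 0;
    umf_result_t ret = umfGetIPCHandle(ptr, &ipc_handle, &handle_size);
    if (ret != UMF_RESULT_SUCCESS) {
        goto free_ptr;
    }

    // New in this change: umfOpenIPCHandle takes the IPC handler retrieved
    // from the pool, not the pool handle itself.
    umf_ipc_handler_handle_t ipc_handler = NULL;
    ret = umfPoolGetIPCHandler(pool, &ipc_handler);
    if (ret != UMF_RESULT_SUCCESS) {
        goto put_handle;
    }

    void *mapped = NULL;
    ret = umfOpenIPCHandle(ipc_handler, ipc_handle, &mapped);
    if (ret != UMF_RESULT_SUCCESS) {
        goto put_handle;
    }

    // Unmap the opened region, then release the IPC handle and the allocation.
    umfCloseIPCHandle(mapped);

put_handle:
    umfPutIPCHandle(ipc_handle);
free_ptr:
    umfPoolFree(pool, ptr);
    return (ret == UMF_RESULT_SUCCESS) ? 0 : -1;
}
```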
9 changes: 6 additions & 3 deletions scripts/docs_config/examples.rst
@@ -194,12 +194,15 @@ to another process it can be opened by the :any:`umfOpenIPCHandle` function.

.. code-block:: c
umf_ipc_handler_handle_t ipc_handler = 0;
umf_result = umfPoolGetIPCHandler(consumer_pool, &ipc_handler);
void *mapped_buf = NULL;
umf_result = umfOpenIPCHandle(consumer_pool, ipc_handle, &mapped_buf);
umf_result = umfOpenIPCHandle(ipc_handler, ipc_handle, &mapped_buf);
The :any:`umfOpenIPCHandle` function requires the memory pool handle and the IPC handle as input parameters. It maps
The :any:`umfOpenIPCHandle` function requires the IPC handler and the IPC handle as input parameters. The IPC handler maps
the handle to the current process address space and returns the pointer to the same memory region that was allocated
in the producer process.
in the producer process. To retrieve the IPC handler, the :any:`umfPoolGetIPCHandler` function is used.

.. note::
The virtual addresses of the memory region referred to by the IPC handle may not be the same in the producer and consumer processes.
37 changes: 33 additions & 4 deletions src/ipc.c
@@ -119,12 +119,18 @@ umf_result_t umfPutIPCHandle(umf_ipc_handle_t umfIPCHandle) {
return ret;
}

umf_result_t umfOpenIPCHandle(umf_memory_pool_handle_t hPool,
umf_result_t umfOpenIPCHandle(umf_ipc_handler_handle_t hIPCHandler,
umf_ipc_handle_t umfIPCHandle, void **ptr) {

// We cannot use umfPoolGetMemoryProvider function because it returns
// upstream provider but we need tracking one
umf_memory_provider_handle_t hProvider = hPool->provider;
// IPC handler is an instance of tracking memory provider
if (*(uint32_t *)hIPCHandler != UMF_VERSION_CURRENT) {
// It is a temporary hack to verify that user passes correct IPC handler,
// not a pool handle, as it was required in previous version.
LOG_ERR("Invalid IPC handler.");
return UMF_RESULT_ERROR_INVALID_ARGUMENT;
}

umf_memory_provider_handle_t hProvider = hIPCHandler;
void *base = NULL;

umf_result_t ret = umfMemoryProviderOpenIPCHandle(
@@ -153,3 +159,26 @@ umf_result_t umfCloseIPCHandle(void *ptr) {
return umfMemoryProviderCloseIPCHandle(hProvider, allocInfo.base,
allocInfo.baseSize);
}

umf_result_t umfPoolGetIPCHandler(umf_memory_pool_handle_t hPool,
umf_ipc_handler_handle_t *hIPCHandler) {
if (hPool == NULL) {
LOG_ERR("Pool handle is NULL.");
return UMF_RESULT_ERROR_INVALID_ARGUMENT;
}

if (hIPCHandler == NULL) {
LOG_ERR("hIPCHandler is NULL.");
return UMF_RESULT_ERROR_INVALID_ARGUMENT;
}

// We cannot use umfPoolGetMemoryProvider function because it returns
// upstream provider but we need tracking one
umf_memory_provider_handle_t hProvider = hPool->provider;

// We are using tracking provider as an IPC handler because
// it is doing IPC caching.
*hIPCHandler = (umf_ipc_handler_handle_t)hProvider;

return UMF_RESULT_SUCCESS;
}
1 change: 1 addition & 0 deletions src/libumf.def
@@ -102,6 +102,7 @@ EXPORTS
umfPoolCreateFromMemspace
umfPoolDestroy
umfPoolFree
umfPoolGetIPCHandler
umfPoolGetIPCHandleSize
umfPoolGetLastAllocationError
umfPoolGetMemoryProvider
1 change: 1 addition & 0 deletions src/libumf.map
@@ -96,6 +96,7 @@ UMF_1.0 {
umfPoolCreateFromMemspace;
umfPoolDestroy;
umfPoolFree;
umfPoolGetIPCHandler;
umfPoolGetIPCHandleSize;
umfPoolGetLastAllocationError;
umfPoolGetMemoryProvider;
9 changes: 8 additions & 1 deletion test/common/ipc_common.c
@@ -138,6 +138,13 @@ int run_consumer(int port, umf_memory_pool_ops_t *pool_ops, void *pool_params,
goto err_umfMemoryProviderDestroy;
}

umf_ipc_handler_handle_t ipc_handler;
umf_result = umfPoolGetIPCHandler(pool, &ipc_handler);
if (umf_result != UMF_RESULT_SUCCESS) {
fprintf(stderr, "[consumer] ERROR: get IPC handler failed\n");
goto err_umfMemoryPoolDestroy;
}

producer_socket = consumer_connect(port);
if (producer_socket < 0) {
goto err_umfMemoryPoolDestroy;
@@ -195,7 +202,7 @@ int run_consumer(int port, umf_memory_pool_ops_t *pool_ops, void *pool_params,
len);

void *SHM_ptr;
umf_result = umfOpenIPCHandle(pool, IPC_handle, &SHM_ptr);
umf_result = umfOpenIPCHandle(ipc_handler, IPC_handle, &SHM_ptr);
if (umf_result == UMF_RESULT_ERROR_NOT_SUPPORTED) {
fprintf(stderr,
"[consumer] SKIP: opening the IPC handle is not supported\n");
62 changes: 47 additions & 15 deletions test/ipcFixtures.hpp
@@ -207,12 +207,17 @@ TEST_P(umfIpcTest, BasicFlow) {
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ASSERT_EQ(handleFullSize, handleHalfSize);

umf_ipc_handler_handle_t ipcHandler = nullptr;
ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ASSERT_NE(ipcHandler, nullptr);

void *fullArray = nullptr;
ret = umfOpenIPCHandle(pool.get(), ipcHandleFull, &fullArray);
ret = umfOpenIPCHandle(ipcHandler, ipcHandleFull, &fullArray);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

void *halfArray = nullptr;
ret = umfOpenIPCHandle(pool.get(), ipcHandleHalf, &halfArray);
ret = umfOpenIPCHandle(ipcHandler, ipcHandleHalf, &halfArray);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

std::vector<int> actual_data(SIZE);
@@ -276,8 +281,13 @@ TEST_P(umfIpcTest, GetPoolByOpenedHandle) {

for (size_t pool_id = 0; pool_id < NUM_POOLS; pool_id++) {
void *ptr = nullptr;
umf_ipc_handler_handle_t ipcHandler = nullptr;
ret =
umfOpenIPCHandle(pools_to_open[pool_id].get(), ipcHandle, &ptr);
umfPoolGetIPCHandler(pools_to_open[pool_id].get(), &ipcHandler);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ASSERT_NE(ipcHandler, nullptr);

ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
openedPtrs[pool_id][i] = ptr;
}
@@ -311,16 +321,22 @@ TEST_P(umfIpcTest, GetPoolByOpenedHandle) {
TEST_P(umfIpcTest, AllocFreeAllocTest) {
constexpr size_t SIZE = 64 * 1024;
umf::pool_unique_handle_t pool = makePool();
umf_ipc_handler_handle_t ipcHandler = nullptr;

umf_result_t ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ASSERT_NE(ipcHandler, nullptr);

void *ptr = umfPoolMalloc(pool.get(), SIZE);
EXPECT_NE(ptr, nullptr);

umf_ipc_handle_t ipcHandle = nullptr;
size_t handleSize = 0;
umf_result_t ret = umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
ret = umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

void *opened_ptr = nullptr;
ret = umfOpenIPCHandle(pool.get(), ipcHandle, &opened_ptr);
ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &opened_ptr);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

ret = umfCloseIPCHandle(opened_ptr);
@@ -343,7 +359,7 @@ TEST_P(umfIpcTest, AllocFreeAllocTest) {
ret = umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

ret = umfOpenIPCHandle(pool.get(), ipcHandle, &opened_ptr);
ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &opened_ptr);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

ret = umfCloseIPCHandle(opened_ptr);
@@ -362,11 +378,22 @@ TEST_P(umfIpcTest, AllocFreeAllocTest) {
EXPECT_EQ(stat.openCount, stat.closeCount);
}

TEST_P(umfIpcTest, openInTwoPools) {
TEST_P(umfIpcTest, openInTwoIpcHandlers) {
constexpr size_t SIZE = 100;
std::vector<int> expected_data(SIZE);
umf::pool_unique_handle_t pool1 = makePool();
umf::pool_unique_handle_t pool2 = makePool();
umf_ipc_handler_handle_t ipcHandler1 = nullptr;
umf_ipc_handler_handle_t ipcHandler2 = nullptr;

umf_result_t ret = umfPoolGetIPCHandler(pool1.get(), &ipcHandler1);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ASSERT_NE(ipcHandler1, nullptr);

ret = umfPoolGetIPCHandler(pool2.get(), &ipcHandler2);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ASSERT_NE(ipcHandler2, nullptr);

void *ptr = umfPoolMalloc(pool1.get(), sizeof(expected_data[0]) * SIZE);
EXPECT_NE(ptr, nullptr);

@@ -375,15 +402,15 @@ TEST_P(umfIpcTest, openInTwoPools) {

umf_ipc_handle_t ipcHandle = nullptr;
size_t handleSize = 0;
umf_result_t ret = umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
ret = umfGetIPCHandle(ptr, &ipcHandle, &handleSize);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

void *openedPtr1 = nullptr;
ret = umfOpenIPCHandle(pool1.get(), ipcHandle, &openedPtr1);
ret = umfOpenIPCHandle(ipcHandler1, ipcHandle, &openedPtr1);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

void *openedPtr2 = nullptr;
ret = umfOpenIPCHandle(pool2.get(), ipcHandle, &openedPtr2);
ret = umfOpenIPCHandle(ipcHandler2, ipcHandle, &openedPtr2);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

ret = umfPutIPCHandle(ipcHandle);
@@ -466,6 +493,7 @@ TEST_P(umfIpcTest, ConcurrentGetPutHandles) {
}

TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
umf_result_t ret;
std::vector<void *> ptrs;
constexpr size_t ALLOC_SIZE = 100;
constexpr size_t NUM_POINTERS = 100;
@@ -481,21 +509,25 @@ TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
for (size_t i = 0; i < NUM_POINTERS; ++i) {
umf_ipc_handle_t ipcHandle;
size_t handleSize;
umf_result_t ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ipcHandles[i] = ipcHandle;
}

std::array<std::vector<void *>, NTHREADS> openedIpcHandles;
umf_ipc_handler_handle_t ipcHandler = nullptr;
ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
ASSERT_NE(ipcHandler, nullptr);

umf_test::syncthreads_barrier syncthreads(NTHREADS);

auto openHandlesFn = [&ipcHandles, &openedIpcHandles, &syncthreads,
&pool](size_t tid) {
ipcHandler](size_t tid) {
syncthreads();
for (auto ipcHandle : ipcHandles) {
void *ptr;
umf_result_t ret = umfOpenIPCHandle(pool.get(), ipcHandle, &ptr);
umf_result_t ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
openedIpcHandles[tid].push_back(ptr);
}
@@ -514,12 +546,12 @@ TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
umf_test::parallel_exec(NTHREADS, closeHandlesFn);

for (auto ipcHandle : ipcHandles) {
umf_result_t ret = umfPutIPCHandle(ipcHandle);
ret = umfPutIPCHandle(ipcHandle);
EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
}

for (void *ptr : ptrs) {
umf_result_t ret = umfPoolFree(pool.get(), ptr);
ret = umfPoolFree(pool.get(), ptr);
EXPECT_EQ(ret,
get_umf_result_of_free(freeNotSupported, UMF_RESULT_SUCCESS));
}
8 changes: 6 additions & 2 deletions test/ipc_negative.cpp
@@ -47,7 +47,11 @@ TEST_F(IpcNotSupported, OpenIPCHandleNotSupported) {
// This data doesn't matter, as the ipc call is no-op
std::array<uint8_t, 128> ipc_data = {};
void *ptr;
auto ret = umfOpenIPCHandle(
pool, reinterpret_cast<umf_ipc_handle_t>(&ipc_data), &ptr);
umf_ipc_handler_handle_t ipc_handler;
auto ret = umfPoolGetIPCHandler(pool, &ipc_handler);
ASSERT_EQ(ret, UMF_RESULT_SUCCESS);

ret = umfOpenIPCHandle(ipc_handler,
reinterpret_cast<umf_ipc_handle_t>(&ipc_data), &ptr);
EXPECT_EQ(ret, UMF_RESULT_ERROR_NOT_SUPPORTED);
}
