Skip to content

Commit

Permalink
Merge pull request #540 from elfenpiff/iox2-532-resizable-shm-support…
Browse files Browse the repository at this point in the history
…-in-publisher

[#532] resizable shm support in publisher
  • Loading branch information
elfenpiff authored Dec 5, 2024
2 parents b78746f + f08c0a0 commit ad49e19
Show file tree
Hide file tree
Showing 51 changed files with 1,543 additions and 505 deletions.
4 changes: 2 additions & 2 deletions benchmarks/publish-subscribe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn perform_benchmark<T: Service>(args: &Args) -> Result<(), Box<dyn std::error::
.spawn(|| {
let sender_a2b = service_a2b
.publisher_builder()
.max_slice_len(args.payload_size)
.initial_max_slice_len(args.payload_size)
.create()
.unwrap();
let receiver_b2a = service_b2a.subscriber_builder().create().unwrap();
Expand Down Expand Up @@ -100,7 +100,7 @@ fn perform_benchmark<T: Service>(args: &Args) -> Result<(), Box<dyn std::error::
.spawn(|| {
let sender_b2a = service_b2a
.publisher_builder()
.max_slice_len(args.payload_size)
.initial_max_slice_len(args.payload_size)
.create()
.unwrap();
let receiver_a2b = service_a2b.subscriber_builder().create().unwrap();
Expand Down
1 change: 1 addition & 0 deletions doc/release-notes/iceoryx2-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Add relocatable `SlotMap` [#504](https://github.com/eclipse-iceoryx/iceoryx2/issues/504)
* Add `ResizableSharedMemory` [#497](https://github.com/eclipse-iceoryx/iceoryx2/issues/497)
* Make signal handling optional in `WaitSet` and `Node` [#528](https://github.com/eclipse-iceoryx/iceoryx2/issues/528)
* Support dynamic data with reallocation for publish-subscribe communication [#532](https://github.com/eclipse-iceoryx/iceoryx2/issues/532)
* Add benchmark for iceoryx2 queues [#535](https://github.com/eclipse-iceoryx/iceoryx2/issues/535)

### Bugfixes
Expand Down
15 changes: 15 additions & 0 deletions examples/cxx/domains/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,21 @@ not receive any data.

## Implementation

> [!CAUTION]
> Every payload you transmit with iceoryx2 must be compatible with shared
> memory. Specifically, it must:
>
> * be self contained, no heap, no pointers to external sources
> * have a uniform memory representation, ensuring that shared structs have the
> same data layout
> * not use pointers to manage their internal structure
>
> Data types like `std::string` or `std::vector` will cause undefined behavior
> and may result in segmentation faults. We provide alternative data types
> that are compatible with shared memory. See the
> [complex data type example](../complex_data_types) for guidance on how to
> use them.
To achieve this, we create a copy of the global configuration, modify the
setting `config.global.prefix` using the user-provided CLI argument, and then
set up the example accordingly.
Expand Down
15 changes: 15 additions & 0 deletions examples/cxx/event_based_communication/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@

## Running The Example

> [!CAUTION]
> Every payload you transmit with iceoryx2 must be compatible with shared
> memory. Specifically, it must:
>
> * be self contained, no heap, no pointers to external sources
> * have a uniform memory representation, ensuring that shared structs have the
> same data layout
> * not use pointers to manage their internal structure
>
> Data types like `std::string` or `std::vector` will cause undefined behavior
> and may result in segmentation faults. We provide alternative data types
> that are compatible with shared memory. See the
> [complex data type example](../complex_data_types) for guidance on how to
> use them.
This example demonstrates iceoryx2's event multiplexing mechanism in a more
complex setup. The iceoryx2 `Publisher` and `Subscriber` are integrated into
custom `ExamplePublisher` and `ExampleSubscriber` classes, which also
Expand Down
3 changes: 2 additions & 1 deletion examples/cxx/publish_subscribe/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ instructions in the [C++ Examples Readme](../README.md).
> memory. Specifically, it must:
>
> * be self contained, no heap, no pointers to external sources
> * have a uniform memory representation -> `#[repr(C)]`
> * have a uniform memory representation, ensuring that shared structs have the
> same data layout
> * not use pointers to manage their internal structure
>
> Data types like `std::string` or `std::vector` will cause undefined behavior
Expand Down
68 changes: 68 additions & 0 deletions examples/cxx/publish_subscribe_dynamic_data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Publish-Subscribe With Dynamic Data (Slice Of Shared Memory Compatible Types)

This example demonstrates how to send data when the maximum data size cannot
be predetermined and needs to be adjusted dynamically during the service's
runtime. iceoryx2 enables the reallocation of the publisher's data segment,
allowing users to send samples of arbitrary sizes.

## Running The Example

> [!CAUTION]
> Every payload you transmit with iceoryx2 must be compatible with shared
> memory. Specifically, it must:
>
> * be self contained, no heap, no pointers to external sources
> * have a uniform memory representation, ensuring that shared structs have the
> same data layout
> * not use pointers to manage their internal structure
>
> Data types like `std::string` or `std::vector` will cause undefined behavior
> and may result in segmentation faults. We provide alternative data types
> that are compatible with shared memory. See the
> [complex data type example](../complex_data_types) for guidance on how to
> use them.
This example demonstrates a robust publisher-subscriber communication pattern
between two separate processes. A service with the payload type of an `u8` slice
is created, and every publisher can define a slice length hint they support
for communication with `initial_max_slice_len`. The publisher sends a message with
increasing size every second containing a piece of dynamic data. On the receiving
end, the subscriber checks for new data every second.

The subscriber is printing the sample on the console whenever new data arrives.

The `initial_max_slice_len` hint and the `AllocationStrategy` set by the
publisher will define how memory is reallocated when [`Publisher::loan_slice()`]
or [`Publisher::loan_slice_uninit()`] request more memory than it is available.

First you have to build the C++ examples:

```sh
cmake -S . -B target/ffi/build -DBUILD_EXAMPLES=ON
cmake --build target/ffi/build
```

To observe this dynamic communication in action, open two separate terminals and
execute the following commands:

### Terminal 1

```sh
./target/ffi/build/examples/cxx/publish_subscribe_dynamic_data/example_cxx_publish_subscribe_dyn_subscriber
```

### Terminal 2

```sh
./target/ffi/build/examples/cxx/publish_subscribe_dynamic_data/example_cxx_publish_subscribe_dyn_publisher
```

Feel free to run multiple instances of publisher or subscriber processes
simultaneously to explore how iceoryx2 handles publisher-subscriber
communication efficiently.

You may hit the maximum supported number of ports when too many publisher or
subscriber processes run. Take a look at the [iceoryx2 config](../../../config)
to set the limits globally or at the
[API of the Service builder](https://docs.rs/iceoryx2/latest/iceoryx2/service/index.html)
to set them for a single service.
18 changes: 14 additions & 4 deletions examples/cxx/publish_subscribe_dynamic_data/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,24 @@ auto main() -> int {

// Since the payload type is uint8_t, this number is the same as the number of bytes in the payload.
// For other types, number of bytes used by the payload will be max_slice_len * sizeof(Payload::ValueType)
const uint64_t maximum_elements = 1024; // NOLINT
auto publisher =
service.publisher_builder().max_slice_len(maximum_elements).create().expect("successful publisher creation");
constexpr uint64_t INITIAL_SIZE_HINT = 16;
auto publisher = service
.publisher_builder()
// We guess that the samples are at most 16 bytes in size.
// This is just a hint to the underlying allocator and is purely optional
// The better the guess is the less reallocations will be performed
.initial_max_slice_len(INITIAL_SIZE_HINT)
// The underlying sample size will be increased with a power of two strategy
// when [`Publisher::loan_slice()`] or [`Publisher::loan_slice_uninit()`] require more
// memory than available.
.allocation_strategy(AllocationStrategy::PowerOfTwo)
.create()
.expect("successful publisher creation");

auto counter = 0;

while (node.wait(CYCLE_TIME).has_value()) {
const auto required_memory_size = (counter % 16) + 1; // NOLINT
const auto required_memory_size = (counter + 1) * (counter + 1); // NOLINT
auto sample = publisher.loan_slice_uninit(required_memory_size).expect("acquire sample");
sample.write_from_fn([&](auto byte_idx) { return (byte_idx + counter) % 255; }); // NOLINT

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "iox2/service_type.hpp"

#include <cstdint>
#include <iomanip>
#include <iostream>

constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1);
Expand All @@ -37,11 +36,8 @@ auto main() -> int {
auto sample = subscriber.receive().expect("receive succeeds");
while (sample.has_value()) {
auto payload = sample->payload();
std::cout << "received " << std::dec << static_cast<int>(payload.number_of_bytes()) << " bytes: ";
for (auto byte : payload) {
std::cout << std::setw(2) << std::setfill('0') << std::hex << static_cast<int>(byte) << " ";
}
std::cout << std::endl;
std::cout << "received " << std::dec << static_cast<int>(payload.number_of_bytes()) << " bytes"
<< std::endl;
sample = subscriber.receive().expect("receive succeeds");
}
}
Expand Down
3 changes: 2 additions & 1 deletion examples/cxx/publish_subscribe_with_user_header/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ instructions in the [C++ Examples Readme](../README.md).
> memory. Specifically, it must:
>
> * be self contained, no heap, no pointers to external sources
> * have a uniform memory representation -> `#[repr(C)]`
> * have a uniform memory representation, ensuring that shared structs have the
> same data layout
> * not use pointers to manage their internal structure
>
> Data types like `std::string` or `std::vector` will cause undefined behavior
Expand Down
17 changes: 13 additions & 4 deletions examples/rust/publish_subscribe_dynamic_data/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Publish-Subscribe With Dynamic Data (Slice Of Shared Memory Compatible Types)

This example demonstrates how to send data when the maximum data size cannot
be predetermined and needs to be adjusted dynamically during the service's
runtime. iceoryx2 enables the reallocation of the publisher's data segment,
allowing users to send samples of arbitrary sizes.

## Running The Example

> [!CAUTION]
Expand All @@ -18,13 +23,17 @@
This example demonstrates a robust publisher-subscriber communication pattern
between two separate processes. A service with the payload type of an `u8` slice
is created, and every publisher can define the largest slice length they support
for communication with `max_slice_len`. The publisher sends a message every
second containing a piece of dynamic data. On the receiving end, the subscriber
checks for new data every second.
is created, and every publisher can define a slice length hint they support
for communication with `initial_max_slice_len`. The publisher sends a message with
increasing size every second containing a piece of dynamic data. On the receiving
end, the subscriber checks for new data every second.

The subscriber is printing the sample on the console whenever new data arrives.

The `initial_max_slice_len` hint and the `AllocationStrategy` set by the
publisher will define how memory is reallocated when [`Publisher::loan_slice()`]
or [`Publisher::loan_slice_uninit()`] request more memory than it is available.

To observe this dynamic communication in action, open two separate terminals and
execute the following commands:

Expand Down
12 changes: 9 additions & 3 deletions examples/rust/publish_subscribe_dynamic_data/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.publish_subscribe::<[u8]>()
.open_or_create()?;

let maximum_elements = 1024;
let publisher = service
.publisher_builder()
.max_slice_len(maximum_elements)
// We guess that the samples are at most 16 bytes in size.
// This is just a hint to the underlying allocator and is purely optional
// The better the guess is the less reallocations will be performed
.initial_max_slice_len(16)
// The underlying sample size will be increased with a power of two strategy
// when [`Publisher::loan_slice()`] or [`Publisher::loan_slice_uninit()`] require more
// memory than available.
.allocation_strategy(AllocationStrategy::PowerOfTwo)
.create()?;

let mut counter = 0;

while node.wait(CYCLE_TIME).is_ok() {
let required_memory_size = (counter % 16) + 1;
let required_memory_size = (counter + 1) * (counter + 1);
let sample = publisher.loan_slice_uninit(required_memory_size)?;
let sample = sample.write_from_fn(|byte_idx| ((byte_idx + counter) % 255) as u8);

Expand Down
6 changes: 1 addition & 5 deletions examples/rust/publish_subscribe_dynamic_data/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

while node.wait(CYCLE_TIME).is_ok() {
while let Some(sample) = subscriber.receive()? {
print!("received {} bytes: ", sample.payload().len());
for byte in sample.payload() {
print!("{:02x} ", byte);
}
println!("");
println!("received {} bytes", sample.payload().len());
}
}

Expand Down
19 changes: 15 additions & 4 deletions iceoryx2-bb/memory/src/pool_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ impl PoolAllocator {
self.bucket_alignment
}

/// Releases an previously allocated bucket of memory.
///
/// # Safety
///
/// * `ptr` must be allocated previously with [`PoolAllocator::allocate()`] or
/// [`PoolAllocator::allocate_zeroed()`]
///
pub unsafe fn deallocate_bucket(&self, ptr: NonNull<u8>) {
self.verify_init("deallocate");

self.buckets
.release_raw_index(self.get_index(ptr), ReleaseMode::Default);
}

/// # Safety
///
/// * `ptr` must point to a piece of memory of length `size`
Expand Down Expand Up @@ -204,10 +218,7 @@ impl BaseAllocator for PoolAllocator {
}

unsafe fn deallocate(&self, ptr: NonNull<u8>, _layout: Layout) {
self.verify_init("deallocate");

self.buckets
.release_raw_index(self.get_index(ptr), ReleaseMode::Default);
self.deallocate_bucket(ptr);
}
}

Expand Down
Loading

0 comments on commit ad49e19

Please sign in to comment.