diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e69de29..90db424 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -0,0 +1,27 @@ +name: Test + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + java-version: '11' + distribution: 'temurin' + cache: maven + + - name: Run tests + run: mvn -B test + + - name: Build with Maven + run: mvn -B package --file pom.xml \ No newline at end of file diff --git a/src/main/java/io/github/elimelt/pmqueue/Message.java b/src/main/java/io/github/elimelt/pmqueue/Message.java index 43df085..bfdacbd 100644 --- a/src/main/java/io/github/elimelt/pmqueue/Message.java +++ b/src/main/java/io/github/elimelt/pmqueue/Message.java @@ -5,107 +5,168 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.lang.ref.SoftReference; -import java.util.Arrays; import sun.misc.Unsafe; import java.lang.reflect.Field; +/** + * A high-performance, immutable message container optimized for memory + * efficiency and fast access. + * This class uses direct memory operations via {@link sun.misc.Unsafe} for + * improved performance + * and implements custom serialization for better control over the serialization + * process. + * + *

+ * The message consists of an immutable byte-array payload, a creation
+ * timestamp, and an integer message type.
+ *
+ * This class implements optimizations including direct field access and raw
+ * memory copies via {@code Unsafe}, an FNV-1a hash code cached behind a
+ * {@link SoftReference}, and custom serialization handled by
+ * {@link MessageSerializer}.
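+ *
+ * Example (a minimal sketch using only the accessors declared below):
+ * {@code
+ * Message msg = new Message("hello".getBytes(), 1);
+ * byte[] payload = msg.getData();    // defensive copy of the stored bytes
+ * int type = msg.getMessageType();
+ * long created = msg.getTimestamp(); // milliseconds since epoch
+ * }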
+ * + * @see MessageSerializer + */ +@SuppressWarnings("deprecation") public class Message implements Serializable { - private static final long serialVersionUID = 1L; - - // Use SoftReference for the cached hash to allow GC if memory is tight - private transient SoftReference hashCache; - - // Direct byte array reference for minimal overhead - private final byte[] data; - private final long timestamp; - private final int messageType; - - // Cache array length to avoid field access - private final int length; - - // Unsafe instance for direct memory operations - private static final Unsafe unsafe; - - // Field offsets for direct memory access - private static final long dataOffset; - private static final long timestampOffset; - private static final long messageTypeOffset; - - static { - try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - unsafe = (Unsafe) f.get(null); - - dataOffset = unsafe.objectFieldOffset(Message.class.getDeclaredField("data")); - timestampOffset = unsafe.objectFieldOffset(Message.class.getDeclaredField("timestamp")); - messageTypeOffset = unsafe.objectFieldOffset(Message.class.getDeclaredField("messageType")); - } catch (Exception e) { - throw new Error(e); - } - } - - public Message(byte[] data, int messageType) { - // Avoid double array length access - int dataLength = data.length; - this.data = new byte[dataLength]; - // Direct memory copy instead of Arrays.copyOf - unsafe.copyMemory(data, Unsafe.ARRAY_BYTE_BASE_OFFSET, - this.data, Unsafe.ARRAY_BYTE_BASE_OFFSET, - dataLength); - this.length = dataLength; - this.timestamp = System.currentTimeMillis(); - this.messageType = messageType; - } - - public byte[] getData() { - byte[] copy = new byte[length]; - // Direct memory copy for better performance - unsafe.copyMemory(data, Unsafe.ARRAY_BYTE_BASE_OFFSET, - copy, Unsafe.ARRAY_BYTE_BASE_OFFSET, - length); - return copy; - } - - public long getTimestamp() { - // Direct memory access instead of field access - return unsafe.getLong(this, timestampOffset); - } - - public int getMessageType() { - // Direct memory access instead of field access - return unsafe.getInt(this, messageTypeOffset); + private static final long serialVersionUID = 1L; + + private transient SoftReference hashCache; + private final byte[] data; + private final long timestamp; + private final int messageType; + private final int length; + + private static final Unsafe unsafe; + @SuppressWarnings("unused") + private static final long dataOffset; + private static final long timestampOffset; + private static final long messageTypeOffset; + + static { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + unsafe = (Unsafe) f.get(null); + + dataOffset = unsafe.objectFieldOffset(Message.class.getDeclaredField("data")); + timestampOffset = unsafe.objectFieldOffset(Message.class.getDeclaredField("timestamp")); + messageTypeOffset = unsafe.objectFieldOffset(Message.class.getDeclaredField("messageType")); + } catch (Exception e) { + throw new Error(e); } - - @Override - public int hashCode() { - Integer cachedHash = hashCache != null ? 
hashCache.get() : null; - if (cachedHash != null) { - return cachedHash; - } - - // FNV-1a hash algorithm - faster than Arrays.hashCode - int hash = 0x811c9dc5; - for (byte b : data) { - hash ^= b; - hash *= 0x01000193; - } - hash = hash * 31 + (int)(timestamp ^ (timestamp >>> 32)); - hash = hash * 31 + messageType; - - hashCache = new SoftReference<>(hash); - return hash; - } - - private void writeObject(ObjectOutputStream out) throws IOException { - // Direct field access for better performance - out.writeLong(unsafe.getLong(this, timestampOffset)); - out.writeInt(unsafe.getInt(this, messageTypeOffset)); - out.writeInt(length); - out.write(data, 0, length); + } + + /** + * Creates a new Message with the specified data and message type. + * The message's timestamp is automatically set to the current system time. + * A defensive copy of the input data is made to ensure immutability. + * + * @param data the byte array containing the message data + * @param messageType an integer identifying the type of message + * @throws NullPointerException if data is null + */ + public Message(byte[] data, int messageType) { + int dataLength = data.length; + this.data = new byte[dataLength]; + unsafe.copyMemory(data, Unsafe.ARRAY_BYTE_BASE_OFFSET, + this.data, Unsafe.ARRAY_BYTE_BASE_OFFSET, + dataLength); + this.length = dataLength; + this.timestamp = System.currentTimeMillis(); + this.messageType = messageType; + } + + /** + * Returns a copy of the message data. + * A new array is created and returned each time to preserve immutability. + * + * @return a copy of the message data as a byte array + */ + public byte[] getData() { + byte[] copy = new byte[length]; + unsafe.copyMemory(data, Unsafe.ARRAY_BYTE_BASE_OFFSET, + copy, Unsafe.ARRAY_BYTE_BASE_OFFSET, + length); + return copy; + } + + /** + * Returns the timestamp when this message was created. + * + * @return the message creation timestamp as milliseconds since epoch + */ + public long getTimestamp() { + return unsafe.getLong(this, timestampOffset); + } + + /** + * Returns the message type identifier. + * + * @return the integer message type + */ + public int getMessageType() { + return unsafe.getInt(this, messageTypeOffset); + } + + /** + * Computes and caches the hash code for this message using the FNV-1a + * algorithm. + * The hash is computed based on the message data, timestamp, and message type. + * The computed hash is cached using a {@link SoftReference} to allow garbage + * collection + * if memory is tight. + * + * @return the hash code for this message + */ + @Override + public int hashCode() { + Integer cachedHash = hashCache != null ? hashCache.get() : null; + if (cachedHash != null) { + return cachedHash; } - private void readObject(ObjectInputStream in) throws IOException { - throw new IOException("Use MessageSerializer instead"); + int hash = 0x811c9dc5; + for (byte b : data) { + hash ^= b; + hash *= 0x01000193; } + hash = hash * 31 + (int) (timestamp ^ (timestamp >>> 32)); + hash = hash * 31 + messageType; + + hashCache = new SoftReference<>(hash); + return hash; + } + + /** + * Custom serialization implementation for better performance. + * Writes the message fields directly to the output stream. 
+ * + * @param out the output stream to write to + * @throws IOException if an I/O error occurs + */ + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeLong(unsafe.getLong(this, timestampOffset)); + out.writeInt(unsafe.getInt(this, messageTypeOffset)); + out.writeInt(length); + out.write(data, 0, length); + } + + /** + * Disabled default deserialization. + * Use {@link MessageSerializer} instead for proper deserialization. + * + * @param in the input stream to read from + * @throws IOException always, to prevent default deserialization + */ + private void readObject(ObjectInputStream in) throws IOException { + throw new IOException("Use MessageSerializer instead"); + } } \ No newline at end of file diff --git a/src/main/java/io/github/elimelt/pmqueue/MessageSerializer.java b/src/main/java/io/github/elimelt/pmqueue/MessageSerializer.java index 6ae3d11..5062b28 100644 --- a/src/main/java/io/github/elimelt/pmqueue/MessageSerializer.java +++ b/src/main/java/io/github/elimelt/pmqueue/MessageSerializer.java @@ -6,93 +6,143 @@ import sun.misc.Unsafe; import java.lang.reflect.Field; +/** + * A high-performance serializer for {@link Message} objects using direct memory + * operations. + * This class provides methods to convert {@link Message} objects to and from + * byte arrays + * with minimal overhead and maximum performance. + * + *

+ * The serialization format consists of a 16-byte header (an 8-byte timestamp,
+ * a 4-byte message type, and a 4-byte payload length) followed by the payload
+ * bytes.
+ *
+ * Performance optimizations include reusable thread-local direct
+ * {@link ByteBuffer}s, which grow on demand, and direct memory reads and
+ * writes via {@link sun.misc.Unsafe}.
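+ *
+ * Example round trip (a sketch mirroring the accompanying unit tests):
+ * {@code
+ * byte[] bytes = MessageSerializer.serialize(message);
+ * Message copy = MessageSerializer.deserialize(bytes);
+ * }
+ *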

+ * Note: This class is not intended for external use and + * should only be used by the {@link Message} class's serialization mechanism. + */ @SuppressWarnings("deprecation") class MessageSerializer { - private static final int HEADER_SIZE = 16; // 8 bytes timestamp + 4 bytes type + 4 bytes length + private static final int HEADER_SIZE = 16; - // Thread-local ByteBuffer for reuse - private static final ThreadLocal threadLocalBuffer = - ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(4096)); + private static final ThreadLocal threadLocalBuffer = ThreadLocal + .withInitial(() -> ByteBuffer.allocateDirect(4096)); - // Unsafe instance for direct memory operations - private static final Unsafe unsafe; + private static final Unsafe unsafe; + private static final long addressOffset; - // Base address of direct ByteBuffer for optimized access - private static final long addressOffset; + static { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + unsafe = (Unsafe) f.get(null); - static { - try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - unsafe = (Unsafe) f.get(null); + Field addressField = Buffer.class.getDeclaredField("address"); + addressOffset = unsafe.objectFieldOffset(addressField); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Serializes a {@link Message} object into a byte array. + * The resulting byte array contains the message's timestamp, type, length, + * and data in a compact binary format. + * + *

+ * This method uses thread-local direct {@link ByteBuffer}s to optimize + * performance and minimize garbage collection pressure. The buffer size + * automatically grows if needed. + * + * @param message the Message object to serialize + * @return a byte array containing the serialized message + * @throws IOException if the message is null or cannot be serialized + * @throws OutOfMemoryError if unable to allocate required buffer space + */ + public static byte[] serialize(Message message) throws IOException { + if (message == null) { + throw new IOException("Message is null"); + } + + byte[] data = message.getData(); + int totalLength = HEADER_SIZE + data.length; - Field addressField = Buffer.class.getDeclaredField("address"); - addressOffset = unsafe.objectFieldOffset(addressField); - } catch (Exception e) { - throw new Error(e); - } + ByteBuffer buffer = threadLocalBuffer.get(); + if (buffer.capacity() < totalLength) { + buffer = ByteBuffer.allocateDirect(Math.max(totalLength, buffer.capacity() * 2)); + threadLocalBuffer.set(buffer); } - public static byte[] serialize(Message message) throws IOException { - if (message == null) { - throw new IOException("Message is null"); - } - - byte[] data = message.getData(); - int totalLength = HEADER_SIZE + data.length; - - // Get thread-local buffer or allocate new if needed - ByteBuffer buffer = threadLocalBuffer.get(); - if (buffer.capacity() < totalLength) { - buffer = ByteBuffer.allocateDirect(Math.max(totalLength, buffer.capacity() * 2)); - threadLocalBuffer.set(buffer); - } - - buffer.clear(); - long bufferAddress = unsafe.getLong(buffer, addressOffset); - - // Direct memory writes - unsafe.putLong(bufferAddress, message.getTimestamp()); - unsafe.putInt(bufferAddress + 8, message.getMessageType()); - unsafe.putInt(bufferAddress + 12, data.length); - unsafe.copyMemory(data, Unsafe.ARRAY_BYTE_BASE_OFFSET, - null, bufferAddress + HEADER_SIZE, - data.length); - - // Create result array and copy data - byte[] result = new byte[totalLength]; - unsafe.copyMemory(null, bufferAddress, - result, Unsafe.ARRAY_BYTE_BASE_OFFSET, - totalLength); - - return result; + buffer.clear(); + long bufferAddress = unsafe.getLong(buffer, addressOffset); + + unsafe.putLong(bufferAddress, message.getTimestamp()); + unsafe.putInt(bufferAddress + 8, message.getMessageType()); + unsafe.putInt(bufferAddress + 12, data.length); + unsafe.copyMemory(data, Unsafe.ARRAY_BYTE_BASE_OFFSET, + null, bufferAddress + HEADER_SIZE, + data.length); + + byte[] result = new byte[totalLength]; + unsafe.copyMemory(null, bufferAddress, + result, Unsafe.ARRAY_BYTE_BASE_OFFSET, + totalLength); + + return result; + } + + /** + * Deserializes a byte array into a {@link Message} object. + * The byte array must contain data in the format produced by + * {@link #serialize}. + * + *

+ * This method creates a new Message object with the original timestamp + * preserved through anonymous subclassing. The message type and data are + * extracted from the serialized format using direct memory operations for + * optimal performance. + * + * @param bytes the byte array containing the serialized message + * @return a new Message object with the deserialized data + * @throws IOException if the byte array is too short, contains invalid length + * information, or is otherwise malformed + */ + public static Message deserialize(byte[] bytes) throws IOException { + if (bytes.length < HEADER_SIZE) { + throw new IOException("Invalid message: too short"); } - public static Message deserialize(byte[] bytes) throws IOException { - if (bytes.length < HEADER_SIZE) { - throw new IOException("Invalid message: too short"); - } - - // Direct memory reads using Unsafe - long timestamp = unsafe.getLong(bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET); - int type = unsafe.getInt(bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET + 8); - int length = unsafe.getInt(bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET + 12); - - if (length < 0 || length > bytes.length - HEADER_SIZE) { - throw new IOException("Invalid message length"); - } - - // Create data array and copy directly - byte[] data = new byte[length]; - unsafe.copyMemory(bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET + HEADER_SIZE, - data, Unsafe.ARRAY_BYTE_BASE_OFFSET, - length); - - return new Message(data, type) { - @Override - public long getTimestamp() { - return timestamp; - } - }; + long timestamp = unsafe.getLong(bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET); + int type = unsafe.getInt(bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET + 8); + int length = unsafe.getInt(bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET + 12); + + if (length < 0 || length > bytes.length - HEADER_SIZE) { + throw new IOException("Invalid message length"); } + + byte[] data = new byte[length]; + unsafe.copyMemory(bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET + HEADER_SIZE, + data, Unsafe.ARRAY_BYTE_BASE_OFFSET, + length); + + return new Message(data, type) { + @Override + public long getTimestamp() { + return timestamp; + } + }; + } } \ No newline at end of file diff --git a/src/main/java/io/github/elimelt/pmqueue/PersistentMessageQueue.java b/src/main/java/io/github/elimelt/pmqueue/PersistentMessageQueue.java index adeda55..646b380 100644 --- a/src/main/java/io/github/elimelt/pmqueue/PersistentMessageQueue.java +++ b/src/main/java/io/github/elimelt/pmqueue/PersistentMessageQueue.java @@ -1,4 +1,5 @@ package io.github.elimelt.pmqueue; + import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -8,259 +9,396 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.zip.CRC32; +/** + * A high-performance, persistent queue implementation for storing messages on + * disk. + * This queue provides durability guarantees while maintaining efficient read + * and write + * operations through various optimizations including write batching and direct + * I/O. + * + *

+ * The queue stores messages in a single file with the following structure: a
+ * 24-byte queue header holding the front and rear offsets, followed by
+ * message blocks, each made up of an 8-byte block header (4-byte payload
+ * length and 4-byte CRC32 checksum) and the serialized message.
+ *
+ * Key features: durability through per-message CRC32 validation, batching of
+ * small writes, direct I/O buffers, thread safety via a fair
+ * {@link ReentrantLock}, and a 1 GB cap on the backing file.
+ *
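+ * On-disk layout (a sketch inferred from the size constants declared below;
+ * the header bytes beyond the two offsets are currently unused):
+ * {@code
+ * [ queue header: frontOffset (8) | rearOffset (8) | reserved (8) ]
+ * [ length (4) | CRC32 (4) | serialized message ] ... repeated per message
+ * }
+ *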
+ * Example usage: + * {@code + * // Create a new queue + * try (PersistentMessageQueue queue = new PersistentMessageQueue("messages.queue")) { + * // Create and offer a message + * Message msg1 = new Message("Hello".getBytes(), 1); + * queue.offer(msg1); + * + * // Poll a message from the queue + * Message received = queue.poll(); + * if (received != null) { + * System.out.println(new String(received.getData())); + * } + * } + * } + * + *

+ * Performance considerations: messages smaller than 256 KB are accumulated in
+ * a write batch (flushed after 64 messages or when the batch buffer is half
+ * full), larger messages are written directly, and queue metadata is forced
+ * to disk after each batch flush and each poll.
+ */ public class PersistentMessageQueue implements Closeable { - private static final boolean DEBUG = false; - // size params - private static final int QUEUE_HEADER_SIZE = 24; - private static final int BLOCK_HEADER_SIZE = 8; - private static final long MAX_FILE_SIZE = 1024L * 1024L * 1024L; - private static final int INITIAL_FILE_SIZE = QUEUE_HEADER_SIZE; - private static final int PAGE_SIZE = 4096; - private static final int DEFAULT_BUFFER_SIZE = (1024 * 1024 / PAGE_SIZE) * PAGE_SIZE; // 1MB aligned - private static final int MAX_BUFFER_SIZE = (8 * 1024 * 1024 / PAGE_SIZE) * PAGE_SIZE; // 8MB aligned - - // write batching - private static final int BATCH_THRESHOLD = 64; - private final ByteBuffer writeBatchBuffer; - private int batchSize = 0; - private long batchStartOffset; - - // store - private final FileChannel channel; - private final RandomAccessFile file; - private ByteBuffer messageBuffer; - - // offsets - private volatile long frontOffset; - private volatile long rearOffset; - - // utils - private final ReentrantLock lock; - private final CRC32 checksumCalculator; - - public PersistentMessageQueue(String filename) throws IOException { - File f = new File(filename); - boolean isNew = !f.exists(); - this.file = new RandomAccessFile(f, "rw"); - this.channel = file.getChannel(); - - // Use direct buffers for better I/O performance - this.messageBuffer = ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE); - this.writeBatchBuffer = ByteBuffer.allocateDirect(MAX_BUFFER_SIZE); - - this.lock = new ReentrantLock(true); // Fair lock for predictable ordering - this.checksumCalculator = new CRC32(); - - if (isNew) { - initializeNewFile(); - } else { - loadMetadata(); - } + private static final boolean DEBUG = false; + private static final int QUEUE_HEADER_SIZE = 24; + private static final int BLOCK_HEADER_SIZE = 8; + private static final long MAX_FILE_SIZE = 1024L * 1024L * 1024L; + private static final int INITIAL_FILE_SIZE = QUEUE_HEADER_SIZE; + private static final int PAGE_SIZE = 4096; + private static final int DEFAULT_BUFFER_SIZE = (1024 * 1024 / PAGE_SIZE) * PAGE_SIZE; + private static final int MAX_BUFFER_SIZE = (8 * 1024 * 1024 / PAGE_SIZE) * PAGE_SIZE; + private static final int BATCH_THRESHOLD = 64; + + private final ByteBuffer writeBatchBuffer; + private int batchSize = 0; + private long batchStartOffset; + private final FileChannel channel; + private final RandomAccessFile file; + private ByteBuffer messageBuffer; + private volatile long frontOffset; + private volatile long rearOffset; + private final ReentrantLock lock; + private final CRC32 checksumCalculator; + + /** + * Creates a new persistent message queue or opens an existing one. + * + *

+ * If the file doesn't exist, it will be created with initial metadata. + * If it exists, the queue metadata will be loaded and validated. + * + * @param filename the path to the queue file + * @throws IOException if the file cannot be created/opened or if the + * existing file is corrupted + * @throws SecurityException if the application doesn't have required file + * permissions + */ + public PersistentMessageQueue(String filename) throws IOException { + File f = new File(filename); + boolean isNew = !f.exists(); + this.file = new RandomAccessFile(f, "rw"); + this.channel = file.getChannel(); + + this.messageBuffer = ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE); + this.writeBatchBuffer = ByteBuffer.allocateDirect(MAX_BUFFER_SIZE); + this.lock = new ReentrantLock(true); + this.checksumCalculator = new CRC32(); + + if (isNew) { + initializeNewFile(); + } else { + loadMetadata(); } - - public boolean offer(Message message) throws IOException { - if (message == null) - throw new NullPointerException("Message cannot be null"); - - byte[] serialized = MessageSerializer.serialize(message); - int totalSize = BLOCK_HEADER_SIZE + serialized.length; - - lock.lock(); - try { - if (rearOffset + totalSize > MAX_FILE_SIZE) - return false; - - // Ensure file has enough space - long requiredLength = rearOffset + totalSize; - if (requiredLength > file.length()) { - long newSize = Math.min(MAX_FILE_SIZE, - Math.max(file.length() * 2, requiredLength + DEFAULT_BUFFER_SIZE)); - file.setLength(newSize); - } - - checksumCalculator.reset(); - checksumCalculator.update(serialized); - int checksum = (int) checksumCalculator.getValue(); - - // Start new batch if needed - if (batchSize == 0) { - batchStartOffset = rearOffset; - } - - // Add to batch if message is small enough - if (serialized.length < DEFAULT_BUFFER_SIZE / 4 && - totalSize <= MAX_BUFFER_SIZE - writeBatchBuffer.position() && - batchSize < BATCH_THRESHOLD) { - - writeBatchBuffer.putInt(serialized.length); - writeBatchBuffer.putInt(checksum); - writeBatchBuffer.put(serialized); - batchSize++; - - rearOffset += totalSize; - - // Flush if batch is full - if (batchSize >= BATCH_THRESHOLD || - writeBatchBuffer.position() >= writeBatchBuffer.capacity() / 2) { - flushBatch(); - } - } else { - // Flush any existing batch first - if (batchSize > 0) { - flushBatch(); - } - - // Handle large message directly - if (serialized.length + BLOCK_HEADER_SIZE > messageBuffer.capacity()) { - messageBuffer = ByteBuffer.allocateDirect( - Math.min(MAX_BUFFER_SIZE, serialized.length + BLOCK_HEADER_SIZE)); - } - - messageBuffer.clear(); - messageBuffer.putInt(serialized.length); - messageBuffer.putInt(checksum); - messageBuffer.put(serialized); - messageBuffer.flip(); - - channel.write(messageBuffer, rearOffset); - rearOffset += totalSize; - saveMetadata(); - } - - return true; - } finally { - lock.unlock(); + } + + /** + * Offers a message to the queue. + * + *

+ * Messages smaller than 256KB are batched together for better performance. + * Larger messages are written directly to disk. If the queue file would exceed + * its maximum size (1GB), the message is rejected. + * + *

+ * Example: + * {@code + * Message msg = new Message("Important data".getBytes(), 1); + * boolean success = queue.offer(msg); + * if (!success) { + * System.err.println("Queue is full"); + * } + * } + * + * @param message the message to add to the queue + * @return true if the message was added, false if the queue is full + * @throws IOException if an I/O error occurs + * @throws NullPointerException if message is null + */ + public boolean offer(Message message) throws IOException { + if (message == null) + throw new NullPointerException("Message cannot be null"); + + byte[] serialized = MessageSerializer.serialize(message); + int totalSize = BLOCK_HEADER_SIZE + serialized.length; + + lock.lock(); + try { + if (rearOffset + totalSize > MAX_FILE_SIZE) + return false; + + long requiredLength = rearOffset + totalSize; + if (requiredLength > file.length()) { + long newSize = Math.min(MAX_FILE_SIZE, + Math.max(file.length() * 2, requiredLength + DEFAULT_BUFFER_SIZE)); + file.setLength(newSize); + } + + checksumCalculator.reset(); + checksumCalculator.update(serialized); + int checksum = (int) checksumCalculator.getValue(); + + if (batchSize == 0) { + batchStartOffset = rearOffset; + } + + if (serialized.length < DEFAULT_BUFFER_SIZE / 4 && + totalSize <= MAX_BUFFER_SIZE - writeBatchBuffer.position() && + batchSize < BATCH_THRESHOLD) { + + writeBatchBuffer.putInt(serialized.length); + writeBatchBuffer.putInt(checksum); + writeBatchBuffer.put(serialized); + batchSize++; + + rearOffset += totalSize; + + if (batchSize >= BATCH_THRESHOLD || + writeBatchBuffer.position() >= writeBatchBuffer.capacity() / 2) { + flushBatch(); } - } - - private void flushBatch() throws IOException { + } else { if (batchSize > 0) { - writeBatchBuffer.flip(); - channel.write(writeBatchBuffer, batchStartOffset); - writeBatchBuffer.clear(); - batchSize = 0; - saveMetadata(); + flushBatch(); } - } - public Message poll() throws IOException { - lock.lock(); - try { - if (isEmpty()) - return null; - - // Flush any pending writes before reading - if (batchSize > 0) { - flushBatch(); - } - - // Read message header - ByteBuffer headerBuffer = ByteBuffer.allocate(BLOCK_HEADER_SIZE); - int bytesRead = channel.read(headerBuffer, frontOffset); - if (bytesRead != BLOCK_HEADER_SIZE) { - throw new IOException("Failed to read message header"); - } - headerBuffer.flip(); - - int messageSize = headerBuffer.getInt(); - int storedChecksum = headerBuffer.getInt(); - - if (messageSize <= 0 || frontOffset + BLOCK_HEADER_SIZE + messageSize > file.length()) { - throw new IOException(String.format( - "Corrupted queue: invalid block size %d at offset %d (file length: %d)", - messageSize, frontOffset, file.length())); - } - - // Read message data - ByteBuffer dataBuffer = ByteBuffer.allocate(messageSize); - bytesRead = channel.read(dataBuffer, frontOffset + BLOCK_HEADER_SIZE); - if (bytesRead != messageSize) { - throw new IOException("Failed to read message data"); - } - dataBuffer.flip(); - - byte[] data = new byte[messageSize]; - dataBuffer.get(data); - - checksumCalculator.reset(); - checksumCalculator.update(data); - int calculatedChecksum = (int) checksumCalculator.getValue(); - - if (storedChecksum != calculatedChecksum) { - throw new IOException(String.format( - "Corrupted message: checksum mismatch at offset %d. 
Expected: %d, Got: %d", - frontOffset, storedChecksum, calculatedChecksum)); - } - - Message message = MessageSerializer.deserialize(data); - frontOffset += BLOCK_HEADER_SIZE + messageSize; - saveMetadata(); - - return message; - } finally { - lock.unlock(); + if (serialized.length + BLOCK_HEADER_SIZE > messageBuffer.capacity()) { + messageBuffer = ByteBuffer.allocateDirect( + Math.min(MAX_BUFFER_SIZE, serialized.length + BLOCK_HEADER_SIZE)); } - } - private void loadMetadata() throws IOException { - if (file.length() < QUEUE_HEADER_SIZE) { - throw new IOException("File too small to contain valid header"); - } + messageBuffer.clear(); + messageBuffer.putInt(serialized.length); + messageBuffer.putInt(checksum); + messageBuffer.put(serialized); + messageBuffer.flip(); - ByteBuffer buffer = ByteBuffer.allocate(QUEUE_HEADER_SIZE); - int bytesRead = channel.read(buffer, 0); - if (bytesRead != QUEUE_HEADER_SIZE) { - throw new IOException("Failed to read queue metadata"); - } - buffer.flip(); - - frontOffset = buffer.getLong(); - rearOffset = buffer.getLong(); + channel.write(messageBuffer, rearOffset); + rearOffset += totalSize; + saveMetadata(); + } - if (frontOffset < QUEUE_HEADER_SIZE || rearOffset < QUEUE_HEADER_SIZE || - frontOffset > file.length() || rearOffset > file.length() || - frontOffset > rearOffset) { - throw new IOException("Corrupted queue metadata"); - } + return true; + } finally { + lock.unlock(); + } + } + + /** + * Retrieves and removes the head of the queue, or returns null if the queue is + * empty. + * + *

+ * This operation ensures data integrity by validating the CRC32 checksum of the + * message before returning it. + * + *

+ * Example: + * {@code + * while (true) { + * Message msg = queue.poll(); + * if (msg == null) { + * // Queue is empty + * break; + * } + * processMessage(msg); + * } + * } + * + * @return the head message of the queue, or null if the queue is empty + * @throws IOException if an I/O error occurs or if the message data is + * corrupted + */ + public Message poll() throws IOException { + lock.lock(); + try { + if (isEmpty()) + return null; + + if (batchSize > 0) { + flushBatch(); + } + + ByteBuffer headerBuffer = ByteBuffer.allocate(BLOCK_HEADER_SIZE); + int bytesRead = channel.read(headerBuffer, frontOffset); + if (bytesRead != BLOCK_HEADER_SIZE) { + throw new IOException("Failed to read message header"); + } + headerBuffer.flip(); + + int messageSize = headerBuffer.getInt(); + int storedChecksum = headerBuffer.getInt(); + + if (messageSize <= 0 || frontOffset + BLOCK_HEADER_SIZE + messageSize > file.length()) { + throw new IOException(String.format( + "Corrupted queue: invalid block size %d at offset %d (file length: %d)", + messageSize, frontOffset, file.length())); + } + + ByteBuffer dataBuffer = ByteBuffer.allocate(messageSize); + bytesRead = channel.read(dataBuffer, frontOffset + BLOCK_HEADER_SIZE); + if (bytesRead != messageSize) { + throw new IOException("Failed to read message data"); + } + dataBuffer.flip(); + + byte[] data = new byte[messageSize]; + dataBuffer.get(data); + + checksumCalculator.reset(); + checksumCalculator.update(data); + int calculatedChecksum = (int) checksumCalculator.getValue(); + + if (storedChecksum != calculatedChecksum) { + throw new IOException(String.format( + "Corrupted message: checksum mismatch at offset %d. Expected: %d, Got: %d", + frontOffset, storedChecksum, calculatedChecksum)); + } + + Message message = MessageSerializer.deserialize(data); + frontOffset += BLOCK_HEADER_SIZE + messageSize; + saveMetadata(); + + return message; + } finally { + lock.unlock(); + } + } + + /** + * Checks if the queue is empty. + * + * @return true if the queue contains no messages, false otherwise + */ + public boolean isEmpty() { + return frontOffset >= rearOffset && batchSize == 0; + } + + /** + * Closes the queue, ensuring all pending writes are flushed to disk. + * + *

+ * This method should be called when the queue is no longer needed to + * ensure proper resource cleanup. It's recommended to use try-with-resources + * to ensure the queue is properly closed. + * + * @throws IOException if an I/O error occurs while closing + */ + @Override + public void close() throws IOException { + lock.lock(); + try { + flushBatch(); + saveMetadata(); + channel.force(true); + channel.close(); + file.close(); + } finally { + lock.unlock(); } + } + + private void flushBatch() throws IOException { + if (batchSize > 0) { + writeBatchBuffer.flip(); + channel.write(writeBatchBuffer, batchStartOffset); + writeBatchBuffer.clear(); + batchSize = 0; + saveMetadata(); + } + } - private void saveMetadata() throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(QUEUE_HEADER_SIZE); - buffer.putLong(frontOffset); - buffer.putLong(rearOffset); - buffer.flip(); - channel.write(buffer, 0); - channel.force(true); + private void loadMetadata() throws IOException { + if (file.length() < QUEUE_HEADER_SIZE) { + throw new IOException("File too small to contain valid header"); } - public boolean isEmpty() { - return frontOffset >= rearOffset && batchSize == 0; + ByteBuffer buffer = ByteBuffer.allocate(QUEUE_HEADER_SIZE); + int bytesRead = channel.read(buffer, 0); + if (bytesRead != QUEUE_HEADER_SIZE) { + throw new IOException("Failed to read queue metadata"); } - @Override - public void close() throws IOException { - lock.lock(); - try { - flushBatch(); - saveMetadata(); - channel.force(true); - channel.close(); - file.close(); - } finally { - lock.unlock(); - } + buffer.flip(); + + frontOffset = buffer.getLong(); + rearOffset = buffer.getLong(); + + if (frontOffset < QUEUE_HEADER_SIZE || rearOffset < QUEUE_HEADER_SIZE || + frontOffset > file.length() || rearOffset > file.length() || + frontOffset > rearOffset) { + throw new IOException("Corrupted queue metadata"); } + } - private void initializeNewFile() throws IOException { - file.setLength(INITIAL_FILE_SIZE); - frontOffset = QUEUE_HEADER_SIZE; - rearOffset = QUEUE_HEADER_SIZE; - saveMetadata(); + private void saveMetadata() throws IOException { + // check for closed file + if (channel == null || !channel.isOpen() || file == null || !file.getFD().valid() || + file.getChannel() == null) { + throw new IOException("Queue file is closed"); } - private void debug(String format, Object... args) { - if (DEBUG) { - System.out.printf("[DEBUG] " + format + "%n", args); - } + // check for corruption before writing + if (frontOffset < QUEUE_HEADER_SIZE || rearOffset < QUEUE_HEADER_SIZE || + frontOffset > file.length() || rearOffset > file.length() || + frontOffset > rearOffset) { + throw new IOException("Corrupted queue metadata"); + } + ByteBuffer buffer = ByteBuffer.allocate(QUEUE_HEADER_SIZE); + buffer.putLong(frontOffset); + buffer.putLong(rearOffset); + buffer.flip(); + + channel.write(buffer, 0); + channel.force(true); + } + + private void initializeNewFile() throws IOException { + file.setLength(INITIAL_FILE_SIZE); + frontOffset = QUEUE_HEADER_SIZE; + rearOffset = QUEUE_HEADER_SIZE; + saveMetadata(); + } + + @SuppressWarnings("unused") + private void debug(String format, Object... 
args) { + if (DEBUG) { + System.out.printf("[DEBUG] " + format + "%n", args); } + } } \ No newline at end of file diff --git a/src/test/java/io/github/elimelt/pmqueue/MessageSerializerTest.java b/src/test/java/io/github/elimelt/pmqueue/MessageSerializerTest.java index 81a4176..4465db3 100644 --- a/src/test/java/io/github/elimelt/pmqueue/MessageSerializerTest.java +++ b/src/test/java/io/github/elimelt/pmqueue/MessageSerializerTest.java @@ -12,59 +12,58 @@ class MessageSerializerTest { - private static Stream messageTestCases() { - return Stream.of( - Arguments.of("Empty message", new byte[0], 0), - Arguments.of("Small message", "Hello".getBytes(), 1), - Arguments.of("Large message", new byte[1024 * 1024], 2), // 1MB - Arguments.of("Binary data", new byte[]{0, 1, 2, 3, -1, -2, -3}, 3) - ); - } + private static Stream messageTestCases() { + return Stream.of( + Arguments.of("Empty message", new byte[0], 0), + Arguments.of("Small message", "Hello".getBytes(), 1), + Arguments.of("Large message", new byte[1024 * 1024], 2), // 1MB + Arguments.of("Binary data", new byte[] { 0, 1, 2, 3, -1, -2, -3 }, 3)); + } - @ParameterizedTest(name = "{0}") - @MethodSource("messageTestCases") - @DisplayName("Serialize and deserialize should preserve message content") - void serializeAndDeserializeShouldPreserveContent(String testName, byte[] data, int type) throws IOException { - Message original = new Message(data, type); - - byte[] serialized = MessageSerializer.serialize(original); - Message deserialized = MessageSerializer.deserialize(serialized); - - assertArrayEquals(original.getData(), deserialized.getData(), - "Deserialized data should match original"); - assertEquals(original.getMessageType(), deserialized.getMessageType(), - "Message type should be preserved"); - } + @ParameterizedTest(name = "{0}") + @MethodSource("messageTestCases") + @DisplayName("Serialize and deserialize should preserve message content") + void serializeAndDeserializeShouldPreserveContent(String testName, byte[] data, int type) throws IOException { + Message original = new Message(data, type); - @Test - @DisplayName("Deserialize should reject invalid data") - void deserializeShouldRejectInvalidData() { - byte[] invalidData = "Not a valid serialized message".getBytes(); - assertThrows(IOException.class, () -> MessageSerializer.deserialize(invalidData)); - } + byte[] serialized = MessageSerializer.serialize(original); + Message deserialized = MessageSerializer.deserialize(serialized); - @Test - @DisplayName("Serialize should reject null message") - void serializeShouldRejectNull() { - assertThrows(IOException.class, () -> MessageSerializer.serialize(null)); - } + assertArrayEquals(original.getData(), deserialized.getData(), + "Deserialized data should match original"); + assertEquals(original.getMessageType(), deserialized.getMessageType(), + "Message type should be preserved"); + } - @Test - @DisplayName("Deserialize should handle empty array") - void deserializeShouldHandleEmptyArray() { - assertThrows(IOException.class, () -> MessageSerializer.deserialize(new byte[0])); - } + @Test + @DisplayName("Deserialize should reject invalid data") + void deserializeShouldRejectInvalidData() { + byte[] invalidData = "Not a valid serialized message".getBytes(); + assertThrows(IOException.class, () -> MessageSerializer.deserialize(invalidData)); + } - @Test - @DisplayName("Serialization should handle messages with timestamp") - void serializationShouldHandleTimestamp() throws IOException { - Message original = new Message("test".getBytes(), 
1); - long originalTimestamp = original.getTimestamp(); - - byte[] serialized = MessageSerializer.serialize(original); - Message deserialized = MessageSerializer.deserialize(serialized); - - assertEquals(originalTimestamp, deserialized.getTimestamp(), - "Timestamp should be preserved during serialization"); - } + @Test + @DisplayName("Serialize should reject null message") + void serializeShouldRejectNull() { + assertThrows(IOException.class, () -> MessageSerializer.serialize(null)); + } + + @Test + @DisplayName("Deserialize should handle empty array") + void deserializeShouldHandleEmptyArray() { + assertThrows(IOException.class, () -> MessageSerializer.deserialize(new byte[0])); + } + + @Test + @DisplayName("Serialization should handle messages with timestamp") + void serializationShouldHandleTimestamp() throws IOException { + Message original = new Message("test".getBytes(), 1); + long originalTimestamp = original.getTimestamp(); + + byte[] serialized = MessageSerializer.serialize(original); + Message deserialized = MessageSerializer.deserialize(serialized); + + assertEquals(originalTimestamp, deserialized.getTimestamp(), + "Timestamp should be preserved during serialization"); + } } \ No newline at end of file diff --git a/src/test/java/io/github/elimelt/pmqueue/MessageTest.java b/src/test/java/io/github/elimelt/pmqueue/MessageTest.java index b2faa93..6e788ab 100644 --- a/src/test/java/io/github/elimelt/pmqueue/MessageTest.java +++ b/src/test/java/io/github/elimelt/pmqueue/MessageTest.java @@ -6,51 +6,51 @@ class MessageTest { - @Test - @DisplayName("Message constructor should correctly initialize fields") - void constructorShouldInitializeFields() { - byte[] data = "test data".getBytes(); - int messageType = 1; - - Message message = new Message(data, messageType); - - assertArrayEquals(data, message.getData()); - assertEquals(messageType, message.getMessageType()); - assertTrue(message.getTimestamp() > 0); - } - - @Test - @DisplayName("getData should return a copy of the data") - void getDataShouldReturnCopy() { - byte[] originalData = "test data".getBytes(); - Message message = new Message(originalData, 1); - - byte[] returnedData = message.getData(); - assertArrayEquals(originalData, returnedData); - - // Modify returned data - returnedData[0] = 42; - - // Original data in message should remain unchanged - assertNotEquals(returnedData[0], message.getData()[0]); - } - - @Test - @DisplayName("Constructor should create defensive copy of data") - void constructorShouldCreateDefensiveCopy() { - byte[] originalData = "test data".getBytes(); - Message message = new Message(originalData, 1); - - // Modify original data - originalData[0] = 42; - - // Message's data should remain unchanged - assertNotEquals(originalData[0], message.getData()[0]); - } - - @Test - @DisplayName("Constructor should reject null data") - void constructorShouldRejectNullData() { - assertThrows(NullPointerException.class, () -> new Message(null, 1)); - } + @Test + @DisplayName("Message constructor should correctly initialize fields") + void constructorShouldInitializeFields() { + byte[] data = "test data".getBytes(); + int messageType = 1; + + Message message = new Message(data, messageType); + + assertArrayEquals(data, message.getData()); + assertEquals(messageType, message.getMessageType()); + assertTrue(message.getTimestamp() > 0); + } + + @Test + @DisplayName("getData should return a copy of the data") + void getDataShouldReturnCopy() { + byte[] originalData = "test data".getBytes(); + Message message = new 
Message(originalData, 1); + + byte[] returnedData = message.getData(); + assertArrayEquals(originalData, returnedData); + + // modify returned data + returnedData[0] = 42; + + // message remains unchanged + assertNotEquals(returnedData[0], message.getData()[0]); + } + + @Test + @DisplayName("Constructor should create defensive copy of data") + void constructorShouldCreateDefensiveCopy() { + byte[] originalData = "test data".getBytes(); + Message message = new Message(originalData, 1); + + // modify original data + originalData[0] = 42; + + // message remain unchanged + assertNotEquals(originalData[0], message.getData()[0]); + } + + @Test + @DisplayName("Constructor should reject null data") + void constructorShouldRejectNullData() { + assertThrows(NullPointerException.class, () -> new Message(null, 1)); + } } \ No newline at end of file diff --git a/src/test/java/io/github/elimelt/pmqueue/PersistentMessageQueueTest.java b/src/test/java/io/github/elimelt/pmqueue/PersistentMessageQueueTest.java index ac56b65..7ecca9c 100644 --- a/src/test/java/io/github/elimelt/pmqueue/PersistentMessageQueueTest.java +++ b/src/test/java/io/github/elimelt/pmqueue/PersistentMessageQueueTest.java @@ -1,211 +1,222 @@ package io.github.elimelt.pmqueue; import org.junit.jupiter.api.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.*; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.*; +import java.io.*; +import java.util.*; -@TestMethodOrder(MethodOrderer.OrderAnnotation.class) class PersistentMessageQueueTest { - private static final String TEST_FILE = "test_queue.dat"; - private PersistentMessageQueue queue; - - @BeforeEach - void setUp() throws IOException { - queue = new PersistentMessageQueue(TEST_FILE); - } - - @AfterEach - void tearDown() throws IOException { + private static final String BASE_TEST_DIR = "tmp/test_queues/"; + private PersistentMessageQueue queue; + private File testFile; + private final Random random = new Random(); + + @BeforeAll + static void setUpTestDirectory() { + new File(BASE_TEST_DIR).mkdirs(); + } + + @BeforeEach + void setUp() throws IOException { + // create unique file for each test + testFile = new File(BASE_TEST_DIR + UUID.randomUUID() + ".dat"); + queue = new PersistentMessageQueue(testFile.getPath()); + } + + @AfterEach + void tearDown() throws IOException { + try { + if (queue != null) { queue.close(); - new File(TEST_FILE).delete(); + } + if (testFile != null && testFile.exists()) { + testFile.delete(); + } + } catch (IOException ignore) { } - - @Test - @Order(1) - @DisplayName("Queue should be empty when created") - void queueShouldBeEmptyWhenCreated() { - assertTrue(queue.isEmpty(), "Newly created queue should be empty"); + } + + @AfterAll + static void cleanUpTestDirectory() { + deleteDirectory(new File(BASE_TEST_DIR)); + } + + private static void deleteDirectory(File directory) { + File[] files = directory.listFiles(); + if (files != null) { + for (File file : files) { + if (file.isDirectory()) { + deleteDirectory(file); + } else { + file.delete(); + } + } } - - @Test - @Order(2) - @DisplayName("Basic offer and poll operations should work") - void basicOfferAndPollShouldWork() throws IOException { - Message message = new Message("test".getBytes(), 1); - assertTrue(queue.offer(message), "Offer should succeed"); - assertFalse(queue.isEmpty(), "Queue 
should not be empty after offer"); - - Message retrieved = queue.poll(); - assertNotNull(retrieved, "Poll should return message"); - assertArrayEquals(message.getData(), retrieved.getData(), "Message data should match"); - assertEquals(message.getMessageType(), retrieved.getMessageType(), "Message type should match"); - - assertTrue(queue.isEmpty(), "Queue should be empty after poll"); + directory.delete(); + } + + @Test + void queueShouldBeEmptyWhenCreated() { + assertTrue(queue.isEmpty(), "Newly created queue should be empty"); + } + + @Test + void basicOfferAndPollShouldWork() throws IOException { + Message message = new Message("test".getBytes(), 1); + assertTrue(queue.offer(message)); + assertFalse(queue.isEmpty()); + + Message retrieved = queue.poll(); + assertNotNull(retrieved); + assertArrayEquals(message.getData(), retrieved.getData()); + assertEquals(message.getMessageType(), retrieved.getMessageType()); + assertTrue(queue.isEmpty()); + } + + @Test + void shouldHandleNullMessageOffer() { + assertThrows(NullPointerException.class, () -> queue.offer(null)); + } + + @Test + void shouldHandleCorruptedFileHeader() throws IOException { + queue.close(); + + // corrupt header + try (RandomAccessFile file = new RandomAccessFile(testFile, "rw")) { + file.seek(0); + file.writeLong(-1L); // invalid front offset } - @Test - @Order(3) - @DisplayName("Queue should persist messages across restarts") - void queueShouldPersistMessages() throws IOException { - Message message = new Message("persistent".getBytes(), 1); - queue.offer(message); - queue.close(); - - // Reopen queue - queue = new PersistentMessageQueue(TEST_FILE); - Message retrieved = queue.poll(); - - assertNotNull(retrieved, "Message should persist after restart"); - assertArrayEquals(message.getData(), retrieved.getData(), "Message data should persist correctly"); + assertThrows(IOException.class, () -> new PersistentMessageQueue(testFile.getPath())); + } + + @Test + void fuzzTestRandomMessageSizes() throws IOException { + List messages = new ArrayList<>(); + + // messages with random sizes + for (int i = 0; i < 100; i++) { // + int size = random.nextInt(1024); + byte[] data = new byte[size]; + random.nextBytes(data); + Message msg = new Message(data, i); + messages.add(msg); + assertTrue(queue.offer(msg)); } - @Test - @Order(4) - @DisplayName("Queue should handle multiple messages") - void queueShouldHandleMultipleMessages() throws IOException { - int messageCount = 100; - List messages = new ArrayList<>(); - - // Offer messages - for (int i = 0; i < messageCount; i++) { - Message msg = new Message(("message" + i).getBytes(), i); - messages.add(msg); - assertTrue(queue.offer(msg), "Offer should succeed for message " + i); - } - - // Poll and verify messages - for (int i = 0; i < messageCount; i++) { - Message original = messages.get(i); - Message retrieved = queue.poll(); - - assertNotNull(retrieved, "Should retrieve message " + i); - assertArrayEquals(original.getData(), retrieved.getData(), - "Data should match for message " + i); - assertEquals(original.getMessageType(), retrieved.getMessageType(), - "Type should match for message " + i); - } - - assertTrue(queue.isEmpty(), "Queue should be empty after polling all messages"); + // verify messages + for (Message original : messages) { + Message retrieved = queue.poll(); + assertNotNull(retrieved); + assertArrayEquals(original.getData(), retrieved.getData()); + assertEquals(original.getMessageType(), retrieved.getMessageType()); } - - @Test - @Order(5) - @DisplayName("Queue should 
handle concurrent operations") - void queueShouldHandleConcurrentOperations() throws InterruptedException { - int threadCount = 10; - int messagesPerThread = 100; - ExecutorService executor = Executors.newFixedThreadPool(threadCount); - CountDownLatch latch = new CountDownLatch(threadCount * 2); // For both producers and consumers - - // Start producer threads - for (int t = 0; t < threadCount; t++) { - final int threadId = t; - executor.submit(() -> { - try { - for (int i = 0; i < messagesPerThread; i++) { - Message msg = new Message( - ("thread" + threadId + "msg" + i).getBytes(), - threadId * 1000 + i - ); - queue.offer(msg); - } - } catch (IOException e) { - fail("Producer thread failed: " + e.getMessage()); - } finally { - latch.countDown(); - } - }); + } + + @ParameterizedTest + @ValueSource(ints = { 0, 1, 100, 1024 }) + void fuzzTestSpecificMessageSizes(int size) throws IOException { + byte[] data = new byte[size]; + random.nextBytes(data); + Message msg = new Message(data, 1); + + assertTrue(queue.offer(msg)); + Message retrieved = queue.poll(); + + assertNotNull(retrieved); + assertArrayEquals(msg.getData(), retrieved.getData()); + } + + @Test + void fuzzTestRandomBinaryData() throws IOException { + List messages = new ArrayList<>(); + + // write messages + for (int i = 0; i < 100; i++) { + byte[] data = new byte[random.nextInt(100)]; + random.nextBytes(data); + // include random nulls and control characters + for (int j = 0; j < data.length; j++) { + if (random.nextInt(10) == 0) { + data[j] = 0; } - - // Start consumer threads - ConcurrentHashMap messageCount = new ConcurrentHashMap<>(); - for (int t = 0; t < threadCount; t++) { - executor.submit(() -> { - try { - while (true) { - Message msg = queue.poll(); - if (msg == null) { - // Check if all messages have been processed - if (messageCount.size() == threadCount * messagesPerThread) { - break; - } - Thread.sleep(10); - continue; - } - messageCount.put(new String(msg.getData()), msg.getMessageType()); - } - } catch (Exception e) { - fail("Consumer thread failed: " + e.getMessage()); - } finally { - latch.countDown(); - } - }); - } - - assertTrue(latch.await(30, TimeUnit.SECONDS), "Operations timed out"); - executor.shutdown(); - assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor shutdown timed out"); - - assertEquals(threadCount * messagesPerThread, messageCount.size(), - "Should process all messages exactly once"); + } + Message msg = new Message(data, random.nextInt()); + messages.add(msg); + assertTrue(queue.offer(msg)); } - @Test - @Order(6) - @DisplayName("Queue should handle large messages") - void queueShouldHandleLargeMessages() throws IOException { - byte[] largeData = new byte[1024 * 1024]; // 1MB - ThreadLocalRandom.current().nextBytes(largeData); - - Message message = new Message(largeData, 1); - assertTrue(queue.offer(message), "Should accept large message"); + // close and reopen queue + queue.close(); + queue = new PersistentMessageQueue(testFile.getPath()); + // verify messages + for (Message original : messages) { + Message retrieved = queue.poll(); + assertNotNull(retrieved); + assertArrayEquals(original.getData(), retrieved.getData()); + assertEquals(original.getMessageType(), retrieved.getMessageType()); + } + } + + @Test + void stressTestRapidOperations() throws IOException { + for (int cycle = 0; cycle < 10; cycle++) { + List messages = new ArrayList<>(); + + // write messages + for (int i = 0; i < 100; i++) { + Message msg = new Message(("stress" + i).getBytes(), i); + messages.add(msg); 
+ assertTrue(queue.offer(msg)); + } + + // verify messages + for (Message original : messages) { Message retrieved = queue.poll(); - assertNotNull(retrieved, "Should retrieve large message"); - assertArrayEquals(largeData, retrieved.getData(), "Large message data should match"); + assertNotNull(retrieved); + assertEquals(original.getMessageType(), retrieved.getMessageType()); + assertArrayEquals(original.getData(), retrieved.getData()); + } + } + } + + @Test + void shouldHandleMaxMessageSize() throws IOException { + byte[] largeData = new byte[1024 * 1024]; // 1MB + Message msg = new Message(largeData, 1); + assertTrue(queue.offer(msg)); + + Message retrieved = queue.poll(); + assertNotNull(retrieved); + assertArrayEquals(msg.getData(), retrieved.getData()); + } + + @Test + void shouldHandleRapidOpenClose() throws IOException { + List messages = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + queue.close(); + queue = new PersistentMessageQueue(testFile.getPath()); + Message msg = new Message(("cycle" + i).getBytes(), i); + messages.add(msg); + queue.offer(msg); } - @Test - @Order(7) - @DisplayName("Queue should maintain data integrity after compaction") - void queueShouldMaintainIntegrityAfterCompaction() throws IOException { - // Fill queue with messages - List messages = new ArrayList<>(); - for (int i = 0; i < 1000; i++) { - Message msg = new Message(("compaction test " + i).getBytes(), i); - messages.add(msg); - queue.offer(msg); - } - - // Poll half the messages - for (int i = 0; i < 500; i++) { - assertNotNull(queue.poll(), "Should retrieve message during compaction test"); - } - - // Add more messages - for (int i = 1000; i < 1500; i++) { - Message msg = new Message(("compaction test " + i).getBytes(), i); - messages.add(msg); - queue.offer(msg); - } - - // Verify remaining messages - for (int i = 500; i < messages.size(); i++) { - Message original = messages.get(i); - Message retrieved = queue.poll(); - - assertNotNull(retrieved, "Should retrieve message after compaction"); - assertArrayEquals(original.getData(), retrieved.getData(), - "Message data should be preserved after compaction"); - assertEquals(original.getMessageType(), retrieved.getMessageType(), - "Message type should be preserved after compaction"); - } + // verify messages + queue.close(); + queue = new PersistentMessageQueue(testFile.getPath()); + for (Message original : messages) { + Message retrieved = queue.poll(); + assertNotNull(retrieved); + assertArrayEquals(original.getData(), retrieved.getData()); + assertEquals(original.getMessageType(), retrieved.getMessageType()); } + } } \ No newline at end of file