Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UNDERTOW-2532] Fix NPE when transmitting text or binary message to websocket session at the same time #1708

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public interface JsrWebSocketMessages {
@Message(id = 3015, value = "No decoder accepted message %s")
String noDecoderAcceptedMessage(List<? extends Decoder> decoders);

@Message(id = 3016, value = "Cannot send in middle of fragmeneted message")
@Message(id = 3016, value = "Cannot send in middle of fragmented message")
IllegalStateException cannotSendInMiddleOfFragmentedMessage();

@Message(id = 3017, value = "Cannot add endpoint after deployment")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@
*/
package io.undertow.websockets.jsr;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

import io.undertow.websockets.core.BinaryOutputStream;
import io.undertow.websockets.core.StreamSinkFrameChannel;
import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketFrameType;
import io.undertow.websockets.core.WebSocketUtils;
import io.undertow.websockets.core.WebSockets;
import org.xnio.channels.Channels;

import jakarta.websocket.EncodeException;
import jakarta.websocket.RemoteEndpoint;
import jakarta.websocket.SendHandler;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;
import org.xnio.channels.Channels;

/**
* {@link RemoteEndpoint} implementation which uses a WebSocketSession for all its operation.
Expand Down Expand Up @@ -243,7 +243,7 @@ class BasicWebSocketSessionRemoteEndpoint implements Basic {
private StreamSinkFrameChannel binaryFrameSender;
private StreamSinkFrameChannel textFrameSender;

public void assertNotInFragment() {
public synchronized void assertNotInFragment() {
if (textFrameSender != null || binaryFrameSender != null) {
throw JsrWebSocketMessages.MESSAGES.cannotSendInMiddleOfFragmentedMessage();
}
Expand All @@ -268,57 +268,78 @@ public void sendBinary(final ByteBuffer data) throws IOException {
data.clear(); //for some reason the TCK expects this, might as well just match the RI behaviour
}

@Override
@Override
public void sendText(final String partialMessage, final boolean isLast) throws IOException {
if(partialMessage == null) {
if (partialMessage == null) {
throw JsrWebSocketMessages.MESSAGES.messageInNull();
}
if (binaryFrameSender != null) {
throw JsrWebSocketMessages.MESSAGES.cannotSendInMiddleOfFragmentedMessage();
}
if (textFrameSender == null) {
textFrameSender = undertowSession.getWebSocketChannel().send(WebSocketFrameType.TEXT);
}

StreamSinkFrameChannel sender = getTextFrameSender();

try {
Channels.writeBlocking(textFrameSender, WebSocketUtils.fromUtf8String(partialMessage));
if(isLast) {
textFrameSender.shutdownWrites();
Channels.writeBlocking(sender, WebSocketUtils.fromUtf8String(partialMessage));
if (isLast) {
sender.shutdownWrites();
}
Channels.flushBlocking(textFrameSender);
Channels.flushBlocking(sender);
} finally {
if (isLast) {
textFrameSender = null;
clearTextFrameSender();
}
}

}

@Override
public void sendBinary(final ByteBuffer partialByte, final boolean isLast) throws IOException {

if(partialByte == null) {
throw JsrWebSocketMessages.MESSAGES.messageInNull();
}
if (textFrameSender != null) {
throw JsrWebSocketMessages.MESSAGES.cannotSendInMiddleOfFragmentedMessage();
}
if (binaryFrameSender == null) {
binaryFrameSender = undertowSession.getWebSocketChannel().send(WebSocketFrameType.BINARY);
}

StreamSinkFrameChannel sender = getBinaryFrameSender();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im going to talk about this with @ropalka . But it seems its technically possible for competing send to undercut one another, though I might be missing how this piece is being used.

Copy link
Author

@raccoonback raccoonback Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@baranowb
I would like to share my understanding and seek your feedback.

For example, in the case of sending a Text message, I assume that a single message is divided into multiple frames for transmission. In this process, the TextFrameSender class variable is used to call sendText(partialMessage, isLast) multiple times, sending partial messages through the same channel. (During this process, other send() methods are not allowed to send messages.)

However, if partial frames are being sent from multiple threads, I believe there is a possibility that one thread might set textFrameSender to null first, and then another thread could call Channels.flushBlocking(StreamSinkFrameChannel), leading to a NullPointerException (NPE).

To address this, I have modified the code to ensure synchronization when accessing the shared resource, the textFrameSender class variable.

Additionally, could you please clarify what specific scenarios you had in mind regarding 'competing send to undercut one another'?
Understanding this will help me consider those cases and make further improvements accordingly.


try {
Channels.writeBlocking(binaryFrameSender, partialByte);
if(isLast) {
binaryFrameSender.shutdownWrites();
Channels.writeBlocking(sender, partialByte);
if (isLast) {
sender.shutdownWrites();
}
Channels.flushBlocking(binaryFrameSender);
} finally {
Channels.flushBlocking(sender);
}
finally {
if (isLast) {
binaryFrameSender = null;
clearBinaryFrameSender();
}
}
partialByte.clear();
}

private synchronized StreamSinkFrameChannel getTextFrameSender() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two different senders are guarded by single instance lock. Is there a reason for such arrangement?

Copy link
Author

@raccoonback raccoonback Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@baranowb
Thank you for your question.
The textFrameSender and binaryFrameSender class variables are initialized and set to null during partial message transmission.
During this process, all other send() methods are blocked from sending messages. Specifically, while sending partial messages using textFrameSender, transmissions using binaryFrameSender are also restricted.

If we were to apply separate locks for textFrameSender and binaryFrameSender, there would be a high risk of deadlocks, especially when different methods interact with these variables concurrently.
To mitigate this, we decided to use a single instance lock to manage both senders.

Additionally, other send() methods call assertNotInFragment() to check if a partial message is being transmitted.
Using separate locks in this scenario could similarly lead to deadlocks.
For these reasons, we opted for a single lock to ensure safe and consistent behavior.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@baranowb
Alternatively, it seems possible to control concurrency by declaring textFrameSender and binaryFrameSender as AtomicReference<StreamSinkFrameChannel> without using synchronized.

if (binaryFrameSender != null) {
throw JsrWebSocketMessages.MESSAGES.cannotSendInMiddleOfFragmentedMessage();
}
if (textFrameSender == null) {
textFrameSender = undertowSession.getWebSocketChannel().send(WebSocketFrameType.TEXT);
}
return textFrameSender;
}

private synchronized void clearTextFrameSender() {
textFrameSender = null;
}

private synchronized StreamSinkFrameChannel getBinaryFrameSender() throws IOException {
if (textFrameSender != null) {
throw JsrWebSocketMessages.MESSAGES.cannotSendInMiddleOfFragmentedMessage();
}
if (binaryFrameSender == null) {
binaryFrameSender = undertowSession.getWebSocketChannel().send(WebSocketFrameType.BINARY);
}
return binaryFrameSender;
}

private synchronized void clearBinaryFrameSender() {
binaryFrameSender = null;
}

@Override
public OutputStream getSendStream() throws IOException {
assertNotInFragment();
Expand Down
Loading