Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reenable subscribe message with security #6

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ RUN git clone -b 0.x https://github.com/meetecho/janus-gateway.git && \
cd / && rm -rf janus-gateway


RUN git clone -b master https://github.com/networked-aframe/janus-plugin-sfu.git && \
RUN git clone -b reenable-subscribe https://github.com/networked-aframe/janus-plugin-sfu.git && \
cd janus-plugin-sfu && \
git checkout 1914dfa7e22c793f4a684ebeb002304661270519 && \
echo version 2 increment this line to invalidate cache of this layer while iterating build during development && \
cargo build --release && \
mkdir -p /usr/lib/janus/plugins && \
Expand Down
89 changes: 73 additions & 16 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ expect consumers of this plugin to use WebSockets, but you can probably use what
1. Signal your attachment to the Janus plugin. See the [Janus documentation][janus-transports] on how to attach to a
plugin. This plugin's name is `janus.plugin.sfu`.

2. Determine your user ID. This should be a unique ID that nobody else is likely to share. In the future, we will actually
have authentication; as it stands just pick a big random ID and pray for no collisions.
2. Determine your user ID. This should be a unique ID that nobody else is likely to share. Pick a big random ID and pray for no collisions.

3. Create an RTC connection.

Expand Down Expand Up @@ -55,44 +54,102 @@ join a room. You can only join one room with any connection.
"kind": "join",
"room_id": room ID,
"user_id": user ID,
"subscribe": [none|subscription object]
"subscribe": {
"notifications": [none|boolean],
"data": [none|boolean],
"media": [none|user ID]
},
"token": [none|string]
}
```

If `subscription: {...}` is passed, you will synchronously configure an initial subscription to the traffic that you
want to get pushed through your connection. The format of the subscription should be identical to that in the
[subscribe](#subscribe) message, below.
If `notifications` is `true`, you will get websocket events corresponding to every time someone joins or leaves the server, someone blocked or unblocked you.

If `data` is `true`, you will get all data traffic from other users (publishers) in your room. You can also send data. Currently this flag is used to register the
connection as a publisher, if false or not defined the connection is registered as a subscriber.

If `media` is a user ID, the server will respond with a JSEP offer which you can use to establish a connection suitable to receive audio and video RTP data coming from that user ID.

Although `subscribe: {...}` can be omitted and is valid, it doesn't
make much sense to register a connection that does nothing.

This is usually used as follow for a publisher connection:

"subscribe": {
"notifications": true,
"data": true
}

to send data, receive data, and subscribe to notifications in the currently-joined room.

And for a subscriber connection:

"subscribe": {
"media": user ID
}

`token` is a JWT to verify you allowed to connect to the room `room_id`, this is verified if you specified an `auth_key` to a public RSA key in DER format in `janus.plugin.sfu.cfg`. The JWT contains the following claims:

```
{kick_users: [true|false], join_hub: true, room_ids: ["room_alpha"]}
```

`room_ids` is optional. If `room_ids` is not specified, you can connect to the room `room_id` if `join_hub` is `true`.
If `room_ids` is specified, `room_id` needs to be listed in `room_ids` to be able to connect to the room.

The response will return the users on the server in the room you joined, as below, including yourself. If you `subscribe`d to a user's media, you will also get a JSEP offer you can use to get that user's RTP traffic.

```
{
"success": true,
"response": {
"users": {room_alpha: ["123", "789"]}
"users": {"room_alpha": ["123", "789"]}
}
}
```

### Subscribe

Subscribes to some kind of traffic coming from the server.
Subscribes to audio/video of a user (publisher) if you know the user ID.
This message is only useful if you're using an external user presence system
to know the participants currently in the room.
With this plugin alone, you can't know a publisher user ID without
being a publisher in the room to get the users in the room, and in this case
you use the Join message to subscribe to other users, not this message.

```
{
"kind": "subscribe",
"notifications": [none|boolean],
"data": [none|boolean],
"media": [none|user ID]
"what": {
"notifications": [none|boolean],
"data": [none|boolean],
"media": [none|user ID]
},
"token": [none|string]
}
```

If `notifications` is `true`, you will get websocket events corresponding to every time someone joins or leaves the server.

If `data` is `true`, you will get all data traffic from other users in your room, if you've joined a room.
`notifications` and `data` are completely ignored here. Those flags are only
checked for a publisher connection with the Join message.

If `media` is a user ID, the server will respond with a JSEP offer which you can use to establish a connection suitable to receive audio and video RTP data coming from that user ID.

If `token` is specified, `room_ids` claim is present in the JWT and `auth_key` configured, the following security check is done:
the user is allowed to subscribe to a publisher user ID (specified in `media`) if this publisher is connected to a room listed in the JWT `room_ids`.

### Kick

Kick another user from the room. You need a token with the `kick_users: true` claim.

```
{
"kind": "kick",
"room_id": room ID,
"user_id": user ID,
"token": string
}
```

### Block

Blocks another user. Blocks are bidirectional; the targeted user won't get your data, audio, or video, and you won't get
Expand All @@ -101,7 +158,7 @@ theirs. That user will get a `blocked` event letting them know.
```
{
"kind": "block",
"whom": [user ID]
"whom": user ID
}
```

Expand All @@ -114,7 +171,7 @@ Unblock a user who you previously blocked. That user will get an `unblocked` eve
```
{
"kind": "block",
"whom": [user ID]
"whom": user ID
}
```

Expand Down
81 changes: 57 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,27 +568,62 @@ fn process_unblock(from: &Arc<Session>, whom: UserId) -> MessageResult {
}
}

// fn process_subscribe(from: &Arc<Session>, what: &Subscription) -> MessageResult {
// janus_info!("Processing subscription from {:p}: {:?}", from.handle, what);
// if let Err(_existing) = from.subscription.set(what.clone()) {
// return Err(From::from("Users may only subscribe once!"));
// }

// let mut switchboard = SWITCHBOARD.write()?;
// if let Some(ref publisher_id) = what.media {
// let publisher = switchboard
// .get_publisher(publisher_id)
// .ok_or("Can't subscribe to a nonexistent publisher.")?
// .clone();
// let jsep = json!({
// "type": "offer",
// "sdp": publisher.subscriber_offer.lock().unwrap().as_ref().unwrap()
// });
// switchboard.subscribe_to_user(from.clone(), publisher);
// return Ok(MessageResponse::new(json!({}), jsep));
// }
// Ok(MessageResponse::msg(json!({})))
// }
fn process_subscribe(from: &Arc<Session>, what: Subscription, token: Option<String>) -> MessageResult {
janus_info!("Processing subscription from {:p}: {:?}", from.handle, what);
if let Err(_existing) = from.subscription.set(what.clone()) {
return Err(From::from("Users may only subscribe once!"));
}

let mut switchboard = SWITCHBOARD.write()?;
if let Some(ref publisher_id) = what.media {
let publisher = switchboard
.get_publisher(publisher_id)
.ok_or("Can't subscribe to a nonexistent publisher.")?
.clone();

let config = CONFIG.get().unwrap();
match (&config.auth_key, token) {
(None, _) => {
janus_verb!(
"No auth_key configured. Allowing subscription from {:p} to publisher {}.",
from.handle,
publisher_id
);
}
(Some(_), None) => {
janus_warn!("Rejecting anonymous subscription from {:p} to publisher {}.", from.handle, publisher_id);
return Err(From::from("Rejecting anonymous subscription!"));
}
(Some(key), Some(ref token)) => match ValidatedToken::from_str(token, key) {
Ok(ref claims) => {
if let Some(joined) = publisher.join_state.get() {
if claims.may_join(&joined.room_id) {
janus_verb!("Allowing subscription from {:p} to publisher {}.", from.handle, publisher_id);
} else {
janus_warn!("Rejecting subscription from {:p} to publisher {}.", from.handle, publisher_id);
return Err(From::from("Rejecting subscription without permission!"));
}
} else {
janus_warn!("Cannot subscribe from {:p} to a publisher {} not in a room.", from.handle, publisher_id);
return Err(From::from("Cannot subscribe to a publisher not in a room."));
}
}
Err(e) => {
janus_warn!("Rejecting subscription from {:p} to publisher {}. Error: {}", from.handle, publisher_id, e);
return Err(From::from("Rejecting subscription with invalid token!"));
}
},
}

let jsep = json!({
"type": "offer",
"sdp": publisher.subscriber_offer.lock().unwrap().as_ref().unwrap()
});
switchboard.subscribe_to_user(from.clone(), publisher);
return Ok(MessageResponse::new(json!({}), jsep));
}
Ok(MessageResponse::msg(json!({})))
}

fn process_data(from: &Arc<Session>, whom: Option<UserId>, body: &str) -> MessageResult {
janus_huge!("Processing data message from {:p}: {:?}", from.handle, body);
Expand Down Expand Up @@ -616,9 +651,7 @@ fn process_message(from: &Arc<Session>, msg: MessageKind) -> MessageResult {
token,
} => process_join(from, room_id, user_id, subscribe, token),
MessageKind::Kick { room_id, user_id, token } => process_kick(from, room_id, user_id, token),
// process_subscribe doesn't check the JWT, we need to change the API to add room_id and token,
// comment it for now.
// MessageKind::Subscribe { what } => process_subscribe(from, &what),
MessageKind::Subscribe { what, token } => process_subscribe(from, what, token),
MessageKind::Block { whom } => process_block(from, whom),
MessageKind::Unblock { whom } => process_unblock(from, whom),
MessageKind::Data { whom, body } => process_data(from, whom, &body),
Expand Down
35 changes: 18 additions & 17 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub enum MessageKind {
Kick { room_id: RoomId, user_id: UserId, token: String },

/// Indicates that a client wishes to subscribe to traffic described by the given subscription specification.
// Subscribe { what: Subscription },
Subscribe { what: Subscription, token: Option<String> },

/// Indicates that a given user should be blocked from receiving your traffic, and that you should not
/// receive their traffic (superseding any subscriptions you have.)
Expand All @@ -88,7 +88,7 @@ pub enum MessageKind {
#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)]
#[serde(default)]
pub struct Subscription {
/// Whether to subscribe to server-wide notifications (e.g. user joins and leaves, room creates and destroys).
/// Whether to subscribe to server-wide notifications (e.g. user joins and leaves, someone blocked/unblocked you).
pub notifications: bool,

/// Whether to subscribe to data in the currently-joined room.
Expand Down Expand Up @@ -163,20 +163,21 @@ mod tests {
);
}

// #[test]
// fn parse_subscribe() {
// let json = r#"{"kind": "subscribe", "what": {"notifications": false, "data": true, "media": "steve"}}"#;
// let result: MessageKind = serde_json::from_str(json).unwrap();
// assert_eq!(
// result,
// MessageKind::Subscribe {
// what: Subscription {
// notifications: false,
// data: true,
// media: Some("steve".into())
// }
// }
// );
// }
#[test]
fn parse_subscribe() {
let json = r#"{"kind": "subscribe", "what": {"notifications": false, "data": true, "media": "steve"}}"#;
let result: MessageKind = serde_json::from_str(json).unwrap();
assert_eq!(
result,
MessageKind::Subscribe {
what: Subscription {
notifications: false,
data: true,
media: Some("steve".into())
},
token: None
}
);
}
}
}