The goal of SSEparser is to provide robust functionality to parse Server-Sent Events and to build on top of it.
You can install SEEparser
from CRAN like so:
install.packages("SSEparser")
Alternatively, you can install the development version like so:
pak::pak("calderonsamuel/SSEparser")
The parse_sse()
function takes a string containing a server-sent event
and converts it to a R list.
library(SSEparser)
event <- "data: test\nevent: message\nid: 123\n\n"
parse_sse(event)
#> [[1]]
#> [[1]]$data
#> [1] "test"
#>
#> [[1]]$event
#> [1] "message"
#>
#> [[1]]$id
#> [1] "123"
Comments are usually received in a line starting with a colon. They are not parsed.
with_comment <- "data: test\n: comment\nevent: example\n\n"
parse_sse(with_comment)
#> [[1]]
#> [[1]]$data
#> [1] "test"
#>
#> [[1]]$event
#> [1] "example"
parse_sse()
wraps the SSEparser
R6 class, which is also exported to
be used with real-time streaming data. The following code handles a
request with MIME type “text/event-stream”.
parser <- SSEparser$new()
response <- httr2::request("https://postman-echo.com/server-events/3") %>%
httr2::req_body_json(data = list(
event = "message",
request = "POST"
)) %>%
httr2::req_perform_stream(callback = \(x) {
event <- rawToChar(x)
parser$parse_sse(event)
TRUE
})
str(parser$events)
#> List of 3
#> $ :List of 3
#> ..$ event: chr "error"
#> ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#> ..$ id : chr "1"
#> $ :List of 3
#> ..$ event: chr "notification"
#> ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#> ..$ id : chr "2"
#> $ :List of 3
#> ..$ event: chr "message"
#> ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#> ..$ id : chr "3"
Following the previous example, it should be useful to parse the content
of every data
field to be also an R list instead of a JSON string. For
that, we can create a new R6 class which inherits from SSEparser
. We
just need to overwrite the append_parsed_sse()
method.
CustomParser <- R6::R6Class(
classname = "CustomParser",
inherit = SSEparser,
public = list(
initialize = function() {
super$initialize()
},
append_parsed_sse = function(parsed_event) {
parsed_event$data <- jsonlite::fromJSON(parsed_event$data)
self$events = c(self$events, list(parsed_event))
invisible(self)
}
)
)
Notice that the only thing we are modifying is the parsing of the data
field, not the parsing of the event itself. This is the original method
from SSEparser
:
SSEparser$public_methods$append_parsed_sse
#> function (parsed_event)
#> {
#> self$events <- c(self$events, list(parsed_event))
#> invisible(self)
#> }
#> <bytecode: 0x00000147d67ed9b8>
#> <environment: namespace:SSEparser>
CustomParser
uses jsonlite::fromJSON()
to parse the data field of
every chunk in the event stream. We can now use our custom class with
the previous request1.
parser <- CustomParser$new()
response <- httr2::request("https://postman-echo.com/server-events/3") %>%
httr2::req_body_json(data = list(
event = "message",
request = "POST"
)) %>%
httr2::req_perform_stream(callback = \(x) {
event <- rawToChar(x)
parser$parse_sse(event)
TRUE
})
str(parser$events)
#> List of 3
#> $ :List of 3
#> ..$ event: chr "ping"
#> ..$ data :List of 2
#> .. ..$ event : chr "message"
#> .. ..$ request: chr "POST"
#> ..$ id : chr "1"
#> $ :List of 3
#> ..$ event: chr "message"
#> ..$ data :List of 2
#> .. ..$ event : chr "message"
#> .. ..$ request: chr "POST"
#> ..$ id : chr "2"
#> $ :List of 3
#> ..$ event: chr "info"
#> ..$ data :List of 2
#> .. ..$ event : chr "message"
#> .. ..$ request: chr "POST"
#> ..$ id : chr "3"
Now instead of a JSON string we can have an R list in the data field while the stream is still in process.
Footnotes
-
This endpoint returns random event field names for each chunk in every request, so the response will not be exactly the same. ↩