Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix read loop infinite loop when EOF received #71

Merged
merged 4 commits into from
Nov 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,5 @@ env:
- PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0"
- PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2"
- PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2"
- PACKAGE="httpaf" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0"
- PACKAGE="httpaf-async" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0"
- PACKAGE="httpaf" DISTRO="debian-unstable" OCAML_VERSION="4.03.0"
- PACKAGE="httpaf-async" DISTRO="debian-unstable" OCAML_VERSION="4.03.0"
8 changes: 6 additions & 2 deletions async/httpaf_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ module Server = struct
read fd buffer
>>> begin function
| `Eof ->
Server_connection.shutdown_reader conn;
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
Server_connection.read_eof conn bigstring ~off ~len)
|> ignore;
reader_thread ()
| `Ok _ ->
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
Expand Down Expand Up @@ -170,7 +172,9 @@ module Client = struct
read fd buffer
>>> begin function
| `Eof ->
Client_connection.shutdown_reader conn;
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
Client_connection.read_eof conn bigstring ~off ~len)
|> ignore;
reader_thread ()
| `Ok _ ->
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
Expand Down
1 change: 0 additions & 1 deletion httpaf-async.descr

This file was deleted.

11 changes: 5 additions & 6 deletions httpaf-async.opam
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
opam-version: "1.2"
opam-version: "2.0"
name: "httpaf-async"
maintainer: "Spiros Eliopoulos <[email protected]>"
authors: [ "Spiros Eliopoulos <[email protected]>" ]
license: "BSD-3-clause"
homepage: "https://github.com/inhabitedtype/httpaf"
bug-reports: "https://github.com/inhabitedtype/httpaf/issues"
dev-repo: "https://github.com/inhabitedtype/httpaf.git"
dev-repo: "git+https://github.com/inhabitedtype/httpaf.git"
build: [
["jbuilder" "subst" "-p" name] {pinned}
["jbuilder" "build" "-p" name "-j" jobs]
]
build-test: [
["jbuilder" "runtest" "-p" name]
["jbuilder" "runtest" "-p" name] {with-test}
]
depends: [
"ocaml" {>= "4.03.0"}
"jbuilder" {build & >= "1.0+beta10"}
"angstrom-async" {>= "0.9.0"}
"faraday-async"
"async"
"httpaf"
]
available: [ ocaml-version >= "4.03.0" ]
synopsis: "Async support for http/af"
7 changes: 0 additions & 7 deletions httpaf.descr

This file was deleted.

22 changes: 14 additions & 8 deletions httpaf.opam
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
opam-version: "1.2"
opam-version: "2.0"
maintainer: "Spiros Eliopoulos <[email protected]>"
authors: [ "Spiros Eliopoulos <[email protected]>" ]
license: "BSD-3-clause"
homepage: "https://github.com/inhabitedtype/httpaf"
bug-reports: "https://github.com/inhabitedtype/httpaf/issues"
dev-repo: "https://github.com/inhabitedtype/httpaf.git"
dev-repo: "git+https://github.com/inhabitedtype/httpaf.git"
build: [
["jbuilder" "subst" "-p" name] {pinned}
["jbuilder" "build" "-p" name "-j" jobs]
]
build-test: [
["jbuilder" "runtest" "-p" name]
["jbuilder" "runtest" "-p" name] {with-test}
]
depends: [
"ocaml" {>= "4.03.0"}
"jbuilder" {build & >= "1.0+beta10"}
"alcotest" {test}
"alcotest" {with-test}
"angstrom" {>= "0.9.0"}
"faraday" {>= "0.5.0"}
"faraday" {>= "0.5.0"}
"result"
]
available: [ ocaml-version >= "4.03.0" ]
synopsis:
"A high-performance, memory-efficient, and scalable web server for OCaml."
description: """
http/af implements the HTTP 1.1 specification with respect to parsing,
serialization, and connection pipelining as a state machine that is agnostic to
the underlying IO mechanism, and is therefore portable across many platform.
It uses the Angstrom and Faraday libraries to implement the parsing and
serialization layers of the HTTP standard, hence the name."""
16 changes: 11 additions & 5 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module Oneshot = struct
;;

let shutdown_reader t =
Reader.close t.reader;
Reader.force_close t.reader;
begin match !(t.state) with
| Awaiting_response | Closed -> ()
| Received_response(_, response_body) ->
Expand Down Expand Up @@ -138,8 +138,8 @@ module Oneshot = struct
if not (Body.is_closed response_body)
then Reader.next t.reader
else begin
Reader.close t.reader;
Reader.next t.reader
Reader.force_close t.reader;
Reader.next t.reader
end
;;

Expand All @@ -155,12 +155,18 @@ module Oneshot = struct
| (`Read | `Close) as operation -> operation
;;

let read t bs ~off ~len =
let consumed = Reader.read t.reader bs ~off ~len in
let read_with_more t bs ~off ~len more =
let consumed = Reader.read_with_more t.reader bs ~off ~len more in
flush_response_body t;
consumed
;;

let read t bs ~off ~len =
read_with_more t bs ~off ~len Incomplete

let read_eof t bs ~off ~len =
read_with_more t bs ~off ~len Complete

let next_write_operation t =
flush_request_body t;
Writer.next t.writer
Expand Down
28 changes: 15 additions & 13 deletions lib/httpaf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -733,18 +733,19 @@ module Server_connection : sig
returns a [`Read] value and additional input is available for the
connection to consume. *)

val read_eof : _ t -> Bigstring.t -> off:int -> len:int -> int
(** [read t bigstring ~off ~len] reads bytes of input from the provided range
of [bigstring] and returns the number of bytes consumed by the
connection. {!read} should be called after {!next_read_operation}
returns a [`Read] and an EOF has been received from the communication
channel. The connection will attempt to consume any buffered input and
then shutdown the HTTP parser for the connection. *)

val yield_reader : _ t -> (unit -> unit) -> unit
(** [yield_reader t continue] registers with the connection to call
[continue] when reading should resume. {!yield_reader} should be called
after {next_read_operation} returns a [`Yield] value. *)

val shutdown_reader : _ t -> unit
(** [shutdown_reader t] shuts down the read processor for the connection. All
subsequent calls to {!next_read_operations} will return [`Close].
{!shutdown_reader} should be called after {!next_read_operation} returns
a [`Read] value and there is no further input available for the
connection to consume. *)

val next_write_operation : _ t -> [
| `Write of Bigstring.t IOVec.t list
| `Yield
Expand Down Expand Up @@ -820,12 +821,13 @@ module Client_connection : sig
returns a [`Read] value and additional input is available for the
connection to consume. *)

val shutdown_reader : t -> unit
(** [shutdown_reader t] shuts down the read processor for the connection. All
subsequent calls to {!next_read_operations} will return [`Close].
{!shutdown_reader} should be called after {!next_read_operation} returns
a [`Read] value and there is no further input available for the
connection to consume. *)
val read_eof : t -> Bigstring.t -> off:int -> len:int -> int
(** [read t bigstring ~off ~len] reads bytes of input from the provided range
of [bigstring] and returns the number of bytes consumed by the
connection. {!read} should be called after {!next_read_operation}
returns a [`Read] and an EOF has been received from the communication
channel. The connection will attempt to consume any buffered input and
then shutdown the HTTP parser for the connection. *)

val next_write_operation : t -> [
| `Write of Bigstring.t IOVec.t list
Expand Down
2 changes: 1 addition & 1 deletion lib/jbuild
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
((name httpaf)
(public_name httpaf)
(libraries
(angstrom faraday result))
(angstrom faraday bigstringaf result))
(flags (:standard -safe-string))))
29 changes: 19 additions & 10 deletions lib/parse.ml
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,6 @@ module Reader = struct
create parser
;;

let close t =
t.closed <- true

let is_closed t =
t.closed
Expand Down Expand Up @@ -291,14 +289,25 @@ module Reader = struct
| _ -> assert false
;;

let rec read t bs ~off ~len =
match t.parse_state with
| Fail _ -> 0
| Done ->
start t (AU.parse t.parser);
read t bs ~off ~len;
| Partial continue ->
transition t (continue bs Incomplete ~off ~len)
let rec read_with_more t bs ~off ~len more =
let consumed =
match t.parse_state with
| Fail _ -> 0
| Done ->
start t (AU.parse t.parser);
read_with_more t bs ~off ~len more;
| Partial continue ->
transition t (continue bs more ~off ~len)
in
begin match more with
| Complete -> t.closed <- true;
| Incomplete -> ()
end;
consumed;
;;

let force_close t =
ignore (read_with_more t Bigstringaf.empty ~off:0 ~len:0 Complete : int);
;;

let next t =
Expand Down
12 changes: 9 additions & 3 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque
let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer

let shutdown_reader t =
Reader.close t.reader;
Reader.force_close t.reader;
if is_active t
then Reqd.close_request_body (current_reqd_exn t)
else wakeup_reader t
Expand Down Expand Up @@ -241,13 +241,19 @@ let next_read_operation t =
| `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close
| (`Read | `Yield | `Close) as operation -> operation

let read t bs ~off ~len =
let consumed = Reader.read t.reader bs ~off ~len in
let read_with_more t bs ~off ~len more =
let consumed = Reader.read_with_more t.reader bs ~off ~len more in
if is_active t then
Reqd.flush_request_body (current_reqd_exn t);
consumed
;;

let read t bs ~off ~len =
read_with_more t bs ~off ~len Incomplete

let read_eof t bs ~off ~len =
read_with_more t bs ~off ~len Complete

let yield_reader t k =
on_wakeup_reader t k
;;
Expand Down
2 changes: 1 addition & 1 deletion lib_test/jbuild
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(jbuild_version 1)

(executables
((libraries (httpaf alcotest))
((libraries (bigstringaf httpaf alcotest))
(modules (test_httpaf test_httpaf_server test_httpaf_client simulator))
(names (test_httpaf test_httpaf_server test_httpaf_client))))

Expand Down
12 changes: 8 additions & 4 deletions lib_test/simulator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ let test_server ~input ~output ~handler () =
else Bigstring.sub ~off:result input, reads'
| `Read, [] ->
debug " server iloop: eof";
Server_connection.shutdown_reader conn;
let input_len = Bigstring.length input in
ignore (Server_connection.read_eof conn input ~off:0 ~len:input_len : int);
bigstring_empty, []
| _ , [] ->
debug " server iloop: eof";
Server_connection.shutdown_reader conn;
let input_len = Bigstring.length input in
ignore (Server_connection.read_eof conn input ~off:0 ~len:input_len : int);
bigstring_empty, []
| `Close , _ ->
debug " server iloop: close(ok)"; input, []
Expand Down Expand Up @@ -167,11 +169,13 @@ let test_client ~request ~request_body_writes ~response_stream () =
else Bigstring.sub ~off:result input, reads'
| `Read, [] ->
debug " client iloop: eof";
Client_connection.shutdown_reader conn;
let input_len = Bigstring.length input in
ignore (Client_connection.read_eof conn input ~off:0 ~len:input_len : int);
input, []
| _ , [] ->
debug " client iloop: eof";
Client_connection.shutdown_reader conn;
let input_len = Bigstring.length input in
ignore (Client_connection.read_eof conn input ~off:0 ~len:input_len : int);
input, []
| `Close , _ ->
debug " client iloop: close(ok)";
Expand Down
57 changes: 54 additions & 3 deletions lib_test/test_httpaf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,60 @@ module IOVec = struct
; "shiftv raises ", `Quick, test_shiftv_raises ]
end

module Server_connection = struct
include Server_connection

module Read_operation = struct
type t = [ `Read | `Yield | `Close ]

let pp_hum fmt t =
let str =
match t with
| `Read -> "Read"
| `Yield -> "Yield"
| `Close -> "Close"
in
Format.pp_print_string fmt str
;;
end

let default_request_handler reqd =
Reqd.respond_with_string reqd (Response.create `OK) ""
;;

let test_initial_reader_state () =
let t = create default_request_handler in
Alcotest.(check (of_pp Read_operation.pp_hum)) "A new reader wants input"
`Read (next_read_operation t);
;;

let test_reader_is_closed_after_eof () =
let t = create default_request_handler in
let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in
Alcotest.(check int) "read_eof with no input returns 0" 0 c;
Alcotest.(check (of_pp Read_operation.pp_hum)) "Shutting down a reader closes it"
`Close (next_read_operation t);

let t = create default_request_handler in
let c = read t Bigstringaf.empty ~off:0 ~len:0 in
Alcotest.(check int) "read with no input returns 0" 0 c;
let c = read_eof t Bigstringaf.empty ~off:0 ~len:0; in
Alcotest.(check int) "read_eof with no input returns 0" 0 c;
Alcotest.(check (of_pp Read_operation.pp_hum)) "Shutting down a reader closes it"
`Close (next_read_operation t);
;;

let tests =
[ "initial reader state" , `Quick, test_initial_reader_state
; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof
]

end

let () =
Alcotest.run "httpaf unit tests"
[ "version" , Version.tests
; "method" , Method.tests
; "iovec" , IOVec.tests
[ "version" , Version.tests
; "method" , Method.tests
; "iovec" , IOVec.tests
; "server_connection", Server_connection.tests
]