diff --git a/src/grpc/iot_config/hpr_route_stream_worker.erl b/src/grpc/iot_config/hpr_route_stream_worker.erl index 03f15d21..de4f4547 100644 --- a/src/grpc/iot_config/hpr_route_stream_worker.erl +++ b/src/grpc/iot_config/hpr_route_stream_worker.erl @@ -99,7 +99,6 @@ devaddr_added := non_neg_integer() }. - -record(state, { stream :: grpcbox_client:stream() | undefined, conn_backoff :: backoff:backoff() diff --git a/src/grpc/packet_router/hpr_packet_router_service.erl b/src/grpc/packet_router/hpr_packet_router_service.erl index 3af1a38c..32167c62 100644 --- a/src/grpc/packet_router/hpr_packet_router_service.erl +++ b/src/grpc/packet_router/hpr_packet_router_service.erl @@ -17,13 +17,20 @@ -define(REG_KEY(Gateway), {?MODULE, Gateway}). -define(SESSION_TIMER, timer:minutes(30)). -define(SESSION_RESET, session_reset). +-ifdef(TEST). +-define(SESSION_OFFER_RES_TIMEOUT, timer:seconds(2)). +-else. +-define(SESSION_OFFER_RES_TIMEOUT, timer:minutes(5)). +-endif. +-define(SESSION_OFFER_TIMEOUT, session_offer_timeout). -record(handler_state, { started :: non_neg_integer(), pubkey_bin :: undefined | binary(), nonce :: undefined | binary(), session_key :: undefined | binary(), - session_timer :: undefined | reference() + session_timer :: undefined | reference(), + session_offer_timer :: undefined | reference() }). -spec init(atom(), grpcbox_stream:t()) -> grpcbox_stream:t(). @@ -71,6 +78,23 @@ handle_info({give_away, NewPid, PubKeyBin}, StreamState) -> handle_info(?SESSION_RESET, StreamState0) -> {EnvDown, StreamState1} = create_session_offer(StreamState0), grpcbox_stream:send(false, EnvDown, StreamState1); +handle_info({?SESSION_OFFER_TIMEOUT, OldSessionKey}, StreamState) -> + lager:debug("got session offer timeout ~s", [hpr_utils:bin_to_hex_string(OldSessionKey)]), + HandlerState = grpcbox_stream:stream_handler_state(StreamState), + CurrSessionKey = HandlerState#handler_state.session_key, + case CurrSessionKey =/= undefined andalso CurrSessionKey =/= OldSessionKey of + false -> + lager:debug("session offer timeout triggered but session_key is set and new"), + StreamState; + true -> + lager:warning( + "session offer ~s timeout triggered, closing stream", [ + hpr_utils:bin_to_hex_string(HandlerState#handler_state.nonce) + ] + ), + EnvDown = hpr_envelope_down:new(undefined), + grpcbox_stream:send(true, EnvDown, StreamState) + end; handle_info(_Msg, StreamState) -> StreamState. @@ -191,6 +215,9 @@ handle_session_init(SessionInit, StreamState) -> HandlerState0#handler_state.session_timer ) }, + ok = hpr_utils:cancel_timer( + HandlerState0#handler_state.session_offer_timer + ), {ok, grpcbox_stream:stream_handler_state(StreamState, HandlerState1)} end end. @@ -200,9 +227,14 @@ handle_session_init(SessionInit, StreamState) -> create_session_offer(StreamState0) -> HandlerState0 = grpcbox_stream:stream_handler_state(StreamState0), Nonce = crypto:strong_rand_bytes(32), + Timer = erlang:send_after( + ?SESSION_OFFER_RES_TIMEOUT, + self(), + {?SESSION_OFFER_TIMEOUT, HandlerState0#handler_state.session_key} + ), EnvDown = hpr_envelope_down:new(hpr_session_offer:new(Nonce)), StreamState1 = grpcbox_stream:stream_handler_state( - StreamState0, HandlerState0#handler_state{nonce = Nonce} + StreamState0, HandlerState0#handler_state{nonce = Nonce, session_offer_timer = Timer} ), lager:debug("session offer ~s", [ hpr_utils:bin_to_hex_string(Nonce) @@ -210,10 +242,8 @@ create_session_offer(StreamState0) -> {EnvDown, StreamState1}. -spec schedule_session_reset(OldTimer :: undefined | reference()) -> reference(). -schedule_session_reset(OldTimer) when is_reference(OldTimer) -> - _ = erlang:cancel_timer(OldTimer), - erlang:send_after(?SESSION_TIMER, self(), ?SESSION_RESET); -schedule_session_reset(_OldTimer) -> +schedule_session_reset(OldTimer) -> + _ = hpr_utils:cancel_timer(OldTimer), erlang:send_after(?SESSION_TIMER, self(), ?SESSION_RESET). %% ------------------------------------------------------------------ diff --git a/src/hpr_routing_cache.erl b/src/hpr_routing_cache.erl index 0f207c89..ba3784cb 100644 --- a/src/hpr_routing_cache.erl +++ b/src/hpr_routing_cache.erl @@ -174,15 +174,11 @@ terminate(_Reason, _State = #state{}) -> do_crawl_routing(Window) -> Now = erlang:system_time(millisecond) - Window, %% MS = ets:fun2ms(fun(#routing_entry{time = Time}) when Time < 1234 -> true end). - MS = [{ - {routing_entry, - '_', - '$1', - '_', - '_', - '_' - }, - [{'<', '$1', Now}], - [true] - }], + MS = [ + { + {routing_entry, '_', '$1', '_', '_', '_'}, + [{'<', '$1', Now}], + [true] + } + ], ets:select_delete(?ROUTING_ETS, MS). diff --git a/src/hpr_utils.erl b/src/hpr_utils.erl index dcd60d07..6d7afa41 100644 --- a/src/hpr_utils.erl +++ b/src/hpr_utils.erl @@ -14,6 +14,7 @@ -endif. -export([ + cancel_timer/1, gateway_name/1, gateway_mac/1, int_to_hex_string/1, @@ -37,53 +38,12 @@ -type trace() :: packet_gateway | stream_gateway | devaddr | app_eui | dev_eui. --spec load_key(KeyFileName :: string()) -> ok. -load_key(KeyFileName) -> - {PubKey, SigFun} = - Key = - case libp2p_crypto:load_keys(KeyFileName) of - {ok, #{secret := PrivKey, public := PubKey0}} -> - {PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)}; - {error, enoent} -> - KeyMap = - #{secret := PrivKey, public := PubKey0} = libp2p_crypto:generate_keys( - ed25519 - ), - ok = libp2p_crypto:save_keys(KeyMap, KeyFileName), - {PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)} - end, - - PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey), - B58 = libp2p_crypto:bin_to_b58(PubKeyBin), - ok = persistent_term:put(?HPR_PUBKEY_BIN, PubKeyBin), - - %% Keep as binary for http protocol jsx encoding/decoding - SenderNSID = - case application:get_env(hpr, http_roaming_sender_nsid, erlang:list_to_binary(B58)) of - <<"">> -> erlang:list_to_binary(B58); - Val -> Val - end, - ok = persistent_term:put(?HPR_SENDER_NSID, SenderNSID), - - ok = persistent_term:put(?HPR_B58, B58), - ok = persistent_term:put(?HPR_SIG_FUN, SigFun), - ok = persistent_term:put(?HPR_KEY, Key). - --spec pubkey_bin() -> libp2p_crypto:pubkey_bin(). -pubkey_bin() -> - persistent_term:get(?HPR_PUBKEY_BIN, undefined). - --spec sig_fun() -> libp2p_crypto:sig_fun(). -sig_fun() -> - persistent_term:get(?HPR_SIG_FUN, undefined). - --spec sender_nsid() -> string(). -sender_nsid() -> - persistent_term:get(?HPR_SENDER_NSID, undefined). - --spec b58() -> binary(). -b58() -> - persistent_term:get(?HPR_B58, undefined). +-spec cancel_timer(Timer :: reference() | any()) -> ok. +cancel_timer(Timer) when is_reference(Timer) -> + _ = erlang:cancel_timer(Timer), + ok; +cancel_timer(_Timer) -> + ok. -spec gateway_name(PubKeyBin :: libp2p_crypto:pubkey_bin() | string()) -> string(). gateway_name(PubKeyBin) when is_binary(PubKeyBin) -> @@ -229,6 +189,54 @@ get_env_int(Key, Default) -> I -> I end. +-spec load_key(KeyFileName :: string()) -> ok. +load_key(KeyFileName) -> + {PubKey, SigFun} = + Key = + case libp2p_crypto:load_keys(KeyFileName) of + {ok, #{secret := PrivKey, public := PubKey0}} -> + {PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)}; + {error, enoent} -> + KeyMap = + #{secret := PrivKey, public := PubKey0} = libp2p_crypto:generate_keys( + ed25519 + ), + ok = libp2p_crypto:save_keys(KeyMap, KeyFileName), + {PubKey0, libp2p_crypto:mk_sig_fun(PrivKey)} + end, + + PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey), + B58 = libp2p_crypto:bin_to_b58(PubKeyBin), + ok = persistent_term:put(?HPR_PUBKEY_BIN, PubKeyBin), + + %% Keep as binary for http protocol jsx encoding/decoding + SenderNSID = + case application:get_env(hpr, http_roaming_sender_nsid, erlang:list_to_binary(B58)) of + <<"">> -> erlang:list_to_binary(B58); + Val -> Val + end, + ok = persistent_term:put(?HPR_SENDER_NSID, SenderNSID), + + ok = persistent_term:put(?HPR_B58, B58), + ok = persistent_term:put(?HPR_SIG_FUN, SigFun), + ok = persistent_term:put(?HPR_KEY, Key). + +-spec pubkey_bin() -> libp2p_crypto:pubkey_bin(). +pubkey_bin() -> + persistent_term:get(?HPR_PUBKEY_BIN, undefined). + +-spec sig_fun() -> libp2p_crypto:sig_fun(). +sig_fun() -> + persistent_term:get(?HPR_SIG_FUN, undefined). + +-spec sender_nsid() -> string(). +sender_nsid() -> + persistent_term:get(?HPR_SENDER_NSID, undefined). + +-spec b58() -> binary(). +b58() -> + persistent_term:get(?HPR_B58, undefined). + %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ diff --git a/test/hpr_packet_router_service_SUITE.erl b/test/hpr_packet_router_service_SUITE.erl index 0fd69cb8..20996709 100644 --- a/test/hpr_packet_router_service_SUITE.erl +++ b/test/hpr_packet_router_service_SUITE.erl @@ -7,7 +7,8 @@ ]). -export([ - session_test/1 + session_test/1, + timeout_test/1 ]). -include_lib("eunit/include/eunit.hrl"). @@ -24,7 +25,8 @@ %%-------------------------------------------------------------------- all() -> [ - session_test + session_test, + timeout_test ]. %%-------------------------------------------------------------------- @@ -137,6 +139,102 @@ session_test(_Config) -> ok. +timeout_test(_Config) -> + RouteID = "8d502f32-4d58-4746-965e-8c7dfdcfc625", + Route = hpr_route:test_new(#{ + id => RouteID, + net_id => 0, + oui => 4020, + server => #{ + host => "127.0.0.1", + port => 8082, + protocol => {packet_router, #{}} + }, + max_copies => 2 + }), + EUIPairs = [ + hpr_eui_pair:test_new(#{ + route_id => RouteID, app_eui => 802041902051071031, dev_eui => 8942655256770396549 + }) + ], + DevAddrRanges = [ + hpr_devaddr_range:test_new(#{ + route_id => RouteID, start_addr => 16#00000000, end_addr => 16#00000010 + }) + ], + + %% Normal test + {ok, GatewayPid1} = hpr_test_gateway:start(#{ + forward => self(), + route => Route, + eui_pairs => EUIPairs, + devaddr_ranges => DevAddrRanges, + ignore_session_offer => false + }), + + {ok, _} = hpr_test_gateway:receive_session_init(GatewayPid1, timer:seconds(1)), + {error, timeout} = hpr_test_gateway:receive_stream_down(GatewayPid1), + + SessionKey1 = hpr_test_gateway:session_key(GatewayPid1), + + PubKeyBin1 = hpr_test_gateway:pubkey_bin(GatewayPid1), + {ok, Pid1} = hpr_packet_router_service:locate(PubKeyBin1), + + %% Checking that session keys are the same + ?assertEqual(SessionKey1, session_key_from_stream(Pid1)), + + gen_server:stop(GatewayPid1), + + %% Testing with session key offer disabled, should result in closing of stream + {ok, GatewayPid2} = hpr_test_gateway:start(#{ + forward => self(), + route => Route, + eui_pairs => EUIPairs, + devaddr_ranges => DevAddrRanges, + ignore_session_offer => true + }), + + {error, timeout} = hpr_test_gateway:receive_session_init(GatewayPid2, timer:seconds(3)), + ok = hpr_test_gateway:receive_stream_down(GatewayPid2), + + gen_server:stop(GatewayPid2), + + %% Normal test with session reset + {ok, GatewayPid3} = hpr_test_gateway:start(#{ + forward => self(), + route => Route, + eui_pairs => EUIPairs, + devaddr_ranges => DevAddrRanges, + ignore_session_offer => false + }), + + {ok, _} = hpr_test_gateway:receive_session_init(GatewayPid3, timer:seconds(1)), + {error, timeout} = hpr_test_gateway:receive_stream_down(GatewayPid3), + + SessionKey3 = hpr_test_gateway:session_key(GatewayPid3), + PubKeyBin3 = hpr_test_gateway:pubkey_bin(GatewayPid3), + {ok, Pid3} = hpr_packet_router_service:locate(PubKeyBin3), + + %% Checking that session keys are the same + ?assertEqual(SessionKey3, session_key_from_stream(Pid3)), + Pid3 ! session_reset, + + {ok, _} = hpr_test_gateway:receive_session_init(GatewayPid3, timer:seconds(1)), + {error, timeout} = hpr_test_gateway:receive_stream_down(GatewayPid3), + + SessionKey4 = hpr_test_gateway:session_key(GatewayPid3), + ?assert(SessionKey3 =/= SessionKey4), + ?assertEqual(SessionKey4, session_key_from_stream(Pid3)), + + ok. + %% =================================================================== %% Helpers %% =================================================================== + +session_key_from_stream(Pid) -> + State = sys:get_state(Pid), + StreamState = erlang:element(2, State), + CallbackState = erlang:element(20, StreamState), + HandlerState = erlang:element(3, CallbackState), + erlang:element(5, HandlerState). diff --git a/test/hpr_protocol_router_SUITE.erl b/test/hpr_protocol_router_SUITE.erl index fcb91607..ab0cf43a 100644 --- a/test/hpr_protocol_router_SUITE.erl +++ b/test/hpr_protocol_router_SUITE.erl @@ -320,20 +320,20 @@ gateway_disconnect_test(_Config) -> _ConnPid = h2_stream_set:connection(StreamSet), ok = gen_server:stop(GatewayPid), - ok = - receive - {hpr_test_gateway, GatewayPid, - {terminate, #{channel := GatewayStreamSet, stream_pid := GatewayStreamPid}}} -> - GatewayConnPid = h2_stream_set:connection(GatewayStreamSet), - ok = test_utils:wait_until( - fun() -> - false == erlang:is_process_alive(GatewayConnPid) andalso - false == erlang:is_process_alive(GatewayStreamPid) andalso - false == erlang:is_process_alive(GatewayPid) - end - ) - after timer:seconds(3) -> ct:fail(no_terminate_rcvd) - end, + + case hpr_test_gateway:receive_terminate(GatewayPid) of + {error, timeout} -> + ct:fail(no_terminate_rcvd); + {ok, #{channel := GatewayStreamSet, stream_pid := GatewayStreamPid}} -> + GatewayConnPid = h2_stream_set:connection(GatewayStreamSet), + ok = test_utils:wait_until( + fun() -> + false == erlang:is_process_alive(GatewayConnPid) andalso + false == erlang:is_process_alive(GatewayStreamPid) andalso + false == erlang:is_process_alive(GatewayPid) + end + ) + end, ok = test_utils:wait_until( fun() -> diff --git a/test/hpr_test_gateway.erl b/test/hpr_test_gateway.erl index cbf1fdfe..164874ba 100644 --- a/test/hpr_test_gateway.erl +++ b/test/hpr_test_gateway.erl @@ -8,10 +8,14 @@ -export([ start/1, pubkey_bin/1, + session_key/1, send_packet/2, receive_send_packet/1, receive_env_down/1, - receive_register/1 + receive_register/1, + receive_session_init/2, + receive_stream_down/1, + receive_terminate/1 ]). %% ------------------------------------------------------------------ @@ -31,6 +35,8 @@ -define(RCV_TIMEOUT, 100). -define(SEND_PACKET, send_packet). -define(REGISTER, register). +-define(SESSION_INIT, session_init). +-define(STREAM_DOWN, stream_down). -record(state, { forward :: pid(), @@ -40,7 +46,8 @@ pubkey_bin :: libp2p_crypto:pubkey_bin(), sig_fun :: libp2p_crypto:sig_fun(), stream :: grpcbox_client:stream(), - session_key :: undefined | {libp2p_crypto:pubkey_bin(), libp2p_crypto:sig_fun()} + session_key :: undefined | {libp2p_crypto:pubkey_bin(), libp2p_crypto:sig_fun()}, + ignore_session_offer = false :: boolean() }). -type state() :: #state{}. @@ -57,6 +64,10 @@ start(Args) -> pubkey_bin(Pid) -> gen_server:call(Pid, pubkey_bin). +-spec session_key(Pid :: pid()) -> binary(). +session_key(Pid) -> + gen_server:call(Pid, session_key). + -spec send_packet(Pid :: pid(), Args :: map()) -> ok. send_packet(Pid, Args) -> gen_server:cast(Pid, {?SEND_PACKET, Args}). @@ -82,7 +93,7 @@ receive_env_down(GatewayPid) -> end. -spec receive_register(GatewayPid :: pid()) -> - {ok, EnvDown :: hpr_envelope_up:envelope()} | {error, timeout}. + {ok, EnvUp :: hpr_envelope_up:envelope()} | {error, timeout}. receive_register(GatewayPid) -> receive {?MODULE, GatewayPid, {?REGISTER, EnvUp}} -> @@ -91,6 +102,34 @@ receive_register(GatewayPid) -> {error, timeout} end. +-spec receive_session_init(GatewayPid :: pid(), Timeout :: non_neg_integer()) -> + {ok, EnvUp :: hpr_envelope_up:envelope()} | {error, timeout}. +receive_session_init(GatewayPid, Timeout) -> + receive + {?MODULE, GatewayPid, {?SESSION_INIT, EnvUp}} -> + {ok, EnvUp} + after Timeout -> + {error, timeout} + end. + +-spec receive_stream_down(GatewayPid :: pid()) -> ok | {error, timeout}. +receive_stream_down(GatewayPid) -> + receive + {?MODULE, GatewayPid, ?STREAM_DOWN} -> + ok + after timer:seconds(2) -> + {error, timeout} + end. + +-spec receive_terminate(GatewayPid :: pid()) -> {ok, any()} | {error, timeout}. +receive_terminate(GatewayPid) -> + receive + {?MODULE, GatewayPid, {terminate, Stream}} -> + {ok, Stream} + after timer:seconds(2) -> + {error, timeout} + end. + %% ------------------------------------------------------------------ %%% gen_server Function Definitions %% ------------------------------------------------------------------ @@ -110,11 +149,14 @@ init( eui_pairs = EUIPairs, devaddr_ranges = DevAddrRanges, pubkey_bin = libp2p_crypto:pubkey_to_bin(PubKey), - sig_fun = libp2p_crypto:mk_sig_fun(PrivKey) + sig_fun = libp2p_crypto:mk_sig_fun(PrivKey), + ignore_session_offer = maps:get(ignore_session_offer, Args, false) }}. handle_call(pubkey_bin, _From, #state{pubkey_bin = PubKeyBin} = State) -> {reply, PubKeyBin, State}; +handle_call(session_key, _From, #state{session_key = {SessionKey, _}} = State) -> + {reply, SessionKey, State}; handle_call(_Msg, _From, State) -> lager:debug("unknown call ~p", [_Msg]), {reply, ok, State}. @@ -199,50 +241,71 @@ handle_info(?CONNECT, #state{forward = Pid, pubkey_bin = PubKeyBin, sig_fun = Si EnvUp = hpr_envelope_up:new(SignedReg), ok = grpcbox_client:send(Stream, EnvUp), Pid ! {?MODULE, self(), {?REGISTER, EnvUp}}, - lager:debug("connected and registered"), + lager:debug("connected and registering"), {noreply, State#state{stream = Stream}} end; %% GRPC stream callbacks handle_info( {data, _StreamID, EnvDown}, - #state{forward = Pid, pubkey_bin = Gateway, sig_fun = SigFun, stream = Stream} = State + #state{ + forward = Pid, + pubkey_bin = Gateway, + sig_fun = SigFun, + stream = Stream, + ignore_session_offer = IgnoreSessionOffer + } = State ) -> lager:debug("got EnvDown ~p", [EnvDown]), case hpr_envelope_down:data(EnvDown) of + undefined -> + {noreply, State}; {packet, _Packet} -> Pid ! {?MODULE, self(), {data, EnvDown}}, {noreply, State}; {session_offer, SessionOffer} -> - Nonce = hpr_session_offer:nonce(SessionOffer), - #{public := PubKey, secret := PrivKey} = libp2p_crypto:generate_keys(ed25519), - SessionKey = libp2p_crypto:pubkey_to_bin(PubKey), - SessionInit = hpr_session_init:test_new(Gateway, Nonce, SessionKey), - SignedSessionInit = hpr_session_init:sign(SessionInit, SigFun), - EnvUp = hpr_envelope_up:new(SignedSessionInit), - ok = grpcbox_client:send(Stream, EnvUp), - Pid ! {?MODULE, self(), {session_init, EnvUp}}, - lager:debug("session initialized"), - {noreply, State#state{session_key = {SessionKey, libp2p_crypto:mk_sig_fun(PrivKey)}}} + case IgnoreSessionOffer of + true -> + lager:debug("session offer ignored"), + {noreply, State}; + false -> + Nonce = hpr_session_offer:nonce(SessionOffer), + #{public := PubKey, secret := PrivKey} = libp2p_crypto:generate_keys(ed25519), + SessionKey = libp2p_crypto:pubkey_to_bin(PubKey), + SessionInit = hpr_session_init:test_new(Gateway, Nonce, SessionKey), + SignedSessionInit = hpr_session_init:sign(SessionInit, SigFun), + EnvUp = hpr_envelope_up:new(SignedSessionInit), + ok = grpcbox_client:send(Stream, EnvUp), + Pid ! {?MODULE, self(), {?SESSION_INIT, EnvUp}}, + lager:debug("session initialized"), + {noreply, State#state{ + session_key = {SessionKey, libp2p_crypto:mk_sig_fun(PrivKey)} + }} + end end; -handle_info( - {'DOWN', Ref, process, Pid, _Reason}, - #state{stream = #{stream_pid := Pid, monitor_ref := Ref}} = State -) -> - lager:debug("test gateway stream went down"), - {noreply, State#state{stream = undefined}}; handle_info({headers, _StreamID, _Headers}, State) -> + lager:debug("test gateway got headers ~p for ~w", [_Headers, _StreamID]), {noreply, State}; handle_info({trailers, _StreamID, _Trailers}, State) -> + lager:debug("test gateway got trailers ~p for ~w", [_Trailers, _StreamID]), {noreply, State}; +handle_info({eos, StreamID}, #state{forward = ForwardPid} = State) -> + lager:debug("test gateway got eos for ~w", [StreamID]), + ForwardPid ! {?MODULE, self(), ?STREAM_DOWN}, + {noreply, State#state{stream = undefined}}; handle_info(_Msg, State) -> lager:debug("unknown info ~p", [_Msg]), {noreply, State}. +terminate(_Reason, #state{forward = Pid, pubkey_bin = PubKeyBin, stream = undefined}) -> + ok = grpcbox_channel:stop(PubKeyBin), + lager:debug("terminate ~p", [_Reason]), + Pid ! {?MODULE, self(), {terminate, undefined}}, + ok; terminate(_Reason, #state{forward = Pid, pubkey_bin = PubKeyBin, stream = Stream}) -> ok = grpcbox_client:close_send(Stream), ok = grpcbox_channel:stop(PubKeyBin), - Pid ! {?MODULE, self(), {terminate, Stream}}, lager:debug("terminate ~p", [_Reason]), + Pid ! {?MODULE, self(), {terminate, Stream}}, ok. %% ------------------------------------------------------------------