Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow channel handler to control adjust_window message sending #8775

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions lib/ssh/src/ssh_client_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -496,14 +496,12 @@ handle_info({ssh_cm, ConnectionManager, {closed, ChannelId}},
(catch ssh_connection:close(ConnectionManager, ChannelId)),
{stop, normal, State#state{close_sent = true}};

handle_info({ssh_cm, _, _} = Msg, #state{channel_cb = Module,
channel_state = ChannelState0} = State) ->
handle_info({ssh_cm, _, _} = Msg, #state{channel_cb = Module,
channel_state = ChannelState0} = State) ->
try Module:handle_ssh_msg(Msg, ChannelState0) of
{ok, ChannelState} ->
adjust_window(Msg),
{noreply, State#state{channel_state = ChannelState}};
{ok, ChannelState, Timeout} ->
adjust_window(Msg),
{noreply, State#state{channel_state = ChannelState}, Timeout};
{stop, ChannelId, ChannelState} ->
do_the_close(Msg, ChannelId, State#state{channel_state = ChannelState})
Expand Down Expand Up @@ -626,13 +624,6 @@ handle_cb_result({stop, Reason, Reply, ChannelState}, State) ->
handle_cb_result({stop, Reason, ChannelState}, State) ->
{stop, Reason, State#state{channel_state = ChannelState}}.

adjust_window({ssh_cm, ConnectionManager,
{data, ChannelId, _, Data}}) ->
ssh_connection:adjust_window(ConnectionManager, ChannelId, byte_size(Data));
adjust_window(_) ->
ok.


%%%################################################################
%%%#
%%%# Tracing
Expand Down
3 changes: 2 additions & 1 deletion lib/ssh/src/ssh_connect.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@
send_window_size,
send_packet_size,
sent_close = false,
send_buf = []
send_buf = [],
window_handling_mode = auto
}).

-record(connection, {
Expand Down
31 changes: 28 additions & 3 deletions lib/ssh/src/ssh_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ these messages are handled by
-export([session_channel/2, session_channel/4,
exec/4, shell/2, subsystem/4, send/3, send/4, send/5,
send_eof/2, adjust_window/3, setenv/5, close/2, reply_request/4,
ptty_alloc/3, ptty_alloc/4]).
ptty_alloc/3, ptty_alloc/4,
set_window_handling_mode/3]).

%% Potential API currently unsupported and not tested
-export([window_change/4, window_change/6,
Expand Down Expand Up @@ -548,6 +549,27 @@ server-side channel processes.
adjust_window(ConnectionHandler, Channel, Bytes) ->
ssh_connection_handler:adjust_window(ConnectionHandler, Channel, Bytes).

%%--------------------------------------------------------------------
-doc """
Sets the SSH flow control mode. This can be done by the client- or server-side
channel process.

If the flow control mode is set to `auto` (default) then the channel process
does not need to take any action with regard to flow control.
If the flow control mode is set to `manual` then it is up to the channel process
to send the window adjust messages by use of
[ssh_connection:adjust_window/3](`adjust_window/3`).
The flow control mode can be changed at any time when the corresponding channel
is open.
""".
%%
-spec set_window_handling_mode(ConnectionRef, ChannelId, Mode) -> ok when
ConnectionRef :: ssh:connection_ref(),
ChannelId :: ssh:channel_id(),
Mode :: auto | manual.
set_window_handling_mode(ConnectionRef, ChannelId, Mode) ->
ssh_connection_handler:set_window_handling_mode(ConnectionRef, ChannelId, Mode).

%%--------------------------------------------------------------------
-doc """
Environment variables can be passed before starting the shell/command. Is to be
Expand Down Expand Up @@ -1817,11 +1839,14 @@ handle_cli_msg(C0, ChId, Reply0) ->

channel_data_reply_msg(ChannelId, Connection, DataType, Data) ->
case ssh_client_channel:cache_lookup(Connection#connection.channel_cache, ChannelId) of
#channel{recv_window_size = Size} = Channel ->
#channel{recv_window_size = Size, window_handling_mode = Mode} = Channel ->
WantedSize = Size - byte_size(Data),
ssh_client_channel:cache_update(Connection#connection.channel_cache,
Channel#channel{recv_window_size = WantedSize}),
adjust_window(self(), ChannelId, byte_size(Data)),
if Mode == auto ->
adjust_window(self(), ChannelId, byte_size(Data));
true -> ok
end,
reply_msg(Channel, Connection, {data, ChannelId, DataType, Data});
undefined ->
{[], Connection}
Expand Down
32 changes: 32 additions & 0 deletions lib/ssh/src/ssh_connection_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
info/1, info/2,
connection_info/2,
channel_info/3,
set_window_handling_mode/3,
adjust_window/3, close/2,
disconnect/4,
get_print_info/1,
Expand All @@ -95,6 +96,10 @@
-define(call_disconnectfun_and_log_cond(LogMsg, DetailedText, StateName, D),
call_disconnectfun_and_log_cond(LogMsg, DetailedText, ?MODULE, ?LINE, StateName, D)).

%% Minimum number of bytes reported by the "upper layer" that cause
%% #ssh_msg_channel_adjust_window to be sent to the SSH peer
-define(MIN_ADJUST, 64).

%%====================================================================
%% Start / stop
%%====================================================================
Expand Down Expand Up @@ -317,6 +322,12 @@ connection_info(ConnectionHandler, Options) ->
channel_info(ConnectionHandler, ChannelId, Options) ->
call(ConnectionHandler, {channel_info, ChannelId, Options}).

-spec set_window_handling_mode(connection_ref(),
channel_id(),
auto | manual) -> ok.
set_window_handling_mode(ConnectionHandler, Channel, Mode) ->
cast(ConnectionHandler, {set_window_handling_mode, Channel, Mode}).

%%--------------------------------------------------------------------
-spec adjust_window(connection_ref(),
channel_id(),
Expand Down Expand Up @@ -840,6 +851,16 @@ handle_event({call,From}, get_alg, _, D) ->
handle_event(cast, _, StateName, _) when not ?CONNECTED(StateName) ->
{keep_state_and_data, [postpone]};

handle_event(cast, {set_window_handling_mode, ChannelId, NewMode}, StateName, D) when ?CONNECTED(StateName) ->
case ssh_client_channel:cache_lookup(cache(D), ChannelId) of
#channel{window_handling_mode = OldMode} = Channel when OldMode /= NewMode ->
ssh_client_channel:cache_update(cache(D),
Channel#channel{window_handling_mode = NewMode});
_ ->
ok
end,
keep_state_and_data;

handle_event(cast, {adjust_window,ChannelId,Bytes}, StateName, D) when ?CONNECTED(StateName) ->
case ssh_client_channel:cache_lookup(cache(D), ChannelId) of
#channel{recv_window_size = WinSize,
Expand All @@ -851,6 +872,17 @@ handle_event(cast, {adjust_window,ChannelId,Bytes}, StateName, D) when ?CONNECTE
Channel#channel{recv_window_pending = Pending + Bytes}),
keep_state_and_data;

#channel{recv_window_size = WinSize,
recv_window_pending = Pending,
recv_packet_size = _PktSize} = Channel
when ((Bytes + Pending) < ?MIN_ADJUST andalso (WinSize > 0)) ->
%% It does not make sense to send updates of e.g. 1 byte
%% if we are still able to receive something
ssh_client_channel:cache_update(cache(D),
Channel#channel{recv_window_pending =
Pending + Bytes}),
keep_state_and_data;

#channel{recv_window_size = WinSize,
recv_window_pending = Pending,
remote_id = Id} = Channel ->
Expand Down
176 changes: 172 additions & 4 deletions lib/ssh/test/ssh_connection_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
gracefull_invalid_version/1,
kex_error/1,
interrupted_send/1,
manual_window_handling/1,
auto_window_handling/1,
max_channels_option/1,
no_sensitive_leak/1,
ptty_alloc/1,
Expand Down Expand Up @@ -129,6 +131,8 @@ all() ->
{group, openssh},
small_interrupted_send,
interrupted_send,
manual_window_handling,
auto_window_handling,
exec_erlang_term,
exec_erlang_term_non_default_shell,
exec_disabled,
Expand Down Expand Up @@ -231,12 +235,24 @@ end_per_group(_, Config) ->
Config.

%%--------------------------------------------------------------------
init_per_testcase(_TestCase, Config) ->
init_per_testcase(TestCase, Config) ->
PktSize = 1024, % 1KiB
NewConfig = case TestCase of
manual_window_handling ->
[{init_win_size, 2 * PktSize}, {init_pkt_size, PktSize},
{window_handling_mode, manual},
{adjust_timeout, 2000} | Config];
auto_window_handling ->
[{init_win_size, 2 * PktSize}, {init_pkt_size, PktSize},
{window_handling_mode, auto},
{adjust_timeout, 2000} | Config];
_ -> Config
end,
ssh:stop(),
ssh:start(),
{ok, TestLogHandlerRef} = ssh_test_lib:add_log_handler(),
ssh_test_lib:verify_sanity_check(Config),
[{log_handler_ref, TestLogHandlerRef} | Config].
ssh_test_lib:verify_sanity_check(NewConfig),
[{log_handler_ref, TestLogHandlerRef} | NewConfig].

end_per_testcase(TestCase, Config) ->
{ok, Events} = ssh_test_lib:get_log_events(
Expand Down Expand Up @@ -777,7 +793,7 @@ small_interrupted_send(Config) ->
interrupted_send(Config) ->
K = 1024,
SendSize = 10 * K * K,
EchoSize = 4 * K * K,
EchoSize = 10 * K * K,
do_interrupted_send(Config, SendSize, EchoSize, ok).

do_interrupted_send(Config, SendSize, EchoSize, SenderResult) ->
Expand Down Expand Up @@ -878,6 +894,158 @@ do_interrupted_send(Config, SendSize, EchoSize, SenderResult) ->
end
end.

manual_window_handling(Config) ->
do_adjusted_send(Config, 10, ok).

auto_window_handling(Config) ->
do_adjusted_send(Config, 10, fail).

sender_loop(Parent, _ConnRef, _ChannelId, _Bin, 0) ->
Parent ! {self(), ok};
sender_loop(Parent, ConnRef, ChannelId, Bin, Ctr) ->
SendBin = <<Ctr:32, Bin/binary>>,
ct:log("~p:~p Sending packet number ~p", [?MODULE, ?LINE, Ctr]),
ssh_connection:send(ConnRef, ChannelId, SendBin, 30000),
sender_loop(Parent, ConnRef, ChannelId, Bin, Ctr - 1).

mk_receive_loop_fun(ConnRef, ChannelId, RcvTmo, ReportAdjustFun) ->
fun(Continue, RcvCtr, 0) ->
receive
{ssh_cm, ConnRef, {eof, ChannelId}} ->
ct:log("~p:~p Received eof", [?MODULE, ?LINE]),
Continue(Continue, RcvCtr, 0);
{ssh_cm, ConnRef, {closed, ChannelId}} ->
ct:log("~p:~p Peer closed the channel, closing", [?MODULE, ?LINE]),
ssh:close(ConnRef);
What ->
ct:log("~p:~p What is this ~p?", [?MODULE, ?LINE, What]),
Continue(Continue, RcvCtr, 0)
end;
(Continue, RcvCtr, ExpectedCtr) ->
receive
{ssh_cm, ConnRef, {data, ChannelId, 0, <<Ctr:32, _/binary>> = Data}} when byte_size(Data) == 1024 ->
ct:log("~p:~p Received back number ~p", [?MODULE, ?LINE, Ctr]),
if ExpectedCtr /= Ctr ->
ct:fail("Unexpected counter value: ~p (expected ~p)", [Ctr, ExpectedCtr]);
true ->
Continue(Continue, RcvCtr + 1, ExpectedCtr - 1)
end;
{ssh_cm, ConnRef, {eof, ChannelId}} ->
ct:fail("Unexpected eof");
{ssh_cm, ConnRef, {closed,ChannelId}} ->
ct:fail("Unexpected eof")
after RcvTmo ->
ct:log("~p:~p After ~p packets should adjust window", [?MODULE, ?LINE, RcvCtr]),

%% This function will either send adjust_window message (if allowed)
%% or send a message to the parent PID, which it does not expect
%% The latter would mean that the sender sees our recv window as full
%% as no auto-adjust happened
ReportAdjustFun(),
Continue(Continue, RcvCtr, ExpectedCtr)
end
end.

do_adjusted_send(Config, NPkts, IsAdjustOk) ->
PrivDir = proplists:get_value(priv_dir, Config),
UserDir = filename:join(PrivDir, nopubkey), % to make sure we don't use public-key-auth
file:make_dir(UserDir),
PktSize = proplists:get_value(init_pkt_size, Config),
WHM = proplists:get_value(window_handling_mode, Config),
WinSize = proplists:get_value(init_win_size, Config),
SendSize = PktSize * NPkts,
SysDir = proplists:get_value(data_dir, Config),
EchoSS_spec = {ssh_echo_server, [SendSize, [{dbg,true}]]},
{Pid, Host, Port} = ssh_test_lib:daemon([{system_dir, SysDir},
{user_dir, UserDir},
{password, "morot"},
{subsystems, [{"echo_n",EchoSS_spec}]}]),
ct:log("~p:~p connect", [?MODULE,?LINE]),
ConnectionRef = ssh_test_lib:connect(Host, Port, [{silently_accept_hosts, true},
{user, "foo"},
{password, "morot"},
{user_interaction, false},
{user_dir, UserDir}]),
ct:log("~p:~p connected, ref ~p", [?MODULE,?LINE, ConnectionRef]),
%% Spawn listener. Otherwise we could get a deadlock due to filled buffers
Parent = self(),
ResultPid = spawn(
fun() ->
ct:log("~p:~p open channel",[?MODULE,?LINE]),
{ok, ChannelId} = ssh_connection:session_channel(ConnectionRef, WinSize, PktSize, infinity),
ct:log("~p:~p set window handling mode ~p", [?MODULE,?LINE, WHM]),
ssh_connection:set_window_handling_mode(ConnectionRef, ChannelId, WHM),
ct:log("~p:~p start subsystem", [?MODULE,?LINE]),
case ssh_connection:subsystem(ConnectionRef, ChannelId, "echo_n", infinity) of
success ->
Parent ! {self(), channelId, ChannelId},
ct:log("~p:~p Starting receiver loop", [?MODULE, ?LINE]),
AdjTmo = proplists:get_value(adjust_timeout, Config, 0),
ReportAdjustFun =
if IsAdjustOk == ok ->
fun() ->
ssh_connection:adjust_window(ConnectionRef, ChannelId, WinSize)
end;
true ->
fun() -> Parent ! adjust_nok end
end,
LoopFun = mk_receive_loop_fun(ConnectionRef, ChannelId, AdjTmo, ReportAdjustFun),
Result = LoopFun(LoopFun, 0, NPkts),
Parent ! {self(), result, Result};
Other ->
Parent ! {self(), channelId, error, Other}
end
end),
ct:log("Client channel handler ~p~n", [ResultPid]),
receive
{ResultPid, channelId, error, Other} ->
ct:log("~p:~p channelId error ~p", [?MODULE,?LINE,Other]),
ssh:close(ConnectionRef),
ssh:stop_daemon(Pid),
{fail, "ssh_connection:subsystem"};
{ResultPid, channelId, ChannelId} ->
ct:log("~p:~p Starting to send data in 1Kib chunks", [?MODULE, ?LINE]),
%% Prepare the binary of (1 KiB - 4 bytes) size
%% 4-byte counter is added when sending
SendBin = << <<X:32>> || X <- lists:duplicate(255, 0)>>,
SenderPid = spawn(fun() -> sender_loop(Parent, ConnectionRef, ChannelId, SendBin, 10) end),
ct:log("Sender pid ~p~n", [SenderPid]),
receive
{ResultPid, result, Result} ->
ct:log("~p:~p Receiver result: ~p", [?MODULE,?LINE,Result]),
ssh:close(ConnectionRef),
ssh:stop_daemon(Pid),
ct:log("~p:~p Check sender", [?MODULE,?LINE]),
receive
{SenderPid, SenderResult} ->
ct:log("~p:~p Sender result: ~p",
[?MODULE,?LINE, SenderResult]),
ok;
Msg ->
ct:log("~p:~p Unexpected msg: ~p",[?MODULE,?LINE,Msg]),
{fail, "Unexpected msg"}
end;
{SenderPid, SenderResult} ->
ct:log("~p:~p Sender result: ~p", [?MODULE,?LINE, SenderResult]),
receive
{ResultPid, result, Result} ->
ct:log("~p:~p Receiver result: ~p", [?MODULE,?LINE,Result]),
ssh:close(ConnectionRef),
ssh:stop_daemon(Pid),
ok;
Msg ->
ct:log("~p:~p Unexpected msg: ~p",[?MODULE,?LINE,Msg]),
{fail, "Unexpected msg"}
end;
Msg ->
ct:log("~p:~p Unexpected msg: ~p",[?MODULE,?LINE,Msg]),
ssh:stop_daemon(Pid),
catch exit(ResultPid, terminate),
catch exit(SenderPid, terminate),
{fail, "Unexpected msg"}
end
end.

%%--------------------------------------------------------------------
start_shell(Config) when is_list(Config) ->
PrivDir = proplists:get_value(priv_dir, Config),
Expand Down
7 changes: 4 additions & 3 deletions lib/ssh/test/ssh_echo_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ handle_ssh_msg({ssh_cm, CM, {data, ChannelId, 0, Data}}, #state{n = N} = State)
M = N - size(Data),
case M > 0 of
true ->
?DBG(State, "ssh_cm data Cid=~p size(Data)=~p M=~p",[ChannelId,size(Data),M]),
ssh_connection:adjust_window(CM, ChannelId, size(Data)),
<<DWord:32, _/binary>> = Data,
?DBG(State, "ssh_cm data(~p) Cid=~p size(Data)=~p M=~p",[DWord, ChannelId,size(Data),M]),
ssh_connection:send(CM, ChannelId, Data),
{ok, State#state{n = M}};
false ->
<<SendData:N/binary, _/binary>> = Data,
?DBG(State, "ssh_cm data Cid=~p size(Data)=~p M=~p size(SendData)=~p~nSend eof",[ChannelId,size(Data),M,size(SendData)]),
?DBG(State, "ssh_cm data Cid=~p size(Data)=~p M=~p size(SendData)=~p~nSend eof",
[ChannelId, size(Data), M, size(SendData)]),
ssh_connection:send(CM, ChannelId, SendData),
ssh_connection:send_eof(CM, ChannelId),
{stop, ChannelId, State}
Expand Down
Loading