diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index d1d5bc7cbbd9e..27c0ba8601f1f 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -44,8 +44,8 @@ class ActiveCall # rubocop:disable Metrics/ClassLength include Core::CallOps extend Forwardable attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert - def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, - :trailing_metadata, :status + def_delegators :@call, :cancel, :cancel_with_status, :metadata, + :write_flag, :write_flag=, :trailing_metadata, :status # client_invoke begins a client invocation. # @@ -620,6 +620,8 @@ def maybe_finish_and_close_call_locked # @param metadata [Hash] metadata to be sent to the server. If a value is # a list, multiple metadata for its key are sent def start_call(metadata = {}) + # TODO(apolcyn): we should cancel and clean up the call in case this + # send initial MD op fails. merge_metadata_to_send(metadata) && send_initial_metadata end @@ -665,9 +667,10 @@ def initialize(wrapped) # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. - Operation = view_class(:cancel, :cancelled?, :deadline, :execute, - :metadata, :status, :start_call, :wait, :write_flag, - :write_flag=, :trailing_metadata) + # TODO(apolcyn): expose peer getter + Operation = view_class(:cancel, :cancel_with_status, :cancelled?, :deadline, + :execute, :metadata, :status, :start_call, :wait, + :write_flag, :write_flag=, :trailing_metadata) # InterceptableView further limits access to an ActiveCall's methods # for use in interceptors on the client, exposing only the deadline diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb index c4296bb5ada74..880167515397b 100644 --- a/src/ruby/spec/call_spec.rb +++ b/src/ruby/spec/call_spec.rb @@ -90,88 +90,101 @@ describe '#status' do it 'can save the status and read it back' do - call = make_test_call - sts = Struct::Status.new(OK, 'OK') - expect { call.status = sts }.not_to raise_error - expect(call.status).to eq(sts) + make_test_call do |call| + sts = Struct::Status.new(OK, 'OK') + expect { call.status = sts }.not_to raise_error + expect(call.status).to eq(sts) + end end it 'must be set to a status' do - call = make_test_call - bad_sts = Object.new - expect { call.status = bad_sts }.to raise_error(TypeError) + make_test_call do |call| + bad_sts = Object.new + expect { call.status = bad_sts }.to raise_error(TypeError) + end end it 'can be set to nil' do - call = make_test_call - expect { call.status = nil }.not_to raise_error + make_test_call do |call| + expect { call.status = nil }.not_to raise_error + end end end describe '#metadata' do it 'can save the metadata hash and read it back' do - call = make_test_call - md = { 'k1' => 'v1', 'k2' => 'v2' } - expect { call.metadata = md }.not_to raise_error - expect(call.metadata).to be(md) + make_test_call do |call| + md = { 'k1' => 'v1', 'k2' => 'v2' } + expect { call.metadata = md }.not_to raise_error + expect(call.metadata).to be(md) + end end it 'must be set with a hash' do - call = make_test_call - bad_md = Object.new - expect { call.metadata = bad_md }.to raise_error(TypeError) + make_test_call do |call| + bad_md = Object.new + expect { call.metadata = bad_md }.to raise_error(TypeError) + end end it 'can be set to nil' do - call = make_test_call - expect { call.metadata = nil }.not_to raise_error + make_test_call do |call| + expect { call.metadata = nil }.not_to raise_error + end end end describe '#set_credentials!' do it 'can set a valid CallCredentials object' do - call = make_test_call - auth_proc = proc { { 'plugin_key' => 'plugin_value' } } - creds = GRPC::Core::CallCredentials.new auth_proc - expect { call.set_credentials! creds }.not_to raise_error + make_test_call do |call| + auth_proc = proc { { 'plugin_key' => 'plugin_value' } } + creds = GRPC::Core::CallCredentials.new auth_proc + expect { call.set_credentials! creds }.not_to raise_error + end end end describe '#cancel' do it 'completes ok' do - call = make_test_call - expect { call.cancel }.not_to raise_error + make_test_call do |call| + expect { call.cancel }.not_to raise_error + end end it 'completes ok when the call is closed' do - call = make_test_call - call.close - expect { call.cancel }.not_to raise_error + make_test_call do |call| + call.close + expect { call.cancel }.not_to raise_error + end end end describe '#cancel_with_status' do it 'completes ok' do - call = make_test_call - expect do - call.cancel_with_status(0, 'test status') - end.not_to raise_error - expect do - call.cancel_with_status(0, nil) - end.to raise_error(TypeError) + make_test_call do |call| + expect do + call.cancel_with_status(0, 'test status') + end.not_to raise_error + expect do + call.cancel_with_status(0, nil) + end.to raise_error(TypeError) + end end it 'completes ok when the call is closed' do - call = make_test_call - call.close - expect do - call.cancel_with_status(0, 'test status') - end.not_to raise_error + make_test_call do |call| + call.close + expect do + call.cancel_with_status(0, 'test status') + end.not_to raise_error + end end end def make_test_call - @ch.create_call(nil, nil, 'phony_method', nil, deadline) + call = @ch.create_call(nil, nil, 'phony_method', nil, deadline) + yield call + call.close end def deadline diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb index 45f8c3a096a41..21a3a62386226 100644 --- a/src/ruby/spec/channel_spec.rb +++ b/src/ruby/spec/channel_spec.rb @@ -118,7 +118,8 @@ def construct_with_args(a) deadline = Time.now + 5 blk = proc do - ch.create_call(nil, nil, 'phony_method', nil, deadline) + call = ch.create_call(nil, nil, 'phony_method', nil, deadline) + call.close end expect(&blk).to_not raise_error end @@ -132,8 +133,9 @@ def construct_with_args(a) deadline = Time.now + 5 blk = proc do - ch.create_call(nil, nil, 'phony_method', nil, deadline) + call = ch.create_call(nil, nil, 'phony_method', nil, deadline) STDERR.puts "#{Time.now}: created call" + call.close end expect(&blk).to raise_error(RuntimeError) STDERR.puts "#{Time.now}: finished: raises an error if called on a closed channel" diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index c27262ec70cad..0f4710e9e6f66 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -16,36 +16,8 @@ include GRPC::Core -shared_context 'setup: tags' do - let(:sent_message) { 'sent message' } - let(:reply_text) { 'the reply' } - - def deadline - Time.now + 5 - end - - def server_allows_client_to_proceed(metadata = {}) - recvd_rpc = @server.request_call - expect(recvd_rpc).to_not eq nil - server_call = recvd_rpc.call - ops = { CallOps::SEND_INITIAL_METADATA => metadata } - server_batch = server_call.run_batch(ops) - expect(server_batch.send_metadata).to be true - server_call - end - - def new_client_call - @ch.create_call(nil, nil, '/method', nil, deadline) - end - - def ok_status - Struct::Status.new(StatusCodes::OK, 'OK') - end -end - shared_examples 'basic GRPC message delivery is OK' do include GRPC::Core - include_context 'setup: tags' context 'the test channel' do it 'should have a target' do @@ -53,272 +25,45 @@ def ok_status end end - context 'a client call' do - it 'should have a peer' do - expect(new_client_call.peer).to be_a(String) - end - end - - it 'calls have peer info' do - call = new_client_call - expect(call.peer).to be_a(String) - end - - it 'servers receive requests from clients and can respond' do - call = new_client_call - server_call = nil - - server_thread = Thread.new do - server_call = server_allows_client_to_proceed - end - - client_ops = { - CallOps::SEND_INITIAL_METADATA => {}, - CallOps::SEND_MESSAGE => sent_message, - CallOps::SEND_CLOSE_FROM_CLIENT => nil - } - client_batch = call.run_batch(client_ops) - expect(client_batch.send_metadata).to be true - expect(client_batch.send_message).to be true - expect(client_batch.send_close).to be true - - # confirm the server can read the inbound message - server_thread.join - server_ops = { - CallOps::RECV_MESSAGE => nil - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.message).to eq(sent_message) - server_ops = { - CallOps::RECV_CLOSE_ON_SERVER => nil, - CallOps::SEND_STATUS_FROM_SERVER => ok_status - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.send_close).to be true - expect(server_batch.send_status).to be true - - # finish the call - final_client_batch = call.run_batch( - CallOps::RECV_INITIAL_METADATA => nil, - CallOps::RECV_STATUS_ON_CLIENT => nil) - expect(final_client_batch.metadata).to eq({}) - expect(final_client_batch.status.code).to eq(0) - end - - it 'responses written by servers are received by the client' do - call = new_client_call - server_call = nil - - server_thread = Thread.new do - server_call = server_allows_client_to_proceed - end - - client_ops = { - CallOps::SEND_INITIAL_METADATA => {}, - CallOps::SEND_MESSAGE => sent_message, - CallOps::SEND_CLOSE_FROM_CLIENT => nil - } - client_batch = call.run_batch(client_ops) - expect(client_batch.send_metadata).to be true - expect(client_batch.send_message).to be true - expect(client_batch.send_close).to be true - - # confirm the server can read the inbound message - server_thread.join - server_ops = { - CallOps::RECV_MESSAGE => nil - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.message).to eq(sent_message) - server_ops = { - CallOps::RECV_CLOSE_ON_SERVER => nil, - CallOps::SEND_MESSAGE => reply_text, - CallOps::SEND_STATUS_FROM_SERVER => ok_status - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.send_close).to be true - expect(server_batch.send_message).to be true - expect(server_batch.send_status).to be true - - # finish the call - final_client_batch = call.run_batch( - CallOps::RECV_INITIAL_METADATA => nil, - CallOps::RECV_MESSAGE => nil, - CallOps::RECV_STATUS_ON_CLIENT => nil) - expect(final_client_batch.metadata).to eq({}) - expect(final_client_batch.message).to eq(reply_text) - expect(final_client_batch.status.code).to eq(0) - end - - it 'compressed messages can be sent and received' do - call = new_client_call - server_call = nil - long_request_str = '0' * 2000 - long_response_str = '1' * 2000 - md = { 'grpc-internal-encoding-request' => 'gzip' } - - server_thread = Thread.new do - server_call = server_allows_client_to_proceed(md) + it 'unary calls work' do + run_services_on_server(@server, services: [EchoService]) do + call = @stub.an_rpc(EchoMsg.new, return_op: true) + expect(call.execute).to be_a(EchoMsg) end - - client_ops = { - CallOps::SEND_INITIAL_METADATA => md, - CallOps::SEND_MESSAGE => long_request_str, - CallOps::SEND_CLOSE_FROM_CLIENT => nil - } - client_batch = call.run_batch(client_ops) - expect(client_batch.send_metadata).to be true - expect(client_batch.send_message).to be true - expect(client_batch.send_close).to be true - - # confirm the server can read the inbound message - server_thread.join - server_ops = { - CallOps::RECV_MESSAGE => nil - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.message).to eq(long_request_str) - server_ops = { - CallOps::RECV_CLOSE_ON_SERVER => nil, - CallOps::SEND_MESSAGE => long_response_str, - CallOps::SEND_STATUS_FROM_SERVER => ok_status - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.send_close).to be true - expect(server_batch.send_message).to be true - expect(server_batch.send_status).to be true - - client_ops = { - CallOps::RECV_INITIAL_METADATA => nil, - CallOps::RECV_MESSAGE => nil, - CallOps::RECV_STATUS_ON_CLIENT => nil - } - final_client_batch = call.run_batch(client_ops) - expect(final_client_batch.metadata).to eq({}) - expect(final_client_batch.message).to eq long_response_str - expect(final_client_batch.status.code).to eq(0) end - it 'servers can ignore a client write and send a status' do - call = new_client_call - server_call = nil - - server_thread = Thread.new do - server_call = server_allows_client_to_proceed + it 'unary calls work when enabling compression' do + run_services_on_server(@server, services: [EchoService]) do + long_request_str = '0' * 2000 + md = { 'grpc-internal-encoding-request' => 'gzip' } + call = @stub.an_rpc(EchoMsg.new(msg: long_request_str), + return_op: true, + metadata: md) + response = call.execute + expect(response).to be_a(EchoMsg) + expect(response.msg).to eq(long_request_str) end - - client_ops = { - CallOps::SEND_INITIAL_METADATA => {}, - CallOps::SEND_MESSAGE => sent_message, - CallOps::SEND_CLOSE_FROM_CLIENT => nil - } - client_batch = call.run_batch(client_ops) - expect(client_batch.send_metadata).to be true - expect(client_batch.send_message).to be true - expect(client_batch.send_close).to be true - - # confirm the server can read the inbound message - the_status = Struct::Status.new(StatusCodes::OK, 'OK') - server_thread.join - server_ops = { - CallOps::SEND_STATUS_FROM_SERVER => the_status - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.message).to eq nil - expect(server_batch.send_status).to be true - - final_client_batch = call.run_batch( - CallOps::RECV_INITIAL_METADATA => nil, - CallOps::RECV_STATUS_ON_CLIENT => nil) - expect(final_client_batch.metadata).to eq({}) - expect(final_client_batch.status.code).to eq(0) - end - - it 'completes calls by sending status to client and server' do - call = new_client_call - server_call = nil - - server_thread = Thread.new do - server_call = server_allows_client_to_proceed - end - - client_ops = { - CallOps::SEND_INITIAL_METADATA => {}, - CallOps::SEND_MESSAGE => sent_message - } - client_batch = call.run_batch(client_ops) - expect(client_batch.send_metadata).to be true - expect(client_batch.send_message).to be true - - # confirm the server can read the inbound message and respond - the_status = Struct::Status.new(StatusCodes::OK, 'OK', {}) - server_thread.join - server_ops = { - CallOps::RECV_MESSAGE => nil - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.message).to eq sent_message - server_ops = { - CallOps::SEND_MESSAGE => reply_text, - CallOps::SEND_STATUS_FROM_SERVER => the_status - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.send_status).to be true - expect(server_batch.send_message).to be true - - # confirm the client can receive the server response and status. - client_ops = { - CallOps::SEND_CLOSE_FROM_CLIENT => nil, - CallOps::RECV_INITIAL_METADATA => nil, - CallOps::RECV_MESSAGE => nil, - CallOps::RECV_STATUS_ON_CLIENT => nil - } - final_client_batch = call.run_batch(client_ops) - expect(final_client_batch.send_close).to be true - expect(final_client_batch.message).to eq reply_text - expect(final_client_batch.status).to eq the_status - - # confirm the server can receive the client close. - server_ops = { - CallOps::RECV_CLOSE_ON_SERVER => nil - } - final_server_batch = server_call.run_batch(server_ops) - expect(final_server_batch.send_close).to be true end def client_cancel_test(cancel_proc, expected_code, expected_details) - call = new_client_call - server_call = nil - - server_thread = Thread.new do - server_call = server_allows_client_to_proceed + call = @stub.an_rpc(EchoMsg.new, return_op: true) + run_services_on_server(@server, services: [EchoService]) do + # start the call, but don't send a message yet + call.start_call + # cancel the call + cancel_proc.call(call) + # check the client's status + failed = false + begin + call.execute + rescue GRPC::BadStatus => e + failed = true + expect(e.code).to be expected_code + expect(e.details).to eq expected_details + end + expect(failed).to be(true) end - - client_ops = { - CallOps::SEND_INITIAL_METADATA => {}, - CallOps::RECV_INITIAL_METADATA => nil - } - client_batch = call.run_batch(client_ops) - expect(client_batch.send_metadata).to be true - expect(client_batch.metadata).to eq({}) - - cancel_proc.call(call) - - server_thread.join - server_ops = { - CallOps::RECV_CLOSE_ON_SERVER => nil - } - server_batch = server_call.run_batch(server_ops) - expect(server_batch.send_close).to be true - - client_ops = { - CallOps::RECV_STATUS_ON_CLIENT => {} - } - client_batch = call.run_batch(client_ops) - - expect(client_batch.status.code).to be expected_code - expect(client_batch.status.details).to eq expected_details end it 'clients can cancel a call on the server' do @@ -344,8 +89,6 @@ def client_cancel_test(cancel_proc, expected_code, end shared_examples 'GRPC metadata delivery works OK' do - include_context 'setup: tags' - describe 'from client => server' do before(:example) do n = 7 # arbitrary number of metadata @@ -364,53 +107,31 @@ def client_cancel_test(cancel_proc, expected_code, it 'raises an exception if a metadata key is invalid' do @bad_keys.each do |md| - call = new_client_call - client_ops = { - CallOps::SEND_INITIAL_METADATA => md - } - blk = proc do - call.run_batch(client_ops) + # NOTE: no need to run a server in this test b/c the failure + # happens while validating metadata to send. + failed = false + begin + @stub.an_rpc(EchoMsg.new, metadata: md) + rescue TypeError => e + failed = true + expect(e.message).to eq('grpc_rb_md_ary_fill_hash_cb: bad type for key parameter') end - expect(&blk).to raise_error + expect(failed).to be(true) end end it 'sends all the metadata pairs when keys and values are valid' do - @valid_metadata.each do |md| - recvd_rpc = nil - rcv_thread = Thread.new do - recvd_rpc = @server.request_call + service = EchoService.new + run_services_on_server(@server, services: [service]) do + @valid_metadata.each_with_index do |md, i| + expect(@stub.an_rpc(EchoMsg.new, metadata: md)).to be_a(EchoMsg) + # confirm the server can receive the client metadata + # finish the call + expect(service.received_md.length).to eq(i + 1) + md.each do |k, v| + expect(service.received_md[i][k.to_s]).to eq(v) + end end - - call = new_client_call - client_ops = { - CallOps::SEND_INITIAL_METADATA => md, - CallOps::SEND_CLOSE_FROM_CLIENT => nil - } - client_batch = call.run_batch(client_ops) - expect(client_batch.send_metadata).to be true - - # confirm the server can receive the client metadata - rcv_thread.join - expect(recvd_rpc).to_not eq nil - recvd_md = recvd_rpc.metadata - replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }] - expect(recvd_md).to eq(recvd_md.merge(replace_symbols)) - - # finish the call - final_server_batch = recvd_rpc.call.run_batch( - CallOps::RECV_CLOSE_ON_SERVER => nil, - CallOps::SEND_INITIAL_METADATA => nil, - CallOps::SEND_STATUS_FROM_SERVER => ok_status) - expect(final_server_batch.send_close).to be(true) - expect(final_server_batch.send_metadata).to be(true) - expect(final_server_batch.send_status).to be(true) - - final_client_batch = call.run_batch( - CallOps::RECV_INITIAL_METADATA => nil, - CallOps::RECV_STATUS_ON_CLIENT => nil) - expect(final_client_batch.metadata).to eq({}) - expect(final_client_batch.status.code).to eq(0) end end end @@ -432,120 +153,61 @@ def client_cancel_test(cancel_proc, expected_code, end it 'raises an exception if a metadata key is invalid' do - @bad_keys.each do |md| - recvd_rpc = nil - rcv_thread = Thread.new do - recvd_rpc = @server.request_call - end - - call = new_client_call - # client signals that it's done sending metadata to allow server to - # respond - client_ops = { - CallOps::SEND_INITIAL_METADATA => nil - } - call.run_batch(client_ops) - - # server gets the invocation - rcv_thread.join - expect(recvd_rpc).to_not eq nil - server_ops = { - CallOps::SEND_INITIAL_METADATA => md - } - blk = proc do - recvd_rpc.call.run_batch(server_ops) + service = EchoService.new + run_services_on_server(@server, services: [service]) do + @bad_keys.each do |md| + proceed = Queue.new + server_exception = nil + service.on_call_started = proc do |call| + call.send_initial_metadata(md) + rescue TypeError => e + server_exception = e + ensure + proceed.push(1) + end + client_exception = nil + client_call = @stub.an_rpc(EchoMsg.new, return_op: true) + thr = Thread.new do + client_call.execute + rescue GRPC::BadStatus => e + client_exception = e + end + proceed.pop + # TODO(apolcyn): we shouldn't need this cancel here. It's + # only currently needed b/c the server does not seem to properly + # terminate the RPC if it fails to send initial metadata. That + # should be fixed, in which case this cancellation can be removed. + client_call.cancel + thr.join + p client_exception + expect(client_exception.nil?).to be(false) + expect(server_exception.nil?).to be(false) + expect(server_exception.message).to eq( + 'grpc_rb_md_ary_fill_hash_cb: bad type for key parameter') end - expect(&blk).to raise_error - - # cancel the call so the server can shut down immediately - call.cancel end end it 'sends an empty hash if no metadata is added' do - recvd_rpc = nil - rcv_thread = Thread.new do - recvd_rpc = @server.request_call + run_services_on_server(@server, services: [EchoService]) do + call = @stub.an_rpc(EchoMsg.new, return_op: true) + expect(call.execute).to be_a(EchoMsg) + expect(call.metadata).to eq({}) end - - call = new_client_call - # client signals that it's done sending metadata to allow server to - # respond - client_ops = { - CallOps::SEND_INITIAL_METADATA => nil, - CallOps::SEND_CLOSE_FROM_CLIENT => nil - } - client_batch = call.run_batch(client_ops) - expect(client_batch.send_metadata).to be true - expect(client_batch.send_close).to be true - - # server gets the invocation but sends no metadata back - rcv_thread.join - expect(recvd_rpc).to_not eq nil - server_call = recvd_rpc.call - server_ops = { - # receive close and send status to finish the call - CallOps::RECV_CLOSE_ON_SERVER => nil, - CallOps::SEND_INITIAL_METADATA => nil, - CallOps::SEND_STATUS_FROM_SERVER => ok_status - } - srv_batch = server_call.run_batch(server_ops) - expect(srv_batch.send_close).to be true - expect(srv_batch.send_metadata).to be true - expect(srv_batch.send_status).to be true - - # client receives nothing as expected - client_ops = { - CallOps::RECV_INITIAL_METADATA => nil, - # receive status to finish the call - CallOps::RECV_STATUS_ON_CLIENT => nil - } - final_client_batch = call.run_batch(client_ops) - expect(final_client_batch.metadata).to eq({}) - expect(final_client_batch.status.code).to eq(0) end it 'sends all the pairs when keys and values are valid' do - @valid_metadata.each do |md| - recvd_rpc = nil - rcv_thread = Thread.new do - recvd_rpc = @server.request_call + service = EchoService.new + run_services_on_server(@server, services: [service]) do + @valid_metadata.each do |md| + service.on_call_started = proc do |call| + call.send_initial_metadata(md) + end + call = @stub.an_rpc(EchoMsg.new, return_op: true) + call.execute + replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }] + expect(call.metadata).to eq(replace_symbols) end - - call = new_client_call - # client signals that it's done sending metadata to allow server to - # respond - client_ops = { - CallOps::SEND_INITIAL_METADATA => nil, - CallOps::SEND_CLOSE_FROM_CLIENT => nil - } - client_batch = call.run_batch(client_ops) - expect(client_batch.send_metadata).to be true - expect(client_batch.send_close).to be true - - # server gets the invocation but sends no metadata back - rcv_thread.join - expect(recvd_rpc).to_not eq nil - server_call = recvd_rpc.call - server_ops = { - CallOps::RECV_CLOSE_ON_SERVER => nil, - CallOps::SEND_INITIAL_METADATA => md, - CallOps::SEND_STATUS_FROM_SERVER => ok_status - } - srv_batch = server_call.run_batch(server_ops) - expect(srv_batch.send_close).to be true - expect(srv_batch.send_metadata).to be true - expect(srv_batch.send_status).to be true - - # client receives nothing as expected - client_ops = { - CallOps::RECV_INITIAL_METADATA => nil, - CallOps::RECV_STATUS_ON_CLIENT => nil - } - final_client_batch = call.run_batch(client_ops) - replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }] - expect(final_client_batch.metadata).to eq(replace_symbols) - expect(final_client_batch.status.code).to eq(0) end end end @@ -554,16 +216,11 @@ def client_cancel_test(cancel_proc, expected_code, describe 'the http client/server' do before(:example) do server_host = '0.0.0.0:0' - @server = new_core_server_for_testing(nil) + @server = new_rpc_server_for_testing server_port = @server.add_http2_port(server_host, :this_port_is_insecure) - @server.start @ch = Channel.new("0.0.0.0:#{server_port}", nil, :this_channel_is_insecure) - end - - after(:example) do - @ch.close - @server.shutdown_and_notify(deadline) - @server.close + @stub = EchoStub.new( + "0.0.0.0:#{server_port}", nil, channel_override: @ch) end it_behaves_like 'basic GRPC message delivery is OK' do @@ -574,8 +231,6 @@ def client_cancel_test(cancel_proc, expected_code, end describe 'the secure http client/server' do - include_context 'setup: tags' - def load_test_certs test_root = File.join(File.dirname(__FILE__), 'testdata') files = ['ca.pem', 'server1.key', 'server1.pem'] @@ -587,17 +242,14 @@ def load_test_certs server_host = '0.0.0.0:0' server_creds = GRPC::Core::ServerCredentials.new( nil, [{ private_key: certs[1], cert_chain: certs[2] }], false) - @server = new_core_server_for_testing(nil) + @server = new_rpc_server_for_testing server_port = @server.add_http2_port(server_host, server_creds) - @server.start args = { Channel::SSL_TARGET => 'foo.test.google.fr' } - @ch = Channel.new("0.0.0.0:#{server_port}", args, - GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)) - end - - after(:example) do - @server.shutdown_and_notify(deadline) - @server.close + @ch = Channel.new( + "0.0.0.0:#{server_port}", args, + GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)) + @stub = EchoStub.new( + "0.0.0.0:#{server_port}", nil, channel_override: @ch) end it_behaves_like 'basic GRPC message delivery is OK' do @@ -606,59 +258,25 @@ def load_test_certs it_behaves_like 'GRPC metadata delivery works OK' do end - def credentials_update_test(creds_update_md) - auth_proc = proc { creds_update_md } + it 'modifies metadata with CallCredentials' do + # create call creds + auth_proc = proc { { 'k1' => 'v1' } } call_creds = GRPC::Core::CallCredentials.new(auth_proc) - - initial_md_key = 'k2' - initial_md_val = 'v2' - initial_md = { initial_md_key => initial_md_val } - expected_md = creds_update_md.clone - fail 'bad test param' unless expected_md[initial_md_key].nil? - expected_md[initial_md_key] = initial_md_val - - recvd_rpc = nil - rcv_thread = Thread.new do - recvd_rpc = @server.request_call + # create arbitrary custom metadata + custom_md = { 'k2' => 'v2' } + # perform an RPC + echo_service = EchoService.new + run_services_on_server(@server, services: [echo_service]) do + expect(@stub.an_rpc(EchoMsg.new, + credentials: call_creds, + metadata: custom_md)).to be_a(EchoMsg) + end + # call creds metadata should be merged with custom MD + expect(echo_service.received_md.length).to eq(1) + expected_md = { 'k1' => 'v1', 'k2' => 'v2' } + expected_md.each do |k, v| + expect(echo_service.received_md[0][k]).to eq(v) end - - call = new_client_call - call.set_credentials! call_creds - - client_batch = call.run_batch( - CallOps::SEND_INITIAL_METADATA => initial_md, - CallOps::SEND_CLOSE_FROM_CLIENT => nil) - expect(client_batch.send_metadata).to be true - expect(client_batch.send_close).to be true - - # confirm the server can receive the client metadata - rcv_thread.join - expect(recvd_rpc).to_not eq nil - recvd_md = recvd_rpc.metadata - replace_symbols = Hash[expected_md.each_pair.collect { |x, y| [x.to_s, y] }] - expect(recvd_md).to eq(recvd_md.merge(replace_symbols)) - - credentials_update_test_finish_call(call, recvd_rpc.call) - end - - def credentials_update_test_finish_call(client_call, server_call) - final_server_batch = server_call.run_batch( - CallOps::RECV_CLOSE_ON_SERVER => nil, - CallOps::SEND_INITIAL_METADATA => nil, - CallOps::SEND_STATUS_FROM_SERVER => ok_status) - expect(final_server_batch.send_close).to be(true) - expect(final_server_batch.send_metadata).to be(true) - expect(final_server_batch.send_status).to be(true) - - final_client_batch = client_call.run_batch( - CallOps::RECV_INITIAL_METADATA => nil, - CallOps::RECV_STATUS_ON_CLIENT => nil) - expect(final_client_batch.metadata).to eq({}) - expect(final_client_batch.status.code).to eq(0) - end - - it 'modifies metadata with CallCredentials' do - credentials_update_test('k1' => 'updated-v1') end it 'modifies large metadata with CallCredentials' do @@ -666,11 +284,34 @@ def credentials_update_test_finish_call(client_call, server_call) '00000000000000000000000000000000000000000000000000000000000000', '11111111111111111111111111111111111111111111111111111111111111', ) - md = { - k3: val_array, - k4: '0000000000000000000000000000000000000000000000000000000000', - keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v1' + # create call creds + auth_proc = proc do + { + k2: val_array, + k3: '0000000000000000000000000000000000000000000000000000000000', + keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey4: 'v4' + } + end + call_creds = GRPC::Core::CallCredentials.new(auth_proc) + # create arbitrary custom metadata + custom_md = { k1: 'v1' } + # perform an RPC + echo_service = EchoService.new + run_services_on_server(@server, services: [echo_service]) do + expect(@stub.an_rpc(EchoMsg.new, + credentials: call_creds, + metadata: custom_md)).to be_a(EchoMsg) + end + # call creds metadata should be merged with custom MD + expect(echo_service.received_md.length).to eq(1) + expected_md = { + k1: 'v1', + k2: val_array, + k3: '0000000000000000000000000000000000000000000000000000000000', + keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey4: 'v4' } - credentials_update_test(md) + expected_md.each do |k, v| + expect(echo_service.received_md[0][k.to_s]).to eq(v) + end end end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 1a686bbe18534..5bd28d3c3631b 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -55,17 +55,20 @@ def inner_call_of_active_call(active_call) end @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil, :this_channel_is_insecure) + @call = make_test_call end after(:each) do @server.shutdown_and_notify(deadline) @server.close @server_thread.join + # Don't rely on GC to unref the call, since that can prevent + # the channel connectivity state polling thread from shutting down. + @call.close end describe 'restricted view methods' do before(:each) do - @call = make_test_call ActiveCall.client_invoke(@call) @client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) @@ -117,9 +120,8 @@ def inner_call_of_active_call(active_call) describe '#remote_send' do it 'allows a client to send a payload to the server', test: true do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) @@ -137,15 +139,14 @@ def inner_call_of_active_call(active_call) expect(server_call.remote_read).to eq(msg) # finish the call server_call.send_initial_metadata - call.run_batch(CallOps::RECV_INITIAL_METADATA => nil) - send_and_receive_close_and_status(call, recvd_call) + @call.run_batch(CallOps::RECV_INITIAL_METADATA => nil) + send_and_receive_close_and_status(@call, recvd_call) end it 'marshals the payload using the marshal func' do - call = make_test_call - ActiveCall.client_invoke(call) + ActiveCall.client_invoke(@call) marshal = proc { |x| 'marshalled:' + x } - client_call = ActiveCall.new(call, marshal, @pass_through, deadline) + client_call = ActiveCall.new(@call, marshal, @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) @@ -161,23 +162,22 @@ def inner_call_of_active_call(active_call) metadata_received: true) expect(server_call.remote_read).to eq('marshalled:' + msg) # finish the call - call.run_batch(CallOps::RECV_INITIAL_METADATA => nil) - send_and_receive_close_and_status(call, recvd_call) + @call.run_batch(CallOps::RECV_INITIAL_METADATA => nil) + send_and_receive_close_and_status(@call, recvd_call) end TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS] TEST_WRITE_FLAGS.each do |f| it "successfully makes calls with write_flag set to #{f}" do - call = make_test_call - ActiveCall.client_invoke(call) + ActiveCall.client_invoke(@call) marshal = proc { |x| 'marshalled:' + x } - client_call = ActiveCall.new(call, marshal, + client_call = ActiveCall.new(@call, marshal, @pass_through, deadline) msg = 'message is a string' client_call.write_flag = f client_call.remote_send(msg) # flush the message in case writes are set to buffered - call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) if f == 1 + @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) if f == 1 # confirm that the message was marshalled recvd_rpc = @received_rpcs_queue.pop @@ -199,9 +199,8 @@ def inner_call_of_active_call(active_call) describe 'sending initial metadata', send_initial_metadata: true do it 'sends metadata before sending a message if it hasnt been sent yet' do - call = make_test_call @client_call = ActiveCall.new( - call, + @call, @pass_through, @pass_through, deadline, @@ -213,13 +212,13 @@ def inner_call_of_active_call(active_call) message = 'phony message' - expect(call).to( + expect(@call).to( receive(:run_batch) .with( hash_including( CallOps::SEND_INITIAL_METADATA => metadata)).once) - expect(call).to( + expect(@call).to( receive(:run_batch).with(hash_including( CallOps::SEND_MESSAGE => message)).once) @client_call.remote_send(message) @@ -228,14 +227,12 @@ def inner_call_of_active_call(active_call) end it 'doesnt send metadata if it thinks its already been sent' do - call = make_test_call - - @client_call = ActiveCall.new(call, + @client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) expect(@client_call.metadata_sent).to eql(true) - expect(call).to( + expect(@call).to( receive(:run_batch).with(hash_including( CallOps::SEND_INITIAL_METADATA)).never) @@ -243,9 +240,7 @@ def inner_call_of_active_call(active_call) end it 'sends metadata if it is explicitly sent and ok to do so' do - call = make_test_call - - @client_call = ActiveCall.new(call, + @client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline, @@ -257,7 +252,7 @@ def inner_call_of_active_call(active_call) @client_call.merge_metadata_to_send(metadata) expect(@client_call.metadata_to_send).to eq(metadata) - expect(call).to( + expect(@call).to( receive(:run_batch).with(hash_including( CallOps::SEND_INITIAL_METADATA => metadata)).once) @@ -265,9 +260,7 @@ def inner_call_of_active_call(active_call) end it 'explicit sending does nothing if metadata has already been sent' do - call = make_test_call - - @client_call = ActiveCall.new(call, + @client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) @@ -284,7 +277,6 @@ def inner_call_of_active_call(active_call) describe '#merge_metadata_to_send', merge_metadata_to_send: true do it 'adds to existing metadata when there is existing metadata to send' do - call = make_test_call starting_metadata = { k1: 'key1_val', k2: 'key2_val', @@ -292,7 +284,7 @@ def inner_call_of_active_call(active_call) } @client_call = ActiveCall.new( - call, + @call, @pass_through, @pass_through, deadline, started: false, @@ -318,9 +310,8 @@ def inner_call_of_active_call(active_call) end it 'fails when initial metadata has already been sent' do - call = make_test_call @client_call = ActiveCall.new( - call, + @call, @pass_through, @pass_through, deadline, @@ -338,9 +329,8 @@ def inner_call_of_active_call(active_call) describe '#client_invoke' do it 'sends metadata to the server when present' do - call = make_test_call metadata = { k1: 'v1', k2: 'v2' } - ActiveCall.client_invoke(call, metadata) + ActiveCall.client_invoke(@call, metadata) recvd_rpc = @received_rpcs_queue.pop recvd_call = recvd_rpc.call expect(recvd_call).to_not be_nil @@ -349,15 +339,14 @@ def inner_call_of_active_call(active_call) expect(recvd_rpc.metadata['k2']).to eq('v2') # finish the call recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) - call.run_batch(CallOps::RECV_INITIAL_METADATA => nil) - send_and_receive_close_and_status(call, recvd_call) + @call.run_batch(CallOps::RECV_INITIAL_METADATA => nil) + send_and_receive_close_and_status(@call, recvd_call) end end describe '#send_status', send_status: true do it 'works when no metadata or messages have been sent yet' do - call = make_test_call - ActiveCall.client_invoke(call) + ActiveCall.client_invoke(@call) recvd_rpc = @received_rpcs_queue.pop server_call = ActiveCall.new( @@ -375,9 +364,8 @@ def inner_call_of_active_call(active_call) describe '#remote_read', remote_read: true do it 'reads the response sent by a server' do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) @@ -385,13 +373,12 @@ def inner_call_of_active_call(active_call) server_call.remote_send('server_response') expect(client_call.remote_read).to eq('server_response') send_and_receive_close_and_status( - call, inner_call_of_active_call(server_call)) + @call, inner_call_of_active_call(server_call)) end it 'saves no metadata when the server adds no metadata' do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) @@ -401,13 +388,12 @@ def inner_call_of_active_call(active_call) client_call.remote_read expect(client_call.metadata).to eq({}) send_and_receive_close_and_status( - call, inner_call_of_active_call(server_call)) + @call, inner_call_of_active_call(server_call)) end it 'saves metadata add by the server' do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) @@ -418,12 +404,11 @@ def inner_call_of_active_call(active_call) expected = { 'k1' => 'v1', 'k2' => 'v2' } expect(client_call.metadata).to eq(expected) send_and_receive_close_and_status( - call, inner_call_of_active_call(server_call)) + @call, inner_call_of_active_call(server_call)) end it 'get a status from server when nothing else sent from server' do - client_call = make_test_call - ActiveCall.client_invoke(client_call) + ActiveCall.client_invoke(@call) recvd_rpc = @received_rpcs_queue.pop recvd_call = recvd_rpc.call @@ -438,22 +423,21 @@ def inner_call_of_active_call(active_call) server_call.send_status(OK, 'OK') # Check that we can receive initial metadata and a status - client_call.run_batch( + @call.run_batch( CallOps::RECV_INITIAL_METADATA => nil) - batch_result = client_call.run_batch( + batch_result = @call.run_batch( CallOps::RECV_STATUS_ON_CLIENT => nil) expect(batch_result.status.code).to eq(OK) end it 'get a nil msg before a status when an OK status is sent' do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) - call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) + @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) server_call = expect_server_to_receive(msg) server_call.remote_send('server_response') server_call.send_status(OK, 'OK') @@ -463,10 +447,9 @@ def inner_call_of_active_call(active_call) end it 'unmarshals the response using the unmarshal func' do - call = make_test_call - ActiveCall.client_invoke(call) + ActiveCall.client_invoke(@call) unmarshal = proc { |x| 'unmarshalled:' + x } - client_call = ActiveCall.new(call, @pass_through, + client_call = ActiveCall.new(@call, @pass_through, unmarshal, deadline) # confirm the client receives the unmarshalled message @@ -476,14 +459,13 @@ def inner_call_of_active_call(active_call) server_call.remote_send('server_response') expect(client_call.remote_read).to eq('unmarshalled:server_response') send_and_receive_close_and_status( - call, inner_call_of_active_call(server_call)) + @call, inner_call_of_active_call(server_call)) end end describe '#each_remote_read' do it 'creates an Enumerator' do - call = make_test_call - client_call = ActiveCall.new(call, @pass_through, + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) expect(client_call.each_remote_read).to be_a(Enumerator) # finish the call @@ -491,9 +473,8 @@ def inner_call_of_active_call(active_call) end it 'the returned enumerator can read n responses' do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' reply = 'server_response' @@ -506,18 +487,17 @@ def inner_call_of_active_call(active_call) expect(e.next).to eq(reply) end send_and_receive_close_and_status( - call, inner_call_of_active_call(server_call)) + @call, inner_call_of_active_call(server_call)) end it 'the returns an enumerator that stops after an OK Status' do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' reply = 'server_response' client_call.remote_send(msg) - call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) + @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) server_call = expect_server_to_receive(msg) e = client_call.each_remote_read n = 3 # arbitrary value > 1 @@ -532,14 +512,13 @@ def inner_call_of_active_call(active_call) describe '#closing the call from the client' do it 'finishes ok if the server sends a status response' do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) expect do - call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) + @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) end.to_not raise_error server_call = expect_server_to_receive(msg) server_call.remote_send('server_response') @@ -549,9 +528,8 @@ def inner_call_of_active_call(active_call) end it 'finishes ok if the server sends an early status response' do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) @@ -560,15 +538,14 @@ def inner_call_of_active_call(active_call) server_call.send_status(OK, 'status code is OK') expect(client_call.remote_read).to eq('server_response') expect do - call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) + @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) end.to_not raise_error expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do - call = make_test_call - ActiveCall.client_invoke(call) - client_call = ActiveCall.new(call, @pass_through, + ActiveCall.client_invoke(@call) + client_call = ActiveCall.new(@call, @pass_through, @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) @@ -577,7 +554,7 @@ def inner_call_of_active_call(active_call) server_call.send_status(OK, 'status code is OK') expect(client_call.remote_read).to eq('server_response') expect do - call.run_batch( + @call.run_batch( CallOps::SEND_CLOSE_FROM_CLIENT => nil, CallOps::RECV_STATUS_ON_CLIENT => nil) end.to_not raise_error @@ -631,6 +608,7 @@ def inner_call_of_active_call(active_call) batch_result = @client_call.run_batch( CallOps::RECV_STATUS_ON_CLIENT => nil) expect(batch_result.status.code).to eq(@server_status) + @client_call.close end it 'sends the initial metadata implicitly if not already sent' do diff --git a/src/ruby/spec/support/services.rb b/src/ruby/spec/support/services.rb index a5d8e7c187b83..f4d0f68f20a25 100644 --- a/src/ruby/spec/support/services.rb +++ b/src/ruby/spec/support/services.rb @@ -41,14 +41,17 @@ class EchoService rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) rpc :a_client_streaming_rpc_unimplemented, stream(EchoMsg), EchoMsg attr_reader :received_md + attr_accessor :on_call_started def initialize(**kw) @trailing_metadata = kw @received_md = [] + @on_call_started = nil end def an_rpc(req, call) GRPC.logger.info('echo service received a request') + on_call_started&.call(call) call.output_metadata.update(@trailing_metadata) @received_md << call.metadata unless call.metadata.nil? req