From 0375748e2cf5bf23861dea39d062a5011e918405 Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Tue, 28 May 2024 17:18:18 -0700 Subject: [PATCH 01/10] updates Signed-off-by: Michael Rebello --- .../Interceptors/ConnectInterceptor.swift | 66 ++++++++++--- .../Interceptors/GRPCWebInterceptor.swift | 93 ++++++++++++++++--- .../Streaming/BidirectionalAsyncStream.swift | 6 +- .../Streaming/BidirectionalStream.swift | 3 - .../Streaming/ClientOnlyAsyncStream.swift | 41 ++++++++ .../Internal/Streaming/ClientOnlyStream.swift | 77 +++++++++++++++ .../StreamResult+ClientOnlyStream.swift | 54 +++++++++++ .../Internal/Streaming/URLSessionStream.swift | 8 +- .../Connect/Internal/Utilities/Locked.swift | 15 +-- .../PackageInternal/ConnectError+GRPC.swift | 8 +- .../Connect/PackageInternal/Envelope.swift | 10 ++ .../Clients/ProtocolClient.swift | 14 +-- .../Clients/URLSessionHTTPClient.swift | 8 +- .../Public/Interfaces/ConnectError.swift | 13 +-- .../ClientOnlyAsyncStreamInterface.swift | 5 +- .../Callbacks/ClientOnlyStreamInterface.swift | 5 +- .../MockClientOnlyAsyncStream.swift | 9 +- .../ConnectMocks/MockClientOnlyStream.swift | 8 +- .../Extensions/ConnectError+Extensions.swift | 8 +- .../ConnectNIO/Internal/GRPCInterceptor.swift | 62 +++++++++++-- .../InvocationConfigs/nio-opt-outs.txt | 26 ------ .../InvocationConfigs/urlsession-opt-outs.txt | 17 ---- .../Sources/ConformanceInvoker.swift | 4 +- 23 files changed, 413 insertions(+), 147 deletions(-) create mode 100644 Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift create mode 100644 Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift create mode 100644 Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift delete mode 100644 Tests/ConformanceClient/InvocationConfigs/nio-opt-outs.txt delete mode 100644 Tests/ConformanceClient/InvocationConfigs/urlsession-opt-outs.txt diff --git a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift index 6d06a41e..369e6b32 100644 --- a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift @@ -83,28 +83,50 @@ extension ConnectInterceptor: UnaryInterceptor { ] = current.value }) - if let encoding = response.headers[HeaderConstants.contentEncoding]?.first, - let compressionPool = self.config.responseCompressionPool(forName: encoding), - let message = response.message.flatMap({ try? compressionPool.decompress(data: $0) }) - { - proceed(HTTPResponse( - code: response.code, - headers: headers, - message: message, - trailers: trailers, - error: response.error, + let finalResponse: HTTPResponse + let contentType = response.headers[HeaderConstants.contentType]?.first ?? "" + if response.code == .ok && !contentType.hasPrefix("application/\(config.codec.name())") { + // If content-type looks like it could be an RPC server's response, consider + // this an internal error. + let code: Code = contentType.hasPrefix("application/") ? .internalError : .unknown + finalResponse = HTTPResponse( + code: code, headers: headers, message: nil, trailers: trailers, + error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"), tracingInfo: response.tracingInfo - )) + ) + } else if let encoding = response.headers[HeaderConstants.contentEncoding]?.first { + if let compressionPool = self.config.responseCompressionPool(forName: encoding), + let message = response.message.flatMap({ try? compressionPool.decompress(data: $0) }) + { + finalResponse = HTTPResponse( + code: response.code, + headers: headers, + message: message, + trailers: trailers, + error: response.error, + tracingInfo: response.tracingInfo + ) + } else { + finalResponse = HTTPResponse( + code: .internalError, + headers: headers, + message: nil, + trailers: trailers, + error: ConnectError(code: .internalError, message: "unexpected encoding"), + tracingInfo: response.tracingInfo + ) + } } else { - proceed(HTTPResponse( + finalResponse = HTTPResponse( code: response.code, headers: headers, message: response.message, trailers: trailers, error: response.error, tracingInfo: response.tracingInfo - )) + ) } + proceed(finalResponse) } } @@ -146,8 +168,22 @@ extension ConnectInterceptor: StreamInterceptor { switch result { case .headers(let headers): self.streamResponseHeaders.value = headers - proceed(result) + let contentType = headers[HeaderConstants.contentType]?.first ?? "" + if contentType != "application/connect+\(self.config.codec.name())" { + // If content-type looks like it could be an RPC server's response, consider + // this an internal error. + let code: Code = contentType.hasPrefix("application/connect+") + ? .internalError + : .unknown + proceed(.complete( + code: code, error: ConnectError( + code: code, message: "unexpected content-type: \(contentType)" + ), trailers: nil + )) + } else { + proceed(result) + } case .message(let data): do { let responseCompressionPool = self.streamResponseHeaders.value?[ @@ -168,7 +204,7 @@ extension ConnectInterceptor: StreamInterceptor { error: response.error, trailers: response.metadata )) - } else { + } else if !message.isEmpty { proceed(.message(message)) } } catch let error { diff --git a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift index 63f09381..2e22f1c9 100644 --- a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift @@ -61,12 +61,38 @@ extension GRPCWebInterceptor: UnaryInterceptor { response.headers, trailers: response.trailers ) + if grpcCode != .ok || connectError != nil { + proceed(HTTPResponse( + code: grpcCode, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: connectError, + tracingInfo: response.tracingInfo + )) + } else { + proceed(HTTPResponse( + code: .unimplemented, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: ConnectError.unaryResponseHasNoMessage(), + tracingInfo: response.tracingInfo + )) + } + return + } + + let contentType = response.headers[HeaderConstants.contentType]?.first ?? "" + if response.code == .ok + && !self.contentTypeIsExpectedGRPCWeb(contentType, expectedCodec: config.codec) + { + // If content-type looks like it could be a gRPC server's response, consider + // this an internal error. + let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown proceed(HTTPResponse( - code: grpcCode, - headers: response.headers, - message: response.message, - trailers: response.trailers, - error: connectError, + code: code, headers: response.headers, message: nil, trailers: response.trailers, + error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"), tracingInfo: response.tracingInfo )) return @@ -107,7 +133,7 @@ extension GRPCWebInterceptor: UnaryInterceptor { } } catch let error { proceed(HTTPResponse( - code: .unknown, + code: .unimplemented, headers: response.headers, message: response.message, trailers: response.trailers, @@ -146,6 +172,19 @@ extension GRPCWebInterceptor: StreamInterceptor { ) { switch result { case .headers(let headers): + let contentType = headers[HeaderConstants.contentType]?.first ?? "" + if !self.contentTypeIsExpectedGRPCWeb(contentType, expectedCodec: self.config.codec) { + // If content-type looks like it could be a gRPC server's response, consider + // this an internal error. + let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown + proceed(.complete( + code: code, error: ConnectError( + code: code, message: "unexpected content-type: \(contentType)" + ), trailers: headers + )) + return + } + if let grpcCode = headers.grpcStatus() { // Headers-only response. proceed(.complete( @@ -181,7 +220,7 @@ extension GRPCWebInterceptor: StreamInterceptor { trailers: trailers )) } - } else { + } else if !unpackedData.isEmpty { proceed(.message(unpackedData)) } } catch let error { @@ -193,9 +232,20 @@ extension GRPCWebInterceptor: StreamInterceptor { proceed(result) } } -} -// MARK: - Private + // MARK: - Private + + private func contentTypeIsGRPCWeb(_ contentType: String) -> Bool { + return contentType == "application/grpc-web" + || contentType.hasPrefix("application/grpc-web+") + } + + private func contentTypeIsExpectedGRPCWeb(_ contentType: String, expectedCodec: Codec) -> Bool { + let codecName = expectedCodec.name() + return (codecName == "proto" && contentType == "application/grpc-web") + || contentType == "application/grpc-web+\(codecName)" + } +} private struct TrailersDecodingError: Error {} @@ -228,13 +278,22 @@ private extension Trailers { private extension HTTPResponse { func withHandledGRPCWebTrailers(_ trailers: Trailers, message: Data?) -> Self { let (grpcCode, error) = ConnectError.parseGRPCHeaders(self.headers, trailers: trailers) - if grpcCode == .ok { + if grpcCode != .ok || error != nil { return HTTPResponse( code: grpcCode, headers: self.headers, - message: message, + message: nil, trailers: trailers, - error: nil, + error: error, + tracingInfo: self.tracingInfo + ) + } else if message?.isEmpty != false { + return HTTPResponse( + code: .unimplemented, + headers: self.headers, + message: nil, + trailers: trailers, + error: ConnectError.unaryResponseHasNoMessage(), tracingInfo: self.tracingInfo ) } else { @@ -243,9 +302,17 @@ private extension HTTPResponse { headers: self.headers, message: message, trailers: trailers, - error: error, + error: nil, tracingInfo: self.tracingInfo ) } } } + +private extension ConnectError { + static func unaryResponseHasNoMessage() -> Self { + return ConnectError( + code: .unimplemented, message: "unary response has no message" + ) + } +} diff --git a/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift b/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift index 565fc4b8..2e3e9d8d 100644 --- a/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift +++ b/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift @@ -18,7 +18,7 @@ import SwiftProtobuf /// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream` /// to work with async/await. @available(iOS 13, *) -final class BidirectionalAsyncStream< +class BidirectionalAsyncStream< Input: ProtobufMessage, Output: ProtobufMessage >: @unchecked Sendable { /// The underlying async stream that will be exposed to the consumer. @@ -103,7 +103,3 @@ extension BidirectionalAsyncStream: BidirectionalAsyncStreamInterface { self.requestCallbacks?.cancel() } } - -// Conforms to the client-only interface since it matches exactly and the implementation is internal -@available(iOS 13, *) -extension BidirectionalAsyncStream: ClientOnlyAsyncStreamInterface {} diff --git a/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift b/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift index 95195b72..29a95d1a 100644 --- a/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift +++ b/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift @@ -40,6 +40,3 @@ extension BidirectionalStream: BidirectionalStreamInterface { self.requestCallbacks.cancel() } } - -// Conforms to the client-only interface since it matches exactly and the implementation is internal -extension BidirectionalStream: ClientOnlyStreamInterface {} diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift new file mode 100644 index 00000000..1abfa8bd --- /dev/null +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift @@ -0,0 +1,41 @@ +// Copyright 2022-2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Foundation + +/// Concrete implementation of `ClientOnlyAsyncStreamInterface`. +/// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream` +/// to work with async/await. +@available(iOS 13, *) +final class ClientOnlyAsyncStream< + Input: ProtobufMessage, Output: ProtobufMessage +>: BidirectionalAsyncStream { + private let receivedMessageCount = Locked(0) + + override func receive(_ result: StreamResult) { + let receivedMessageCount = self.receivedMessageCount.perform { value in + if case .message = result { + value += 1 + } + return value + } + super.receive(result.validatedForClientStream(receivedMessageCount: receivedMessageCount)) + } +} + +extension ClientOnlyAsyncStream: ClientOnlyAsyncStreamInterface { + func closeAndReceive() { + self.close() + } +} diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift new file mode 100644 index 00000000..369c28bb --- /dev/null +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift @@ -0,0 +1,77 @@ +// Copyright 2022-2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import SwiftProtobuf + +/// Concrete implementation of `ClientOnlyStreamInterface`. +final class ClientOnlyStream: @unchecked Sendable { + private let onResult: @Sendable (StreamResult) -> Void + /// Callbacks used to send outbound data and close the stream. + /// Optional because these callbacks are not available until the stream is initialized. + private var requestCallbacks: RequestCallbacks? + private let receivedMessageCount = Locked(0) + + private struct NotConfiguredForSendingError: Swift.Error {} + + init(onResult: @escaping @Sendable (StreamResult) -> Void) { + self.onResult = onResult + } + + /// Enable sending data over this stream by providing a set of request callbacks to route data + /// to the network client. Must be called before calling `send()`. + /// + /// - parameter requestCallbacks: Callbacks to use for sending request data and closing the + /// stream. + /// + /// - returns: This instance of the stream (useful for chaining). + @discardableResult + func configureForSending(with requestCallbacks: RequestCallbacks) -> Self { + self.requestCallbacks = requestCallbacks + return self + } + + /// Send a result to the consumer after doing additional validations for client-only streams. + /// Should be called by the protocol client when a result is received. + /// + /// - parameter result: The new result that was received. + func receive(_ result: StreamResult) { + let receivedMessageCount = self.receivedMessageCount.perform { value in + if case .message = result { + value += 1 + } + return value + } + self.onResult(result.validatedForClientStream(receivedMessageCount: receivedMessageCount)) + } +} + +extension ClientOnlyStream: ClientOnlyStreamInterface { + @discardableResult + func send(_ input: Input) throws -> Self { + guard let sendData = self.requestCallbacks?.sendData else { + throw NotConfiguredForSendingError() + } + + sendData(input) + return self + } + + func closeAndReceive() { + self.requestCallbacks?.sendClose() + } + + func cancel() { + self.requestCallbacks?.cancel() + } +} diff --git a/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift b/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift new file mode 100644 index 00000000..89a120f9 --- /dev/null +++ b/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift @@ -0,0 +1,54 @@ +// Copyright 2022-2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Foundation + +extension StreamResult { + /// Applies some validations which are only relevant for client-only streams. + /// + /// - parameter receivedMessageCount: The number of response messages that have been received so + /// far, including the one being validated. + /// + /// - returns: The validated stream result, which may have been transformed into an error. + func validatedForClientStream(receivedMessageCount: Int) -> Self { + switch self { + case .complete(let code, _, _): + if code == .ok && receivedMessageCount < 1 { + return .complete( + code: .internalError, + error: ConnectError( + code: .unimplemented, message: "unary stream has no messages" + ), + trailers: nil + ) + } else { + return self + } + case .headers: + return self + case .message: + if receivedMessageCount > 1 { + return .complete( + code: .internalError, + error: ConnectError( + code: .unimplemented, message: "unary stream has multiple messages" + ), + trailers: nil + ) + } else { + return self + } + } + } +} diff --git a/Libraries/Connect/Internal/Streaming/URLSessionStream.swift b/Libraries/Connect/Internal/Streaming/URLSessionStream.swift index 80e5dab5..a735a1e0 100644 --- a/Libraries/Connect/Internal/Streaming/URLSessionStream.swift +++ b/Libraries/Connect/Internal/Streaming/URLSessionStream.swift @@ -122,13 +122,7 @@ final class URLSessionStream: NSObject, @unchecked Sendable { self.responseCallbacks.receiveClose( code, [:], - ConnectError( - code: code, - message: error.localizedDescription, - exception: nil, - details: [], - metadata: [:] - ) + ConnectError(code: code, message: error.localizedDescription) ) } else { self.responseCallbacks.receiveClose(.ok, [:], nil) diff --git a/Libraries/Connect/Internal/Utilities/Locked.swift b/Libraries/Connect/Internal/Utilities/Locked.swift index bd20c4e1..6dbf8984 100644 --- a/Libraries/Connect/Internal/Utilities/Locked.swift +++ b/Libraries/Connect/Internal/Utilities/Locked.swift @@ -16,12 +16,12 @@ import Foundation /// Class containing an internal lock which can be used to ensure thread-safe access to an /// underlying value. Conforms to `Sendable`, making it accessible from `@Sendable` closures. -final class Locked: @unchecked Sendable { +final class Locked: @unchecked Sendable { private let lock = Lock() - private var wrappedValue: T + private var wrappedValue: Wrapped /// Thread-safe access to the underlying value. - var value: T { + var value: Wrapped { get { self.lock.perform { self.wrappedValue } } set { self.lock.perform { self.wrappedValue = newValue } } } @@ -29,13 +29,16 @@ final class Locked: @unchecked Sendable { /// Perform an action with the underlying value, potentially updating that value. /// /// - parameter action: Closure to perform with the underlying value. - func perform(action: @escaping (inout T) -> Void) { - self.lock.perform { + /// + /// - returns: The value returned by the closure. + @discardableResult + func perform(action: @escaping (inout Wrapped) -> Result) -> Result { + return self.lock.perform { action(&self.wrappedValue) } } - init(_ value: T) { + init(_ value: Wrapped) { self.wrappedValue = value } } diff --git a/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift b/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift index b603c878..486bf523 100644 --- a/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift +++ b/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift @@ -29,12 +29,8 @@ extension ConnectError { _ headers: Headers?, trailers: Trailers? ) -> (grpcCode: Code, error: ConnectError?) { // "Trailers-only" responses can be sent in the headers or trailers block. - // Check for a valid gRPC status in the headers first, then in the trailers. - guard let grpcCode = headers?.grpcStatus() ?? trailers?.grpcStatus() else { - return (.unknown, ConnectError( - code: .unknown, message: "RPC response missing status", exception: nil, - details: [], metadata: [:] - )) + guard let grpcCode = trailers?.grpcStatus() ?? headers?.grpcStatus() else { + return (.unknown, ConnectError(code: .unknown, message: "RPC response missing status")) } if grpcCode == .ok { diff --git a/Libraries/Connect/PackageInternal/Envelope.swift b/Libraries/Connect/PackageInternal/Envelope.swift index 261ec8db..6edd5b52 100644 --- a/Libraries/Connect/PackageInternal/Envelope.swift +++ b/Libraries/Connect/PackageInternal/Envelope.swift @@ -85,6 +85,16 @@ public enum Envelope { } } + /// Determines whether the specified data contains more than 1 message. + /// + /// - parameter packedData: The packed data to analyze. + /// + /// - returns: True if the data contains more than 1 message. + public static func containsMultipleMessages(_ packedData: Data) -> Bool { + let messageLength = self.messageLength(forPackedData: packedData) + return packedData.count > messageLength + self.prefixLength + } + // MARK: - Internal enum Error: Swift.Error { diff --git a/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift b/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift index af314215..86574487 100644 --- a/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift +++ b/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift @@ -183,9 +183,11 @@ extension ProtocolClient: ProtocolClientInterface { headers: Headers, onResult: @escaping @Sendable (StreamResult) -> Void ) -> any ClientOnlyStreamInterface { - return BidirectionalStream(requestCallbacks: self.createRequestCallbacks( - path: path, headers: headers, onResult: onResult - )) + let clientOnly = ClientOnlyStream(onResult: onResult) + let callbacks: RequestCallbacks = self.createRequestCallbacks( + path: path, headers: headers, onResult: { clientOnly.receive($0) } + ) + return clientOnly.configureForSending(with: callbacks) } public func serverOnlyStream< @@ -236,11 +238,11 @@ extension ProtocolClient: ProtocolClientInterface { path: String, headers: Headers ) -> any ClientOnlyAsyncStreamInterface { - let bidirectionalAsync = BidirectionalAsyncStream() + let clientOnlyAsync = ClientOnlyAsyncStream() let callbacks: RequestCallbacks = self.createRequestCallbacks( - path: path, headers: headers, onResult: { bidirectionalAsync.receive($0) } + path: path, headers: headers, onResult: { clientOnlyAsync.receive($0) } ) - return bidirectionalAsync.configureForSending(with: callbacks) + return clientOnlyAsync.configureForSending(with: callbacks) } @available(iOS 13, *) diff --git a/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift b/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift index 551f6d1f..5bf3b8f8 100644 --- a/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift +++ b/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift @@ -67,11 +67,7 @@ open class URLSessionHTTPClient: NSObject, HTTPClientInterface, @unchecked Senda headers: [:], message: data, trailers: [:], - error: ConnectError( - code: code, - message: error.localizedDescription, - exception: error, details: [], metadata: [:] - ), + error: ConnectError(code: code, message: error.localizedDescription), tracingInfo: nil )) } else { @@ -83,7 +79,7 @@ open class URLSessionHTTPClient: NSObject, HTTPClientInterface, @unchecked Senda error: ConnectError( code: .unknown, message: "unexpected response type \(type(of: urlResponse))", - exception: error, details: [], metadata: [:] + exception: error ), tracingInfo: nil )) diff --git a/Libraries/Connect/Public/Interfaces/ConnectError.swift b/Libraries/Connect/Public/Interfaces/ConnectError.swift index d7432930..c935c94a 100644 --- a/Libraries/Connect/Public/Interfaces/ConnectError.swift +++ b/Libraries/Connect/Public/Interfaces/ConnectError.swift @@ -48,7 +48,8 @@ public struct ConnectError: Swift.Error, Sendable { } public init( - code: Code, message: String?, exception: Error?, details: [Detail], metadata: Headers + code: Code, message: String?, + exception: Error? = nil, details: [Detail] = [], metadata: Headers = [:] ) { self.code = code self.message = message @@ -113,8 +114,7 @@ extension ConnectError { guard let source = source else { return .init( - code: code, message: "empty error message from source", exception: nil, - details: [], metadata: metadata + code: code, message: "empty error message from source", metadata: metadata ) } @@ -123,16 +123,13 @@ extension ConnectError { } catch let error { return .init( code: code, message: String(data: source, encoding: .utf8), - exception: error, details: [], metadata: metadata + exception: error, metadata: metadata ) } } public static func canceled() -> Self { - return .init( - code: .canceled, message: "request canceled by client", - exception: nil, details: [], metadata: [:] - ) + return .init(code: .canceled, message: "request canceled by client") } } diff --git a/Libraries/Connect/Public/Interfaces/Streaming/AsyncAwait/ClientOnlyAsyncStreamInterface.swift b/Libraries/Connect/Public/Interfaces/Streaming/AsyncAwait/ClientOnlyAsyncStreamInterface.swift index 134f39db..4a1a4116 100644 --- a/Libraries/Connect/Public/Interfaces/Streaming/AsyncAwait/ClientOnlyAsyncStreamInterface.swift +++ b/Libraries/Connect/Public/Interfaces/Streaming/AsyncAwait/ClientOnlyAsyncStreamInterface.swift @@ -39,8 +39,9 @@ public protocol ClientOnlyAsyncStreamInterface { /// - returns: An `AsyncStream` that contains all outputs/results from the stream. func results() -> AsyncStream> - /// Close the stream. No calls to `send()` are valid after calling `close()`. - func close() + /// Close the stream and await a response. + /// No calls to `send()` are valid after calling `closeAndReceive()`. + func closeAndReceive() /// Cancel the stream and return a canceled code. /// No calls to `send()` are valid after calling `cancel()`. diff --git a/Libraries/Connect/Public/Interfaces/Streaming/Callbacks/ClientOnlyStreamInterface.swift b/Libraries/Connect/Public/Interfaces/Streaming/Callbacks/ClientOnlyStreamInterface.swift index 3a8bd5c0..e1ffc133 100644 --- a/Libraries/Connect/Public/Interfaces/Streaming/Callbacks/ClientOnlyStreamInterface.swift +++ b/Libraries/Connect/Public/Interfaces/Streaming/Callbacks/ClientOnlyStreamInterface.swift @@ -28,8 +28,9 @@ public protocol ClientOnlyStreamInterface { @discardableResult func send(_ input: Input) throws -> Self - /// Close the stream. No calls to `send()` are valid after calling `close()`. - func close() + /// Close the stream and await a response. + /// No calls to `send()` are valid after calling `closeAndReceive()`. + func closeAndReceive() /// Cancel the stream and return a canceled code. /// No calls to `send()` are valid after calling `cancel()`. diff --git a/Libraries/ConnectMocks/MockClientOnlyAsyncStream.swift b/Libraries/ConnectMocks/MockClientOnlyAsyncStream.swift index 07dd0289..692a0b94 100644 --- a/Libraries/ConnectMocks/MockClientOnlyAsyncStream.swift +++ b/Libraries/ConnectMocks/MockClientOnlyAsyncStream.swift @@ -27,6 +27,9 @@ import SwiftProtobuf open class MockClientOnlyAsyncStream< Input: ProtobufMessage, Output: ProtobufMessage ->: MockBidirectionalAsyncStream, - ClientOnlyAsyncStreamInterface, - @unchecked Sendable {} +>: MockBidirectionalAsyncStream, ClientOnlyAsyncStreamInterface, @unchecked Sendable +{ + open func closeAndReceive() { + self.close() + } +} diff --git a/Libraries/ConnectMocks/MockClientOnlyStream.swift b/Libraries/ConnectMocks/MockClientOnlyStream.swift index a1f14431..d14b80ab 100644 --- a/Libraries/ConnectMocks/MockClientOnlyStream.swift +++ b/Libraries/ConnectMocks/MockClientOnlyStream.swift @@ -26,6 +26,8 @@ import SwiftProtobuf open class MockClientOnlyStream< Input: ProtobufMessage, Output: ProtobufMessage ->: MockBidirectionalStream, - ClientOnlyStreamInterface, - @unchecked Sendable {} +>: MockBidirectionalStream, ClientOnlyStreamInterface, @unchecked Sendable { + open func closeAndReceive() { + self.close() + } +} diff --git a/Libraries/ConnectNIO/Internal/Extensions/ConnectError+Extensions.swift b/Libraries/ConnectNIO/Internal/Extensions/ConnectError+Extensions.swift index 66103100..29ea36f2 100644 --- a/Libraries/ConnectNIO/Internal/Extensions/ConnectError+Extensions.swift +++ b/Libraries/ConnectNIO/Internal/Extensions/ConnectError+Extensions.swift @@ -16,12 +16,6 @@ import Connect extension ConnectError { static func deadlineExceeded() -> Self { - return .init( - code: .deadlineExceeded, - message: "timed out", - exception: nil, - details: [], - metadata: [:] - ) + return .init(code: .deadlineExceeded, message: "timed out") } } diff --git a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift index 654d5443..a169fda8 100644 --- a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift +++ b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift @@ -64,14 +64,39 @@ extension GRPCInterceptor: UnaryInterceptor { trailers: response.trailers ) guard grpcCode == .ok, let rawData = response.message, !rawData.isEmpty else { - proceed(HTTPResponse( - code: grpcCode, - headers: response.headers, - message: response.message, - trailers: response.trailers, - error: connectError ?? response.error, - tracingInfo: response.tracingInfo - )) + if response.trailers.grpcStatus() == nil && response.message?.isEmpty == false { + proceed(HTTPResponse( + code: .internalError, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: ConnectError( + code: .internalError, + message: "unary response message should be followed by trailers" + ), + tracingInfo: response.tracingInfo + )) + } else if grpcCode == .ok { + proceed(HTTPResponse( + code: .unimplemented, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: ConnectError( + code: .unimplemented, message: "unary response has no message" + ), + tracingInfo: response.tracingInfo + )) + } else { + proceed(HTTPResponse( + code: grpcCode, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: connectError ?? response.error, + tracingInfo: response.tracingInfo + )) + } return } @@ -80,6 +105,20 @@ extension GRPCInterceptor: UnaryInterceptor { .first .flatMap { self.config.responseCompressionPool(forName: $0) } do { + guard Envelope.containsMultipleMessages(rawData) == false else { + proceed(HTTPResponse( + code: .unimplemented, + headers: response.headers, + message: nil, + trailers: response.trailers, + error: ConnectError( + code: .unimplemented, message: "unary response has multiple messages" + ), + tracingInfo: response.tracingInfo + )) + return + } + let messageData = try Envelope.unpackMessage( rawData, compressionPool: compressionPool ).unpacked @@ -140,9 +179,12 @@ extension GRPCInterceptor: StreamInterceptor { let responseCompressionPool = self.streamResponseHeaders.value?[ HeaderConstants.grpcContentEncoding ]?.first.flatMap { self.config.responseCompressionPool(forName: $0) } - proceed(.message(try Envelope.unpackMessage( + let unpackedMessage = try Envelope.unpackMessage( rawData, compressionPool: responseCompressionPool - ).unpacked)) + ).unpacked + if !unpackedMessage.isEmpty { + proceed(.message(unpackedMessage)) + } } catch let error { // TODO: Close the stream here? proceed(.complete(code: .unknown, error: error, trailers: nil)) diff --git a/Tests/ConformanceClient/InvocationConfigs/nio-opt-outs.txt b/Tests/ConformanceClient/InvocationConfigs/nio-opt-outs.txt deleted file mode 100644 index 260f45de..00000000 --- a/Tests/ConformanceClient/InvocationConfigs/nio-opt-outs.txt +++ /dev/null @@ -1,26 +0,0 @@ -Connect Unexpected Responses/HTTPVersion:2/TLS:false/client-stream/multiple-responses -Connect Unexpected Responses/HTTPVersion:2/TLS:false/client-stream/ok-but-no-response -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-codec -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compressed-message -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compression -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-content-type -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-stream-codec -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/client-stream/multiple-responses -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/client-stream/ok-but-no-response -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/ignore-header-if-body-present -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/ignore-header-if-trailer-present -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unary/multiple-responses -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unary/ok-but-no-response -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-codec -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compressed-message -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compression -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-in-body/client-stream/multiple-responses -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-in-body/client-stream/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-in-body/unary/multiple-responses -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-in-body/unary/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/client-stream/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/ignore-header-if-body-present -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/unary/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-codec -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compressed-message -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compression diff --git a/Tests/ConformanceClient/InvocationConfigs/urlsession-opt-outs.txt b/Tests/ConformanceClient/InvocationConfigs/urlsession-opt-outs.txt deleted file mode 100644 index f184a9d0..00000000 --- a/Tests/ConformanceClient/InvocationConfigs/urlsession-opt-outs.txt +++ /dev/null @@ -1,17 +0,0 @@ -Connect Unexpected Responses/HTTPVersion:1/TLS:false/client-stream/multiple-responses -Connect Unexpected Responses/HTTPVersion:1/TLS:false/client-stream/ok-but-no-response -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-codec -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-compressed-message -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-compression -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-content-type -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-stream-codec -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-in-body/client-stream/multiple-responses -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-in-body/client-stream/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-in-body/unary/multiple-responses -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-in-body/unary/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-only/client-stream/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-only/ignore-header-if-body-present -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-only/unary/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-codec -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-compressed-message -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-compression diff --git a/Tests/ConformanceClient/Sources/ConformanceInvoker.swift b/Tests/ConformanceClient/Sources/ConformanceInvoker.swift index 2b168b7e..a103a21f 100644 --- a/Tests/ConformanceClient/Sources/ConformanceInvoker.swift +++ b/Tests/ConformanceClient/Sources/ConformanceInvoker.swift @@ -272,9 +272,9 @@ final class ConformanceInvoker { stream.cancel() } case .afterNumResponses: // Does not apply to client-only streams. - stream.close() + stream.closeAndReceive() case .none: - stream.close() + stream.closeAndReceive() } var conformanceResult = ConformanceResult() From 993e365151141e20ad953adbbda8c8fc0cd2e843 Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Tue, 28 May 2024 18:04:22 -0700 Subject: [PATCH 02/10] fix Signed-off-by: Michael Rebello --- .../Interceptors/ConnectInterceptor.swift | 4 +- .../Interceptors/GRPCWebInterceptor.swift | 10 ++--- .../ConnectNIO/Internal/GRPCInterceptor.swift | 40 +++++++++++++++++++ 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift index 369e6b32..9387b9b1 100644 --- a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift @@ -85,7 +85,9 @@ extension ConnectInterceptor: UnaryInterceptor { let finalResponse: HTTPResponse let contentType = response.headers[HeaderConstants.contentType]?.first ?? "" - if response.code == .ok && !contentType.hasPrefix("application/\(config.codec.name())") { + if response.code == .ok + && !contentType.hasPrefix("application/\(self.config.codec.name())") + { // If content-type looks like it could be an RPC server's response, consider // this an internal error. let code: Code = contentType.hasPrefix("application/") ? .internalError : .unknown diff --git a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift index 2e22f1c9..2ff9bc70 100644 --- a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift @@ -84,9 +84,7 @@ extension GRPCWebInterceptor: UnaryInterceptor { } let contentType = response.headers[HeaderConstants.contentType]?.first ?? "" - if response.code == .ok - && !self.contentTypeIsExpectedGRPCWeb(contentType, expectedCodec: config.codec) - { + if response.code == .ok && !self.contentTypeIsExpectedGRPCWeb(contentType) { // If content-type looks like it could be a gRPC server's response, consider // this an internal error. let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown @@ -173,7 +171,7 @@ extension GRPCWebInterceptor: StreamInterceptor { switch result { case .headers(let headers): let contentType = headers[HeaderConstants.contentType]?.first ?? "" - if !self.contentTypeIsExpectedGRPCWeb(contentType, expectedCodec: self.config.codec) { + if !self.contentTypeIsExpectedGRPCWeb(contentType) { // If content-type looks like it could be a gRPC server's response, consider // this an internal error. let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown @@ -240,8 +238,8 @@ extension GRPCWebInterceptor: StreamInterceptor { || contentType.hasPrefix("application/grpc-web+") } - private func contentTypeIsExpectedGRPCWeb(_ contentType: String, expectedCodec: Codec) -> Bool { - let codecName = expectedCodec.name() + private func contentTypeIsExpectedGRPCWeb(_ contentType: String) -> Bool { + let codecName = self.config.codec.name() return (codecName == "proto" && contentType == "application/grpc-web") || contentType == "application/grpc-web+\(codecName)" } diff --git a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift index a169fda8..c4e726a7 100644 --- a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift +++ b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift @@ -59,6 +59,19 @@ extension GRPCInterceptor: UnaryInterceptor { return } + let contentType = response.headers[HeaderConstants.contentType]?.first ?? "" + if !self.contentTypeIsExpectedGRPC(contentType) { + // If content-type looks like it could be a gRPC server's response, consider + // this an internal error. + let code: Code = self.contentTypeIsGRPC(contentType) ? .internalError : .unknown + proceed(HTTPResponse( + code: code, headers: response.headers, message: nil, trailers: response.trailers, + error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"), + tracingInfo: response.tracingInfo + )) + return + } + let (grpcCode, connectError) = ConnectError.parseGRPCHeaders( response.headers, trailers: response.trailers @@ -172,6 +185,20 @@ extension GRPCInterceptor: StreamInterceptor { switch result { case .headers(let headers): self.streamResponseHeaders.value = headers + + let contentType = headers[HeaderConstants.contentType]?.first ?? "" + if !self.contentTypeIsExpectedGRPC(contentType) { + // If content-type looks like it could be a gRPC server's response, consider + // this an internal error. + let code: Code = self.contentTypeIsGRPC(contentType) ? .internalError : .unknown + proceed(.complete( + code: code, error: ConnectError( + code: code, message: "unexpected content-type: \(contentType)" + ), trailers: headers + )) + return + } + proceed(result) case .message(let rawData): @@ -216,6 +243,19 @@ extension GRPCInterceptor: StreamInterceptor { } } } + + // MARK: - Private + + private func contentTypeIsGRPC(_ contentType: String) -> Bool { + return contentType == "application/grpc" + || contentType.hasPrefix("application/grpc+") + } + + private func contentTypeIsExpectedGRPC(_ contentType: String) -> Bool { + let codecName = self.config.codec.name() + return (codecName == "proto" && contentType == "application/grpc") + || contentType == "application/grpc+\(codecName)" + } } private final class Locked: @unchecked Sendable { From d03fc6e9cc70f83434614d4e66d39acfe58ff2f9 Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Tue, 28 May 2024 18:55:48 -0700 Subject: [PATCH 03/10] fix Signed-off-by: Michael Rebello --- .../Internal/Interceptors/ConnectInterceptor.swift | 12 ++++++++++++ Libraries/Connect/PackageInternal/Envelope.swift | 10 ++++++++++ .../ConnectTests/EnvelopeTests.swift | 4 ++++ 3 files changed, 26 insertions(+) diff --git a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift index 9387b9b1..f600004a 100644 --- a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift @@ -191,6 +191,18 @@ extension ConnectInterceptor: StreamInterceptor { let responseCompressionPool = self.streamResponseHeaders.value?[ HeaderConstants.connectStreamingContentEncoding ]?.first.flatMap { self.config.responseCompressionPool(forName: $0) } + if responseCompressionPool == nil && Envelope.isCompressed(data) { + proceed(.complete( + code: .internalError, + error: ConnectError( + code: .internalError, + message: "unexpectedly received compressed message" + ), + trailers: [:] + )) + return + } + let (headerByte, message) = try Envelope.unpackMessage( data, compressionPool: responseCompressionPool ) diff --git a/Libraries/Connect/PackageInternal/Envelope.swift b/Libraries/Connect/PackageInternal/Envelope.swift index 6edd5b52..a0421964 100644 --- a/Libraries/Connect/PackageInternal/Envelope.swift +++ b/Libraries/Connect/PackageInternal/Envelope.swift @@ -95,6 +95,16 @@ public enum Envelope { return packedData.count > messageLength + self.prefixLength } + /// Determines whether the specified data is compressed + /// by assessing its compression prefix flag. + /// + /// - parameter packedData: The packed data to analyze. + /// + /// - returns: True if the data is compressed. + public static func isCompressed(_ packedData: Data) -> Bool { + return !packedData.isEmpty && (0b00000001 & packedData[0] != 0) + } + // MARK: - Internal enum Error: Swift.Error { diff --git a/Tests/UnitTests/ConnectLibraryTests/ConnectTests/EnvelopeTests.swift b/Tests/UnitTests/ConnectLibraryTests/ConnectTests/EnvelopeTests.swift index 3e09378d..6fc0ee22 100644 --- a/Tests/UnitTests/ConnectLibraryTests/ConnectTests/EnvelopeTests.swift +++ b/Tests/UnitTests/ConnectLibraryTests/ConnectTests/EnvelopeTests.swift @@ -23,6 +23,7 @@ final class EnvelopeTests: XCTestCase { ) let compressed = try GzipCompressionPool().compress(data: originalData) XCTAssertEqual(packed[0], 1) // Compression flag = true + XCTAssertTrue(Envelope.isCompressed(packed)) XCTAssertEqual(Envelope.messageLength(forPackedData: packed), compressed.count) XCTAssertEqual(packed[5...], compressed) // Post-prefix data should match compressed value @@ -35,6 +36,7 @@ final class EnvelopeTests: XCTestCase { let originalData = Data(repeating: 0xa, count: 50) let packed = Envelope.packMessage(originalData, using: nil) XCTAssertEqual(packed[0], 0) // Compression flag = false + XCTAssertFalse(Envelope.isCompressed(packed)) XCTAssertEqual(Envelope.messageLength(forPackedData: packed), originalData.count) XCTAssertEqual(packed[5...], originalData) // Post-prefix data should match compressed value @@ -50,6 +52,7 @@ final class EnvelopeTests: XCTestCase { originalData, using: .init(minBytes: 100, pool: GzipCompressionPool()) ) XCTAssertEqual(packed[0], 0) // Compression flag = false + XCTAssertFalse(Envelope.isCompressed(packed)) XCTAssertEqual(Envelope.messageLength(forPackedData: packed), originalData.count) XCTAssertEqual(packed[5...], originalData) // Post-prefix data should match compressed value @@ -66,6 +69,7 @@ final class EnvelopeTests: XCTestCase { ) let compressed = try GzipCompressionPool().compress(data: originalData) XCTAssertEqual(packed[0], 1) // Compression flag = true + XCTAssertTrue(Envelope.isCompressed(packed)) XCTAssertEqual(Envelope.messageLength(forPackedData: packed), compressed.count) XCTAssertEqual(packed[5...], compressed) // Post-prefix data should match compressed value From 1807343058803955bfbdedd001c1e0104ea34865 Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Tue, 28 May 2024 19:14:55 -0700 Subject: [PATCH 04/10] fix Signed-off-by: Michael Rebello --- .../Interceptors/ConnectInterceptor.swift | 7 ++----- .../Interceptors/GRPCWebInterceptor.swift | 10 ++++++++++ .../ConnectNIO/Internal/GRPCInterceptor.swift | 19 +++++++++++++++++++ Makefile | 4 ++-- 4 files changed, 33 insertions(+), 7 deletions(-) diff --git a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift index f600004a..26aac99e 100644 --- a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift @@ -194,11 +194,8 @@ extension ConnectInterceptor: StreamInterceptor { if responseCompressionPool == nil && Envelope.isCompressed(data) { proceed(.complete( code: .internalError, - error: ConnectError( - code: .internalError, - message: "unexpectedly received compressed message" - ), - trailers: [:] + error: ConnectError(code: .internalError, message: "unexpected encoding"), + trailers: nil )) return } diff --git a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift index 2ff9bc70..05e1f3a5 100644 --- a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift @@ -99,6 +99,16 @@ extension GRPCWebInterceptor: UnaryInterceptor { let compressionPool = response.headers[HeaderConstants.grpcContentEncoding]? .first .flatMap { self.config.responseCompressionPool(forName: $0) } + if compressionPool == nil && Envelope.isCompressed(responseData) { + proceed(HTTPResponse( + code: .internalError, headers: response.headers, message: nil, + trailers: response.trailers, + error: ConnectError(code: .internalError, message: "unexpected encoding"), + tracingInfo: response.tracingInfo + )) + return + } + do { // gRPC Web returns data in 2 chunks (either/both of which may be compressed): // 1. OPTIONAL (when not trailers-only): The (headers and length prefixed) diff --git a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift index c4e726a7..c4d26894 100644 --- a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift +++ b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift @@ -117,6 +117,16 @@ extension GRPCInterceptor: UnaryInterceptor { .headers[HeaderConstants.grpcContentEncoding]? .first .flatMap { self.config.responseCompressionPool(forName: $0) } + if compressionPool == nil && Envelope.isCompressed(rawData) { + proceed(HTTPResponse( + code: .internalError, headers: response.headers, message: nil, + trailers: response.trailers, + error: ConnectError(code: .internalError, message: "unexpected encoding"), + tracingInfo: response.tracingInfo + )) + return + } + do { guard Envelope.containsMultipleMessages(rawData) == false else { proceed(HTTPResponse( @@ -206,6 +216,15 @@ extension GRPCInterceptor: StreamInterceptor { let responseCompressionPool = self.streamResponseHeaders.value?[ HeaderConstants.grpcContentEncoding ]?.first.flatMap { self.config.responseCompressionPool(forName: $0) } + if responseCompressionPool == nil && Envelope.isCompressed(rawData) { + proceed(.complete( + code: .internalError, + error: ConnectError(code: .internalError, message: "unexpected encoding"), + trailers: [:] + )) + return + } + let unpackedMessage = try Envelope.unpackMessage( rawData, compressionPool: responseCompressionPool ).unpacked diff --git a/Makefile b/Makefile index c3fb7b61..c077052d 100644 --- a/Makefile +++ b/Makefile @@ -78,8 +78,8 @@ $(BIN)/license-headers: Makefile testconformance: ## Run all conformance tests swift build -c release --product ConnectConformanceClient mv ./.build/release/ConnectConformanceClient $(BIN) - PATH="$(abspath $(BIN)):$(PATH)" connectconformance --trace --conf ./Tests/ConformanceClient/InvocationConfigs/urlsession.yaml --known-failing @./Tests/ConformanceClient/InvocationConfigs/urlsession-opt-outs.txt --mode client $(BIN)/ConnectConformanceClient httpclient=urlsession - PATH="$(abspath $(BIN)):$(PATH)" connectconformance --trace --conf ./Tests/ConformanceClient/InvocationConfigs/nio.yaml --known-failing @./Tests/ConformanceClient/InvocationConfigs/nio-opt-outs.txt --mode client $(BIN)/ConnectConformanceClient httpclient=nio + PATH="$(abspath $(BIN)):$(PATH)" connectconformance --trace --conf ./Tests/ConformanceClient/InvocationConfigs/urlsession.yaml --mode client $(BIN)/ConnectConformanceClient httpclient=urlsession + PATH="$(abspath $(BIN)):$(PATH)" connectconformance --trace --conf ./Tests/ConformanceClient/InvocationConfigs/nio.yaml --mode client $(BIN)/ConnectConformanceClient httpclient=nio .PHONY: testunit testunit: ## Run all unit tests From 7946f12e828600707e26e855713a9c438c5c182e Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Tue, 28 May 2024 19:35:31 -0700 Subject: [PATCH 05/10] update Signed-off-by: Michael Rebello --- .../Internal/Interceptors/ConnectInterceptor.swift | 9 ++++----- .../Internal/Interceptors/GRPCWebInterceptor.swift | 4 +++- .../Internal/Streaming/ClientOnlyStream.swift | 2 +- Libraries/ConnectNIO/Internal/GRPCInterceptor.swift | 12 ++++++------ 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift index 26aac99e..ae51ac39 100644 --- a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift @@ -85,8 +85,7 @@ extension ConnectInterceptor: UnaryInterceptor { let finalResponse: HTTPResponse let contentType = response.headers[HeaderConstants.contentType]?.first ?? "" - if response.code == .ok - && !contentType.hasPrefix("application/\(self.config.codec.name())") + if response.code == .ok && !contentType.hasPrefix("application/\(self.config.codec.name())") { // If content-type looks like it could be an RPC server's response, consider // this an internal error. @@ -193,9 +192,9 @@ extension ConnectInterceptor: StreamInterceptor { ]?.first.flatMap { self.config.responseCompressionPool(forName: $0) } if responseCompressionPool == nil && Envelope.isCompressed(data) { proceed(.complete( - code: .internalError, - error: ConnectError(code: .internalError, message: "unexpected encoding"), - trailers: nil + code: .internalError, error: ConnectError( + code: .internalError, message: "received unexpected compressed message" + ), trailers: nil )) return } diff --git a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift index 05e1f3a5..7dff6061 100644 --- a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift @@ -103,7 +103,9 @@ extension GRPCWebInterceptor: UnaryInterceptor { proceed(HTTPResponse( code: .internalError, headers: response.headers, message: nil, trailers: response.trailers, - error: ConnectError(code: .internalError, message: "unexpected encoding"), + error: ConnectError( + code: .internalError, message: "received unexpected compressed message" + ), tracingInfo: response.tracingInfo )) return diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift index 369c28bb..029ec754 100644 --- a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift @@ -17,10 +17,10 @@ import SwiftProtobuf /// Concrete implementation of `ClientOnlyStreamInterface`. final class ClientOnlyStream: @unchecked Sendable { private let onResult: @Sendable (StreamResult) -> Void + private let receivedMessageCount = Locked(0) /// Callbacks used to send outbound data and close the stream. /// Optional because these callbacks are not available until the stream is initialized. private var requestCallbacks: RequestCallbacks? - private let receivedMessageCount = Locked(0) private struct NotConfiguredForSendingError: Swift.Error {} diff --git a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift index c4d26894..9f554f5d 100644 --- a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift +++ b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift @@ -120,9 +120,9 @@ extension GRPCInterceptor: UnaryInterceptor { if compressionPool == nil && Envelope.isCompressed(rawData) { proceed(HTTPResponse( code: .internalError, headers: response.headers, message: nil, - trailers: response.trailers, - error: ConnectError(code: .internalError, message: "unexpected encoding"), - tracingInfo: response.tracingInfo + trailers: response.trailers, error: ConnectError( + code: .internalError, message: "received unexpected compressed message" + ), tracingInfo: response.tracingInfo )) return } @@ -218,9 +218,9 @@ extension GRPCInterceptor: StreamInterceptor { ]?.first.flatMap { self.config.responseCompressionPool(forName: $0) } if responseCompressionPool == nil && Envelope.isCompressed(rawData) { proceed(.complete( - code: .internalError, - error: ConnectError(code: .internalError, message: "unexpected encoding"), - trailers: [:] + code: .internalError, error: ConnectError( + code: .internalError, message: "received unexpected compressed message" + ), trailers: [:] )) return } From 3118c1abcde78aa52f31872624e23deb8d3fea07 Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Tue, 28 May 2024 20:01:00 -0700 Subject: [PATCH 06/10] updates Signed-off-by: Michael Rebello --- Examples/ElizaCocoaPodsApp/Podfile.lock | 2 +- .../Interceptors/GRPCWebInterceptor.swift | 14 +++------- .../Streaming/BidirectionalAsyncStream.swift | 3 +++ .../Streaming/ClientOnlyAsyncStream.swift | 4 +++ .../Internal/Streaming/ClientOnlyStream.swift | 3 +++ .../StreamResult+ClientOnlyStream.swift | 16 ++++++------ .../ConnectNIO/Internal/GRPCInterceptor.swift | 26 +++++++++---------- 7 files changed, 35 insertions(+), 33 deletions(-) diff --git a/Examples/ElizaCocoaPodsApp/Podfile.lock b/Examples/ElizaCocoaPodsApp/Podfile.lock index 727fcb34..51daf1d2 100644 --- a/Examples/ElizaCocoaPodsApp/Podfile.lock +++ b/Examples/ElizaCocoaPodsApp/Podfile.lock @@ -20,4 +20,4 @@ SPEC CHECKSUMS: PODFILE CHECKSUM: b598f373a6ab5add976b09c2ac79029bf2200d48 -COCOAPODS: 1.13.0 +COCOAPODS: 1.15.2 diff --git a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift index 7dff6061..e331187d 100644 --- a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift @@ -76,7 +76,9 @@ extension GRPCWebInterceptor: UnaryInterceptor { headers: response.headers, message: response.message, trailers: response.trailers, - error: ConnectError.unaryResponseHasNoMessage(), + error: ConnectError( + code: .unimplemented, message: "unary response has no message" + ), tracingInfo: response.tracingInfo )) } @@ -303,7 +305,7 @@ private extension HTTPResponse { headers: self.headers, message: nil, trailers: trailers, - error: ConnectError.unaryResponseHasNoMessage(), + error: ConnectError(code: .unimplemented, message: "unary response has no message"), tracingInfo: self.tracingInfo ) } else { @@ -318,11 +320,3 @@ private extension HTTPResponse { } } } - -private extension ConnectError { - static func unaryResponseHasNoMessage() -> Self { - return ConnectError( - code: .unimplemented, message: "unary response has no message" - ) - } -} diff --git a/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift b/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift index 2e3e9d8d..f2213799 100644 --- a/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift +++ b/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift @@ -17,6 +17,9 @@ import SwiftProtobuf /// Concrete implementation of `BidirectionalAsyncStreamInterface`. /// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream` /// to work with async/await. +/// +/// If the library removes callback support in favor of only supporting async/await in the future, +/// this class can be simplified. @available(iOS 13, *) class BidirectionalAsyncStream< Input: ProtobufMessage, Output: ProtobufMessage diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift index 1abfa8bd..7a60a671 100644 --- a/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift @@ -17,6 +17,9 @@ import Foundation /// Concrete implementation of `ClientOnlyAsyncStreamInterface`. /// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream` /// to work with async/await. +/// +/// This subclasses `BidirectionalAsyncStream` since its behavior is purely additive (it overlays +/// some additional validation) and both types are internal to the package, not public. @available(iOS 13, *) final class ClientOnlyAsyncStream< Input: ProtobufMessage, Output: ProtobufMessage @@ -34,6 +37,7 @@ final class ClientOnlyAsyncStream< } } +@available(iOS 13, *) extension ClientOnlyAsyncStream: ClientOnlyAsyncStreamInterface { func closeAndReceive() { self.close() diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift index 029ec754..acdf051c 100644 --- a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift @@ -15,6 +15,9 @@ import SwiftProtobuf /// Concrete implementation of `ClientOnlyStreamInterface`. +/// +/// The complexity around configuring callbacks on this type is an artifact of the library +/// supporting both callbacks and async/await. This is internal to the package, and not public. final class ClientOnlyStream: @unchecked Sendable { private let onResult: @Sendable (StreamResult) -> Void private let receivedMessageCount = Locked(0) diff --git a/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift b/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift index 89a120f9..c0f7d2a0 100644 --- a/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift +++ b/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift @@ -23,26 +23,26 @@ extension StreamResult { /// - returns: The validated stream result, which may have been transformed into an error. func validatedForClientStream(receivedMessageCount: Int) -> Self { switch self { - case .complete(let code, _, _): - if code == .ok && receivedMessageCount < 1 { + case .headers: + return self + case .message: + if receivedMessageCount > 1 { return .complete( code: .internalError, error: ConnectError( - code: .unimplemented, message: "unary stream has no messages" + code: .unimplemented, message: "unary stream has multiple messages" ), trailers: nil ) } else { return self } - case .headers: - return self - case .message: - if receivedMessageCount > 1 { + case .complete(let code, _, _): + if code == .ok && receivedMessageCount < 1 { return .complete( code: .internalError, error: ConnectError( - code: .unimplemented, message: "unary stream has multiple messages" + code: .unimplemented, message: "unary stream has no messages" ), trailers: nil ) diff --git a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift index 9f554f5d..8a534f24 100644 --- a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift +++ b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift @@ -125,23 +125,21 @@ extension GRPCInterceptor: UnaryInterceptor { ), tracingInfo: response.tracingInfo )) return + } else if Envelope.containsMultipleMessages(rawData) { + proceed(HTTPResponse( + code: .unimplemented, + headers: response.headers, + message: nil, + trailers: response.trailers, + error: ConnectError( + code: .unimplemented, message: "unary response has multiple messages" + ), + tracingInfo: response.tracingInfo + )) + return } do { - guard Envelope.containsMultipleMessages(rawData) == false else { - proceed(HTTPResponse( - code: .unimplemented, - headers: response.headers, - message: nil, - trailers: response.trailers, - error: ConnectError( - code: .unimplemented, message: "unary response has multiple messages" - ), - tracingInfo: response.tracingInfo - )) - return - } - let messageData = try Envelope.unpackMessage( rawData, compressionPool: compressionPool ).unpacked From ba9641af4ff7f1be03588b5db5f819afa02fd014 Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Wed, 29 May 2024 17:39:22 -0700 Subject: [PATCH 07/10] code review Signed-off-by: Michael Rebello --- .../Interceptors/GRPCWebInterceptor.swift | 6 ++- .../Streaming/BidirectionalAsyncStream.swift | 6 +-- .../Streaming/BidirectionalStream.swift | 2 +- .../Streaming/ClientOnlyAsyncStream.swift | 8 ++-- .../Internal/Streaming/ClientOnlyStream.swift | 6 +-- .../Streaming/ServerOnlyAsyncStream.swift | 2 +- .../Internal/Streaming/ServerOnlyStream.swift | 2 +- .../PackageInternal/ConnectError+GRPC.swift | 4 +- .../Connect/PackageInternal/Envelope.swift | 40 +++++++------------ .../PackageInternal/Headers+GRPC.swift | 4 +- .../PackageInternal/Trailers+gRPC.swift | 4 +- .../Clients/ProtocolClient.swift | 10 +++-- .../ConnectNIO/Internal/GRPCInterceptor.swift | 9 ++++- 13 files changed, 53 insertions(+), 50 deletions(-) diff --git a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift index e331187d..b81cdbb2 100644 --- a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift @@ -63,7 +63,8 @@ extension GRPCWebInterceptor: UnaryInterceptor { ) if grpcCode != .ok || connectError != nil { proceed(HTTPResponse( - code: grpcCode, + // Rewrite the gRPC code if it is "ok" but `connectError` is non-nil. + code: grpcCode == .ok ? .unknown : grpcCode, headers: response.headers, message: response.message, trailers: response.trailers, @@ -292,7 +293,8 @@ private extension HTTPResponse { let (grpcCode, error) = ConnectError.parseGRPCHeaders(self.headers, trailers: trailers) if grpcCode != .ok || error != nil { return HTTPResponse( - code: grpcCode, + // Rewrite the gRPC code if it is "ok" but `connectError` is non-nil. + code: grpcCode == .ok ? .unknown : grpcCode, headers: self.headers, message: nil, trailers: trailers, diff --git a/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift b/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift index f2213799..f0f2d53c 100644 --- a/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift +++ b/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift @@ -14,7 +14,7 @@ import SwiftProtobuf -/// Concrete implementation of `BidirectionalAsyncStreamInterface`. +/// Concrete **internal** implementation of `BidirectionalAsyncStreamInterface`. /// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream` /// to work with async/await. /// @@ -74,10 +74,10 @@ class BidirectionalAsyncStream< } /// Send a result to the consumer over the `results()` `AsyncStream`. - /// Should be called by the protocol client when a result is received. + /// Should be called by the protocol client when a result is received from the network. /// /// - parameter result: The new result that was received. - func receive(_ result: StreamResult) { + func handleResultFromServer(_ result: StreamResult) { self.receiveResult(result) } } diff --git a/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift b/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift index 29a95d1a..1bf1aa6f 100644 --- a/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift +++ b/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift @@ -14,7 +14,7 @@ import SwiftProtobuf -/// Concrete implementation of `BidirectionalStreamInterface`. +/// Concrete **internal** implementation of `BidirectionalStreamInterface`. final class BidirectionalStream: Sendable { private let requestCallbacks: RequestCallbacks diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift index 7a60a671..28311d1a 100644 --- a/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift @@ -14,7 +14,7 @@ import Foundation -/// Concrete implementation of `ClientOnlyAsyncStreamInterface`. +/// Concrete **internal** implementation of `ClientOnlyAsyncStreamInterface`. /// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream` /// to work with async/await. /// @@ -26,14 +26,16 @@ final class ClientOnlyAsyncStream< >: BidirectionalAsyncStream { private let receivedMessageCount = Locked(0) - override func receive(_ result: StreamResult) { + override func handleResultFromServer(_ result: StreamResult) { let receivedMessageCount = self.receivedMessageCount.perform { value in if case .message = result { value += 1 } return value } - super.receive(result.validatedForClientStream(receivedMessageCount: receivedMessageCount)) + super.handleResultFromServer( + result.validatedForClientStream(receivedMessageCount: receivedMessageCount) + ) } } diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift index acdf051c..3a14bce5 100644 --- a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift @@ -14,7 +14,7 @@ import SwiftProtobuf -/// Concrete implementation of `ClientOnlyStreamInterface`. +/// Concrete **internal** implementation of `ClientOnlyStreamInterface`. /// /// The complexity around configuring callbacks on this type is an artifact of the library /// supporting both callbacks and async/await. This is internal to the package, and not public. @@ -45,10 +45,10 @@ final class ClientOnlyStream: @ } /// Send a result to the consumer after doing additional validations for client-only streams. - /// Should be called by the protocol client when a result is received. + /// Should be called by the protocol client when a result is received from the network. /// /// - parameter result: The new result that was received. - func receive(_ result: StreamResult) { + func handleResultFromServer(_ result: StreamResult) { let receivedMessageCount = self.receivedMessageCount.perform { value in if case .message = result { value += 1 diff --git a/Libraries/Connect/Internal/Streaming/ServerOnlyAsyncStream.swift b/Libraries/Connect/Internal/Streaming/ServerOnlyAsyncStream.swift index ced2cf83..cf223296 100644 --- a/Libraries/Connect/Internal/Streaming/ServerOnlyAsyncStream.swift +++ b/Libraries/Connect/Internal/Streaming/ServerOnlyAsyncStream.swift @@ -14,7 +14,7 @@ import SwiftProtobuf -/// Concrete implementation of `ServerOnlyAsyncStreamInterface`. +/// Concrete **internal** implementation of `ServerOnlyAsyncStreamInterface`. @available(iOS 13, *) final class ServerOnlyAsyncStream: Sendable { private let bidirectionalStream: BidirectionalAsyncStream diff --git a/Libraries/Connect/Internal/Streaming/ServerOnlyStream.swift b/Libraries/Connect/Internal/Streaming/ServerOnlyStream.swift index c7128556..d70e1778 100644 --- a/Libraries/Connect/Internal/Streaming/ServerOnlyStream.swift +++ b/Libraries/Connect/Internal/Streaming/ServerOnlyStream.swift @@ -14,7 +14,7 @@ import SwiftProtobuf -/// Concrete implementation of `ServerOnlyStreamInterface`. +/// Concrete **internal** implementation of `ServerOnlyStreamInterface`. final class ServerOnlyStream: Sendable { private let bidirectionalStream: BidirectionalStream diff --git a/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift b/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift index 486bf523..38a83f6d 100644 --- a/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift +++ b/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift @@ -15,8 +15,8 @@ import Foundation extension ConnectError { - /// This should not be considered part of Connect's public/stable interface, and is subject - /// to change. When the compiler supports it, this should be package-internal. + /// **This should not be considered part of Connect's public/stable interface, and is subject + /// to change. When the compiler supports it, this should be package-internal.** /// /// Parses gRPC headers and/or trailers to obtain the status and any potential error. /// diff --git a/Libraries/Connect/PackageInternal/Envelope.swift b/Libraries/Connect/PackageInternal/Envelope.swift index a0421964..e8378134 100644 --- a/Libraries/Connect/PackageInternal/Envelope.swift +++ b/Libraries/Connect/PackageInternal/Envelope.swift @@ -15,11 +15,16 @@ import Foundation import SwiftProtobuf -/// Provides functionality for packing and unpacking (headers and length prefixed) messages. +/// **This should not be considered part of Connect's public/stable interface, and is subject +/// to change. When the compiler supports it, this should be package-internal.** /// -/// This should not be considered part of Connect's public/stable interface, and is subject -/// to change. When the compiler supports it, this should be package-internal. +/// Provides functionality for packing and unpacking (headers and length prefixed) messages. public enum Envelope { + /// The total number of bytes that will prefix a message. + public static var prefixLength: Int { + return 5 // Header flags (1 byte) + message length (4 bytes) + } + /// Packs a message into an "envelope", adding required header bytes and optionally /// applying compression. /// @@ -85,16 +90,6 @@ public enum Envelope { } } - /// Determines whether the specified data contains more than 1 message. - /// - /// - parameter packedData: The packed data to analyze. - /// - /// - returns: True if the data contains more than 1 message. - public static func containsMultipleMessages(_ packedData: Data) -> Bool { - let messageLength = self.messageLength(forPackedData: packedData) - return packedData.count > messageLength + self.prefixLength - } - /// Determines whether the specified data is compressed /// by assessing its compression prefix flag. /// @@ -105,17 +100,6 @@ public enum Envelope { return !packedData.isEmpty && (0b00000001 & packedData[0] != 0) } - // MARK: - Internal - - enum Error: Swift.Error { - case missingExpectedCompressionPool - } - - /// The total number of bytes that will prefix a message. - static var prefixLength: Int { - return 5 // Header flags (1 byte) + message length (4 bytes) - } - /// Computes the length of the message contained by a packed chunk of data. /// A packed chunk in this context refers to prefixed message data. /// @@ -127,7 +111,7 @@ public enum Envelope { /// - returns: The length of the next expected message in the packed data. If multiple chunks /// are specified, this will return the length of the first. Returns -1 if there is /// not enough prefix data to determine the message length. - static func messageLength(forPackedData data: Data) -> Int { + public static func messageLength(forPackedData data: Data) -> Int { guard data.count >= self.prefixLength else { return -1 } @@ -139,6 +123,12 @@ public enum Envelope { return Int(messageLength) } + // MARK: - Internal + + enum Error: Swift.Error { + case missingExpectedCompressionPool + } + // MARK: - Private private static func write(lengthOf message: Data, to buffer: inout Data) { diff --git a/Libraries/Connect/PackageInternal/Headers+GRPC.swift b/Libraries/Connect/PackageInternal/Headers+GRPC.swift index bada05c6..a9cace45 100644 --- a/Libraries/Connect/PackageInternal/Headers+GRPC.swift +++ b/Libraries/Connect/PackageInternal/Headers+GRPC.swift @@ -15,8 +15,8 @@ import Foundation extension Headers { - /// This should not be considered part of Connect's public/stable interface, and is subject - /// to change. When the compiler supports it, this should be package-internal. + /// **This should not be considered part of Connect's public/stable interface, and is subject + /// to change. When the compiler supports it, this should be package-internal.** /// /// Adds required headers to gRPC and gRPC-Web requests/streams. /// diff --git a/Libraries/Connect/PackageInternal/Trailers+gRPC.swift b/Libraries/Connect/PackageInternal/Trailers+gRPC.swift index 520fea81..b994d9cf 100644 --- a/Libraries/Connect/PackageInternal/Trailers+gRPC.swift +++ b/Libraries/Connect/PackageInternal/Trailers+gRPC.swift @@ -13,8 +13,8 @@ // limitations under the License. extension Trailers { - /// This should not be considered part of Connect's public/stable interface, and is subject - /// to change. When the compiler supports it, this should be package-internal. + /// **This should not be considered part of Connect's public/stable interface, and is subject + /// to change. When the compiler supports it, this should be package-internal.** /// /// Identifies the status code from gRPC and gRPC-Web trailers. /// diff --git a/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift b/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift index 86574487..b579b563 100644 --- a/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift +++ b/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift @@ -185,7 +185,7 @@ extension ProtocolClient: ProtocolClientInterface { ) -> any ClientOnlyStreamInterface { let clientOnly = ClientOnlyStream(onResult: onResult) let callbacks: RequestCallbacks = self.createRequestCallbacks( - path: path, headers: headers, onResult: { clientOnly.receive($0) } + path: path, headers: headers, onResult: { clientOnly.handleResultFromServer($0) } ) return clientOnly.configureForSending(with: callbacks) } @@ -228,7 +228,8 @@ extension ProtocolClient: ProtocolClientInterface { ) -> any BidirectionalAsyncStreamInterface { let bidirectionalAsync = BidirectionalAsyncStream() let callbacks: RequestCallbacks = self.createRequestCallbacks( - path: path, headers: headers, onResult: { bidirectionalAsync.receive($0) } + path: path, headers: headers, + onResult: { bidirectionalAsync.handleResultFromServer($0) } ) return bidirectionalAsync.configureForSending(with: callbacks) } @@ -240,7 +241,7 @@ extension ProtocolClient: ProtocolClientInterface { ) -> any ClientOnlyAsyncStreamInterface { let clientOnlyAsync = ClientOnlyAsyncStream() let callbacks: RequestCallbacks = self.createRequestCallbacks( - path: path, headers: headers, onResult: { clientOnlyAsync.receive($0) } + path: path, headers: headers, onResult: { clientOnlyAsync.handleResultFromServer($0) } ) return clientOnlyAsync.configureForSending(with: callbacks) } @@ -252,7 +253,8 @@ extension ProtocolClient: ProtocolClientInterface { ) -> any ServerOnlyAsyncStreamInterface { let bidirectionalAsync = BidirectionalAsyncStream() let callbacks: RequestCallbacks = self.createRequestCallbacks( - path: path, headers: headers, onResult: { bidirectionalAsync.receive($0) } + path: path, headers: headers, + onResult: { bidirectionalAsync.handleResultFromServer($0) } ) return ServerOnlyAsyncStream( bidirectionalStream: bidirectionalAsync.configureForSending(with: callbacks) diff --git a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift index 8a534f24..a1c69fc2 100644 --- a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift +++ b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift @@ -125,7 +125,7 @@ extension GRPCInterceptor: UnaryInterceptor { ), tracingInfo: response.tracingInfo )) return - } else if Envelope.containsMultipleMessages(rawData) { + } else if Envelope.containsMultipleGRPCMessages(rawData) { proceed(HTTPResponse( code: .unimplemented, headers: response.headers, @@ -275,6 +275,13 @@ extension GRPCInterceptor: StreamInterceptor { } } +private extension Envelope { + static func containsMultipleGRPCMessages(_ packedData: Data) -> Bool { + let messageLength = self.messageLength(forPackedData: packedData) + return packedData.count > messageLength + self.prefixLength + } +} + private final class Locked: @unchecked Sendable { private let lock = NIOLock() private var wrappedValue: T From 142a8d87b3f0e420f28e7addef779762f13a8b2f Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Wed, 5 Jun 2024 21:26:26 -0700 Subject: [PATCH 08/10] get tests to fail --- .../Connect/Internal/Interceptors/ConnectInterceptor.swift | 2 +- .../Connect/Internal/Interceptors/GRPCWebInterceptor.swift | 2 +- Libraries/ConnectNIO/Internal/GRPCInterceptor.swift | 4 +--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift index ae51ac39..eacb7224 100644 --- a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift @@ -214,7 +214,7 @@ extension ConnectInterceptor: StreamInterceptor { error: response.error, trailers: response.metadata )) - } else if !message.isEmpty { + } else { proceed(.message(message)) } } catch let error { diff --git a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift index b81cdbb2..ada8671a 100644 --- a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift @@ -233,7 +233,7 @@ extension GRPCWebInterceptor: StreamInterceptor { trailers: trailers )) } - } else if !unpackedData.isEmpty { + } else { proceed(.message(unpackedData)) } } catch let error { diff --git a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift index a1c69fc2..b6b5e377 100644 --- a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift +++ b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift @@ -226,9 +226,7 @@ extension GRPCInterceptor: StreamInterceptor { let unpackedMessage = try Envelope.unpackMessage( rawData, compressionPool: responseCompressionPool ).unpacked - if !unpackedMessage.isEmpty { - proceed(.message(unpackedMessage)) - } + proceed(.message(unpackedMessage)) } catch let error { // TODO: Close the stream here? proceed(.complete(code: .unknown, error: error, trailers: nil)) From c30822687a5121fd001fbba816d9ab3fa161b5ba Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Wed, 5 Jun 2024 21:29:46 -0700 Subject: [PATCH 09/10] fix tests Signed-off-by: Michael Rebello --- .../Streaming/ClientOnlyAsyncStream.swift | 19 +++--- .../Internal/Streaming/ClientOnlyStream.swift | 17 +++-- .../ClientOnlyStreamValidation.swift | 65 +++++++++++++++++++ .../StreamResult+ClientOnlyStream.swift | 54 --------------- 4 files changed, 87 insertions(+), 68 deletions(-) create mode 100644 Libraries/Connect/Internal/Streaming/ClientOnlyStreamValidation.swift delete mode 100644 Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift index 28311d1a..07e18dac 100644 --- a/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift @@ -24,18 +24,21 @@ import Foundation final class ClientOnlyAsyncStream< Input: ProtobufMessage, Output: ProtobufMessage >: BidirectionalAsyncStream { - private let receivedMessageCount = Locked(0) + private let receivedResults = Locked([StreamResult]()) override func handleResultFromServer(_ result: StreamResult) { - let receivedMessageCount = self.receivedMessageCount.perform { value in - if case .message = result { - value += 1 + let (isComplete, results) = self.receivedResults.perform { results in + results.append(result) + if case .complete = result { + return (true, ClientOnlyStreamValidation.validatedFinalClientStreamResults(results)) + } else { + return (false, []) } - return value } - super.handleResultFromServer( - result.validatedForClientStream(receivedMessageCount: receivedMessageCount) - ) + guard isComplete else { + return + } + results.forEach(super.handleResultFromServer) } } diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift index 3a14bce5..85dec6a1 100644 --- a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift @@ -20,7 +20,7 @@ import SwiftProtobuf /// supporting both callbacks and async/await. This is internal to the package, and not public. final class ClientOnlyStream: @unchecked Sendable { private let onResult: @Sendable (StreamResult) -> Void - private let receivedMessageCount = Locked(0) + private let receivedResults = Locked([StreamResult]()) /// Callbacks used to send outbound data and close the stream. /// Optional because these callbacks are not available until the stream is initialized. private var requestCallbacks: RequestCallbacks? @@ -49,13 +49,18 @@ final class ClientOnlyStream: @ /// /// - parameter result: The new result that was received. func handleResultFromServer(_ result: StreamResult) { - let receivedMessageCount = self.receivedMessageCount.perform { value in - if case .message = result { - value += 1 + let (isComplete, results) = self.receivedResults.perform { results in + results.append(result) + if case .complete = result { + return (true, ClientOnlyStreamValidation.validatedFinalClientStreamResults(results)) + } else { + return (false, []) } - return value } - self.onResult(result.validatedForClientStream(receivedMessageCount: receivedMessageCount)) + guard isComplete else { + return + } + results.forEach(self.onResult) } } diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyStreamValidation.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyStreamValidation.swift new file mode 100644 index 00000000..101ea08a --- /dev/null +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyStreamValidation.swift @@ -0,0 +1,65 @@ +// Copyright 2022-2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Foundation + +/// Namespace for performing client-only stream validation. +enum ClientOnlyStreamValidation { + /// Applies some validations which are only relevant for client-only streams. + /// + /// Should be called after all values have been received over a client stream. Since client + /// streams only expect 1 result, all values returned from the server should be buffered before + /// being validated here and returned to the caller. + /// + /// - parameter results: The buffered list of results to validate. + /// + /// - returns: The list of stream results which should be returned to the caller. + static func validatedFinalClientStreamResults( + _ results: [StreamResult] + ) -> [StreamResult] { + var messageCount = 0 + for result in results { + switch result { + case .headers: + continue + case .message: + messageCount += 1 + case .complete(let code, _, _): + if code != .ok { + return results + } + } + } + + if messageCount < 1 { + return [ + .complete( + code: .internalError, error: ConnectError( + code: .unimplemented, message: "unary stream has no messages" + ), trailers: nil + ), + ] + } else if messageCount > 1 { + return [ + .complete( + code: .internalError, error: ConnectError( + code: .unimplemented, message: "unary stream has multiple messages" + ), trailers: nil + ), + ] + } else { + return results + } + } +} diff --git a/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift b/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift deleted file mode 100644 index c0f7d2a0..00000000 --- a/Libraries/Connect/Internal/Streaming/StreamResult+ClientOnlyStream.swift +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2022-2024 The Connect Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import Foundation - -extension StreamResult { - /// Applies some validations which are only relevant for client-only streams. - /// - /// - parameter receivedMessageCount: The number of response messages that have been received so - /// far, including the one being validated. - /// - /// - returns: The validated stream result, which may have been transformed into an error. - func validatedForClientStream(receivedMessageCount: Int) -> Self { - switch self { - case .headers: - return self - case .message: - if receivedMessageCount > 1 { - return .complete( - code: .internalError, - error: ConnectError( - code: .unimplemented, message: "unary stream has multiple messages" - ), - trailers: nil - ) - } else { - return self - } - case .complete(let code, _, _): - if code == .ok && receivedMessageCount < 1 { - return .complete( - code: .internalError, - error: ConnectError( - code: .unimplemented, message: "unary stream has no messages" - ), - trailers: nil - ) - } else { - return self - } - } - } -} From e49a279b9e587523b0e4df2d6603ff3b164566cc Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Thu, 6 Jun 2024 17:33:48 -0700 Subject: [PATCH 10/10] fixup --- .../Implementation/Clients/URLSessionHTTPClient.swift | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift b/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift index 5bf3b8f8..f045732d 100644 --- a/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift +++ b/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift @@ -67,7 +67,11 @@ open class URLSessionHTTPClient: NSObject, HTTPClientInterface, @unchecked Senda headers: [:], message: data, trailers: [:], - error: ConnectError(code: code, message: error.localizedDescription), + error: ConnectError( + code: code, + message: error.localizedDescription, + exception: error + ), tracingInfo: nil )) } else {