Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve all outstanding conformance opt-outs #271

Merged
merged 10 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Examples/ElizaCocoaPodsApp/Podfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ SPEC CHECKSUMS:

PODFILE CHECKSUM: b598f373a6ab5add976b09c2ac79029bf2200d48

COCOAPODS: 1.13.0
COCOAPODS: 1.15.2
74 changes: 60 additions & 14 deletions Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,51 @@ extension ConnectInterceptor: UnaryInterceptor {
] = current.value
})

if let encoding = response.headers[HeaderConstants.contentEncoding]?.first,
let compressionPool = self.config.responseCompressionPool(forName: encoding),
let message = response.message.flatMap({ try? compressionPool.decompress(data: $0) })
let finalResponse: HTTPResponse
let contentType = response.headers[HeaderConstants.contentType]?.first ?? ""
if response.code == .ok && !contentType.hasPrefix("application/\(self.config.codec.name())")
{
proceed(HTTPResponse(
code: response.code,
headers: headers,
message: message,
trailers: trailers,
error: response.error,
// If content-type looks like it could be an RPC server's response, consider
// this an internal error.
let code: Code = contentType.hasPrefix("application/") ? .internalError : .unknown
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a non-200 HTTP status handled somewhere else? I expected to see it handled just above, or the HTTP status incorporated here (so deriving a code from the HTTP status instead of hard-coding to "unknown").

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The non-200 cases are falling into one of the else/else if cases below depending on whether the error response is compressed (Connect Compressed Error and End-Stream/HTTPVersion:2/TLS:false/error/compressed is one of these cases). Both of these use response.code, which is the Connect code from the HTTP status.

finalResponse = HTTPResponse(
code: code, headers: headers, message: nil, trailers: trailers,
error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"),
tracingInfo: response.tracingInfo
))
)
} else if let encoding = response.headers[HeaderConstants.contentEncoding]?.first {
if let compressionPool = self.config.responseCompressionPool(forName: encoding),
let message = response.message.flatMap({ try? compressionPool.decompress(data: $0) })
{
finalResponse = HTTPResponse(
code: response.code,
headers: headers,
message: message,
trailers: trailers,
error: response.error,
tracingInfo: response.tracingInfo
)
} else {
finalResponse = HTTPResponse(
code: .internalError,
headers: headers,
message: nil,
trailers: trailers,
error: ConnectError(code: .internalError, message: "unexpected encoding"),
tracingInfo: response.tracingInfo
)
}
} else {
proceed(HTTPResponse(
finalResponse = HTTPResponse(
code: response.code,
headers: headers,
message: response.message,
trailers: trailers,
error: response.error,
tracingInfo: response.tracingInfo
))
)
}
proceed(finalResponse)
}
}

Expand Down Expand Up @@ -146,13 +169,36 @@ extension ConnectInterceptor: StreamInterceptor {
switch result {
case .headers(let headers):
self.streamResponseHeaders.value = headers
proceed(result)

let contentType = headers[HeaderConstants.contentType]?.first ?? ""
if contentType != "application/connect+\(self.config.codec.name())" {
// If content-type looks like it could be an RPC server's response, consider
// this an internal error.
let code: Code = contentType.hasPrefix("application/connect+")
? .internalError
: .unknown
proceed(.complete(
code: code, error: ConnectError(
code: code, message: "unexpected content-type: \(contentType)"
), trailers: nil
))
} else {
proceed(result)
}
case .message(let data):
do {
let responseCompressionPool = self.streamResponseHeaders.value?[
HeaderConstants.connectStreamingContentEncoding
]?.first.flatMap { self.config.responseCompressionPool(forName: $0) }
if responseCompressionPool == nil && Envelope.isCompressed(data) {
proceed(.complete(
code: .internalError, error: ConnectError(
code: .internalError, message: "received unexpected compressed message"
), trailers: nil
))
return
}

let (headerByte, message) = try Envelope.unpackMessage(
data, compressionPool: responseCompressionPool
)
Expand All @@ -168,7 +214,7 @@ extension ConnectInterceptor: StreamInterceptor {
error: response.error,
trailers: response.metadata
))
} else {
} else if !message.isEmpty {
proceed(.message(message))
}
} catch let error {
Expand Down
97 changes: 84 additions & 13 deletions Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,38 @@ extension GRPCWebInterceptor: UnaryInterceptor {
response.headers,
trailers: response.trailers
)
if grpcCode != .ok || connectError != nil {
rebello95 marked this conversation as resolved.
Show resolved Hide resolved
proceed(HTTPResponse(
code: grpcCode,
headers: response.headers,
message: response.message,
trailers: response.trailers,
error: connectError,
tracingInfo: response.tracingInfo
))
} else {
proceed(HTTPResponse(
code: .unimplemented,
headers: response.headers,
message: response.message,
trailers: response.trailers,
error: ConnectError(
code: .unimplemented, message: "unary response has no message"
),
tracingInfo: response.tracingInfo
))
}
return
}

let contentType = response.headers[HeaderConstants.contentType]?.first ?? ""
if response.code == .ok && !self.contentTypeIsExpectedGRPCWeb(contentType) {
// If content-type looks like it could be a gRPC server's response, consider
// this an internal error.
let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown
proceed(HTTPResponse(
code: grpcCode,
headers: response.headers,
message: response.message,
trailers: response.trailers,
error: connectError,
code: code, headers: response.headers, message: nil, trailers: response.trailers,
error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"),
tracingInfo: response.tracingInfo
))
return
Expand All @@ -75,6 +101,18 @@ extension GRPCWebInterceptor: UnaryInterceptor {
let compressionPool = response.headers[HeaderConstants.grpcContentEncoding]?
.first
.flatMap { self.config.responseCompressionPool(forName: $0) }
if compressionPool == nil && Envelope.isCompressed(responseData) {
proceed(HTTPResponse(
code: .internalError, headers: response.headers, message: nil,
trailers: response.trailers,
error: ConnectError(
code: .internalError, message: "received unexpected compressed message"
),
tracingInfo: response.tracingInfo
))
return
}

do {
// gRPC Web returns data in 2 chunks (either/both of which may be compressed):
// 1. OPTIONAL (when not trailers-only): The (headers and length prefixed)
Expand Down Expand Up @@ -107,7 +145,7 @@ extension GRPCWebInterceptor: UnaryInterceptor {
}
} catch let error {
proceed(HTTPResponse(
code: .unknown,
code: .unimplemented,
headers: response.headers,
message: response.message,
trailers: response.trailers,
Expand Down Expand Up @@ -146,6 +184,19 @@ extension GRPCWebInterceptor: StreamInterceptor {
) {
switch result {
case .headers(let headers):
let contentType = headers[HeaderConstants.contentType]?.first ?? ""
if !self.contentTypeIsExpectedGRPCWeb(contentType) {
// If content-type looks like it could be a gRPC server's response, consider
// this an internal error.
let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown
proceed(.complete(
code: code, error: ConnectError(
code: code, message: "unexpected content-type: \(contentType)"
), trailers: headers
))
return
}

if let grpcCode = headers.grpcStatus() {
// Headers-only response.
proceed(.complete(
Expand Down Expand Up @@ -181,7 +232,7 @@ extension GRPCWebInterceptor: StreamInterceptor {
trailers: trailers
))
}
} else {
} else if !unpackedData.isEmpty {
proceed(.message(unpackedData))
}
} catch let error {
Expand All @@ -193,9 +244,20 @@ extension GRPCWebInterceptor: StreamInterceptor {
proceed(result)
}
}
}

// MARK: - Private
// MARK: - Private

private func contentTypeIsGRPCWeb(_ contentType: String) -> Bool {
return contentType == "application/grpc-web"
|| contentType.hasPrefix("application/grpc-web+")
}

private func contentTypeIsExpectedGRPCWeb(_ contentType: String) -> Bool {
let codecName = self.config.codec.name()
return (codecName == "proto" && contentType == "application/grpc-web")
|| contentType == "application/grpc-web+\(codecName)"
}
}

private struct TrailersDecodingError: Error {}

Expand Down Expand Up @@ -228,13 +290,22 @@ private extension Trailers {
private extension HTTPResponse {
func withHandledGRPCWebTrailers(_ trailers: Trailers, message: Data?) -> Self {
let (grpcCode, error) = ConnectError.parseGRPCHeaders(self.headers, trailers: trailers)
if grpcCode == .ok {
if grpcCode != .ok || error != nil {
return HTTPResponse(
code: grpcCode,
headers: self.headers,
message: message,
message: nil,
trailers: trailers,
error: nil,
error: error,
tracingInfo: self.tracingInfo
)
} else if message?.isEmpty != false {
return HTTPResponse(
code: .unimplemented,
headers: self.headers,
message: nil,
trailers: trailers,
error: ConnectError(code: .unimplemented, message: "unary response has no message"),
tracingInfo: self.tracingInfo
)
} else {
Expand All @@ -243,7 +314,7 @@ private extension HTTPResponse {
headers: self.headers,
message: message,
trailers: trailers,
error: error,
error: nil,
tracingInfo: self.tracingInfo
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ import SwiftProtobuf
/// Concrete implementation of `BidirectionalAsyncStreamInterface`.
/// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream`
/// to work with async/await.
///
/// If the library removes callback support in favor of only supporting async/await in the future,
/// this class can be simplified.
@available(iOS 13, *)
final class BidirectionalAsyncStream<
class BidirectionalAsyncStream<
Input: ProtobufMessage, Output: ProtobufMessage
>: @unchecked Sendable {
/// The underlying async stream that will be exposed to the consumer.
Expand Down Expand Up @@ -103,7 +106,3 @@ extension BidirectionalAsyncStream: BidirectionalAsyncStreamInterface {
self.requestCallbacks?.cancel()
}
}

// Conforms to the client-only interface since it matches exactly and the implementation is internal
@available(iOS 13, *)
extension BidirectionalAsyncStream: ClientOnlyAsyncStreamInterface {}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,3 @@ extension BidirectionalStream: BidirectionalStreamInterface {
self.requestCallbacks.cancel()
}
}

// Conforms to the client-only interface since it matches exactly and the implementation is internal
extension BidirectionalStream: ClientOnlyStreamInterface {}
45 changes: 45 additions & 0 deletions Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022-2024 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation

/// Concrete implementation of `ClientOnlyAsyncStreamInterface`.
/// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream`
/// to work with async/await.
///
/// This subclasses `BidirectionalAsyncStream` since its behavior is purely additive (it overlays
/// some additional validation) and both types are internal to the package, not public.
@available(iOS 13, *)
final class ClientOnlyAsyncStream<
Input: ProtobufMessage, Output: ProtobufMessage
>: BidirectionalAsyncStream<Input, Output> {
private let receivedMessageCount = Locked(0)

override func receive(_ result: StreamResult<Output>) {
let receivedMessageCount = self.receivedMessageCount.perform { value in
if case .message = result {
value += 1
}
return value
}
super.receive(result.validatedForClientStream(receivedMessageCount: receivedMessageCount))
}
}

@available(iOS 13, *)
extension ClientOnlyAsyncStream: ClientOnlyAsyncStreamInterface {
func closeAndReceive() {
self.close()
}
}
Loading
Loading