Skip to content

Commit

Permalink
More consistent client-side exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
edsko committed Sep 28, 2023
1 parent 279e0e9 commit 76243b8
Show file tree
Hide file tree
Showing 19 changed files with 160 additions and 70 deletions.
5 changes: 5 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,8 @@ packages: .
package grapesy
tests: True
flags: +build-demo +build-stress-test

source-repository-package
type: git
location: https://github.com/edsko/http2.git
tag: 6654f95e5e202c48599f3b7ae251ea13bd785a50
2 changes: 1 addition & 1 deletion grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ library
Network.GRPC.Spec.Timeout
Network.GRPC.Util.AccumulatedByteString
Network.GRPC.Util.ByteString
Network.GRPC.Util.Concurrency
Network.GRPC.Util.HTTP2
Network.GRPC.Util.HTTP2.Stream
Network.GRPC.Util.Parser
Expand All @@ -127,7 +128,6 @@ library
Network.GRPC.Util.Session.Channel
Network.GRPC.Util.Session.Client
Network.GRPC.Util.Session.Server
Network.GRPC.Util.STM
Network.GRPC.Util.Thread
Network.GRPC.Util.TLS
Paths_grapesy
Expand Down
4 changes: 4 additions & 0 deletions src/Network/GRPC/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ module Network.GRPC.Client (
-- * Common serialization formats
, Protobuf

-- * Exceptions
, ServerDisconnected(..)

-- * Debugging
, ClientDebugMsg(..)
) where

import Network.GRPC.Client.Call
import Network.GRPC.Client.Connection
import Network.GRPC.Spec
import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..))
import Network.GRPC.Util.TLS qualified as Util.TLS

{-------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/Network/GRPC/Client/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import Network.GRPC.Client.Session
import Network.GRPC.Common
import Network.GRPC.Common.StreamElem qualified as StreamElem
import Network.GRPC.Spec
import Network.GRPC.Util.Concurrency
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.STM

{-------------------------------------------------------------------------------
Definition
Expand Down
7 changes: 4 additions & 3 deletions src/Network/GRPC/Client/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ module Network.GRPC.Client.Connection (
, exponentialBackoff
) where

import Control.Concurrent
import Control.Monad
import Control.Monad.Catch
import Control.Tracer
Expand All @@ -44,8 +43,9 @@ import Network.GRPC.Client.Session
import Network.GRPC.Common.Compression qualified as Compr
import Network.GRPC.Common.Compression qualified as Compression
import Network.GRPC.Spec
import Network.GRPC.Util.Concurrency
import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.STM
import Network.GRPC.Util.TLS (ServerValidation(..), SslKeyLog(..))
import Network.GRPC.Util.TLS qualified as Util.TLS

Expand Down Expand Up @@ -303,7 +303,8 @@ withConnection connParams server k = do
case mErr of
Nothing -> ExitCaseSuccess ()
Just exitWithException ->
ExitCaseException . toException $ exitWithException
ExitCaseException . toException $
ServerDisconnected exitWithException
_mAlreadyClosed <- Session.close channel exitReason
return ()

Expand Down
2 changes: 1 addition & 1 deletion src/Network/GRPC/Client/StreamType.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ module Network.GRPC.Client.StreamType (
, rpc
) where

import Control.Concurrent.Async
import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.Default
Expand All @@ -13,6 +12,7 @@ import Data.Proxy
import Network.GRPC.Client
import Network.GRPC.Common.StreamType
import Network.GRPC.Spec
import Network.GRPC.Util.Concurrency

{-------------------------------------------------------------------------------
Obtain handler for specific RPC call
Expand Down
2 changes: 1 addition & 1 deletion src/Network/GRPC/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import Control.Exception

import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Spec
import Network.GRPC.Util.Concurrency (STMException(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.STM (STMException(..))
import Network.GRPC.Util.Thread qualified as Thread
import Network.GRPC.Util.TLS

Expand Down
1 change: 1 addition & 0 deletions src/Network/GRPC/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import Network.GRPC.Server.Context qualified as Context
import Network.GRPC.Server.Handler (RpcHandler(..))
import Network.GRPC.Server.Handler qualified as Handler
import Network.GRPC.Spec
import Network.GRPC.Util.HTTP2.Stream (ClientDisconnected(..))
import Network.GRPC.Util.Session.Server qualified as Session.Server

{-------------------------------------------------------------------------------
Expand Down
18 changes: 3 additions & 15 deletions src/Network/GRPC/Server/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ module Network.GRPC.Server.Call (

-- ** Internal API
, sendProperTrailers

-- * Exceptions
, ClientDisconnected(..)
) where

import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Catch
import Control.Tracer
Expand All @@ -46,7 +42,6 @@ import GHC.Stack
import Network.HTTP.Types qualified as HTTP
import Network.HTTP2.Internal qualified as HTTP2
import Network.HTTP2.Server qualified as HTTP2
import Text.Show.Pretty

import Network.GRPC.Common
import Network.GRPC.Common.Compression qualified as Compr
Expand All @@ -56,9 +51,10 @@ import Network.GRPC.Server.Connection qualified as Connection
import Network.GRPC.Server.Context qualified as Context
import Network.GRPC.Server.Session
import Network.GRPC.Spec
import Network.GRPC.Util.Concurrency
import Network.GRPC.Util.HTTP2.Stream (ClientDisconnected(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.Session.Server qualified as Server
import Network.GRPC.Util.STM

{-------------------------------------------------------------------------------
Definition
Expand Down Expand Up @@ -300,7 +296,7 @@ acceptCall params conn k = do
-- terminate. If we are interrupted while we wait, it depends on the
-- interruption:
--
-- * If we are interrupted by 'HTTP2.KilledByHttp2ThreadPoolManager', it means
-- * If we are interrupted by 'HTTP2.KilledByHttp2ThreadManager', it means
-- we got disconnected from the client. In this case, we shut down the channel
-- (if it's not already shut down); /if/ the handler at this tries to
-- communicate with the client, an exception will be raised. However, the
Expand Down Expand Up @@ -388,14 +384,6 @@ forwardException call err = do
ignoreExceptions :: SomeException -> IO ()
ignoreExceptions _ = return ()

-- | Client disconnected unexpectedly
data ClientDisconnected = ClientDisconnected SomeException
deriving stock (Show)
deriving anyclass (Exception)

instance PrettyVal ClientDisconnected where
prettyVal = String . show

{-------------------------------------------------------------------------------
Open (ongoing) call
-------------------------------------------------------------------------------}
Expand Down
2 changes: 1 addition & 1 deletion src/Network/GRPC/Server/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ module Network.GRPC.Server.Run (
, runServer
) where

import Control.Concurrent.Async
import Control.Exception
import Data.Default
import Network.ByteOrder (BufferSize)
Expand All @@ -21,6 +20,7 @@ import Network.Run.TCP (runTCPServer)
import Network.Socket (HostName, ServiceName, PortNumber)
import Network.TLS qualified as TLS

import Network.GRPC.Util.Concurrency
import Network.GRPC.Util.TLS (SslKeyLog(..))
import Network.GRPC.Util.TLS qualified as Util.TLS

Expand Down
68 changes: 68 additions & 0 deletions src/Network/GRPC/Util/Concurrency.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{-# OPTIONS_GHC -Wno-redundant-constraints #-}

module Network.GRPC.Util.Concurrency (
-- * @Control.Concurrent@
module ReexportConcurrent
, forkIO
, forkIOWithUnmask
-- * @Control.Concurrent.Async@
, module ReexportAsync
, withAsync
-- * @Control.Concurrent.STM@
, module ReexportSTM
, atomically
, STMException(..)
) where

import Control.Exception
import GHC.Stack

import Control.Concurrent as ReexportConcurrent hiding (forkIO, forkIOWithUnmask)
import Control.Concurrent.Async as ReexportAsync hiding (withAsync)
import Control.Concurrent.STM as ReexportSTM hiding (atomically)

import Control.Concurrent qualified as Concurrent
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM qualified as STM

{-------------------------------------------------------------------------------
Wrap thread spawning
-------------------------------------------------------------------------------}

forkIO :: HasCallStack => IO () -> IO ThreadId
forkIO = Concurrent.forkIO . wrapThreadBody

forkIOWithUnmask ::
HasCallStack
=> ((forall a. IO a -> IO a) -> IO ())
-> IO ThreadId
forkIOWithUnmask k = Concurrent.forkIOWithUnmask $ \unmask ->
wrapThreadBody (k unmask)

withAsync :: HasCallStack => IO a -> (Async a -> IO b) -> IO b
withAsync = Async.withAsync . wrapThreadBody

wrapThreadBody :: HasCallStack => IO a -> IO a
wrapThreadBody = id
--wrapThreadBody body = do
-- tid <- myThreadId
-- writeFile ("tmp/" ++ show tid) $ prettyCallStack callStack
-- body

{-------------------------------------------------------------------------------
Wrap exceptions with a callstack
-------------------------------------------------------------------------------}

data STMException = STMException CallStack SomeException
deriving stock (Show)
deriving anyclass (Exception)

-- | Rethrow STM exceptions with a callstakc
--
-- This is especially helpful to track down "blocked indefinitely" exceptions.
--
-- Implementation note: To catch such exceptions, we /must/ have the exception
-- handler /outside/ of the STM transaction.
atomically :: HasCallStack => STM a -> IO a
atomically action =
STM.atomically action `catch` (throwIO . STMException callStack )
46 changes: 35 additions & 11 deletions src/Network/GRPC/Util/HTTP2/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module Network.GRPC.Util.HTTP2.Stream (
, clientOutputStream
-- * Exceptions
, StreamException(..)
, ClientDisconnected(..)
, ServerDisconnected(..)
) where

import Control.Exception
Expand All @@ -25,6 +27,7 @@ import Network.HTTP2.Client qualified as Client
import Network.HTTP2.Server qualified as Server

import Network.GRPC.Util.HTTP2 (fromHeaderTable)
import Text.Show.Pretty

{-------------------------------------------------------------------------------
Streams
Expand Down Expand Up @@ -66,9 +69,9 @@ getTrailers = _getTrailers
serverInputStream :: Server.Request -> IO InputStream
serverInputStream req = do
return InputStream {
_getChunk = wrapStreamExceptions $
_getChunk = wrapStreamExceptionsWith ClientDisconnected $
Server.getRequestBodyChunk req
, _getTrailers = wrapStreamExceptions $
, _getTrailers = wrapStreamExceptionsWith ClientDisconnected $
maybe [] fromHeaderTable <$>
Server.getRequestTrailers req
}
Expand Down Expand Up @@ -114,8 +117,10 @@ serverOutputStream writeChunk' flush' = do
-- case (see discussion above).

let outputStream = OutputStream {
_writeChunk = wrapStreamExceptions . writeChunk'
, _flush = wrapStreamExceptions $ flush'
_writeChunk = \c -> wrapStreamExceptionsWith ClientDisconnected $
writeChunk' c
, _flush = wrapStreamExceptionsWith ClientDisconnected $
flush'
}

flush outputStream
Expand All @@ -128,9 +133,9 @@ serverOutputStream writeChunk' flush' = do
clientInputStream :: Client.Response -> IO InputStream
clientInputStream resp = do
return InputStream {
_getChunk = wrapStreamExceptions $
_getChunk = wrapStreamExceptionsWith ServerDisconnected $
Client.getResponseBodyChunk resp
, _getTrailers = wrapStreamExceptions $
, _getTrailers = wrapStreamExceptionsWith ServerDisconnected $
maybe [] fromHeaderTable <$>
Client.getResponseTrailers resp
}
Expand All @@ -150,8 +155,10 @@ clientOutputStream writeChunk' flush' = do
-- improve the "duality" between the server/client API.

let outputStream = OutputStream {
_writeChunk = wrapStreamExceptions . writeChunk'
, _flush = wrapStreamExceptions $ flush'
_writeChunk = \c -> wrapStreamExceptionsWith ServerDisconnected $
writeChunk' c
, _flush = wrapStreamExceptionsWith ServerDisconnected $
flush'
}

writeChunk outputStream mempty
Expand All @@ -165,6 +172,23 @@ data StreamException = StreamException SomeException CallStack
deriving stock (Show)
deriving anyclass (Exception)

wrapStreamExceptions :: HasCallStack => IO a -> IO a
wrapStreamExceptions action =
action `catch` \err -> throwIO $ StreamException err callStack
-- | Client disconnected unexpectedly
data ClientDisconnected = ClientDisconnected SomeException
deriving stock (Show)
deriving anyclass (Exception)

-- | Server disconnected unexpectedly
data ServerDisconnected = ServerDisconnected SomeException
deriving stock (Show)
deriving anyclass (Exception)

instance PrettyVal ClientDisconnected where prettyVal = String . show
instance PrettyVal ServerDisconnected where prettyVal = String . show

wrapStreamExceptionsWith ::
(HasCallStack, Exception e)
=> (SomeException -> e)
-> IO a -> IO a
wrapStreamExceptionsWith f action =
action `catch` \err ->
throwIO $ f $ toException $ StreamException err callStack
29 changes: 0 additions & 29 deletions src/Network/GRPC/Util/STM.hs

This file was deleted.

4 changes: 2 additions & 2 deletions src/Network/GRPC/Util/Session/Channel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ import Network.HTTP2.Internal qualified as HTTP2

import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Common.StreamElem qualified as StreamElem
import Network.GRPC.Util.Concurrency
import Network.GRPC.Util.HTTP2.Stream
import Network.GRPC.Util.Parser
import Network.GRPC.Util.Session.API
import Network.GRPC.Util.Thread
import Network.GRPC.Util.HTTP2.Stream
import Network.GRPC.Util.STM

{-------------------------------------------------------------------------------
Definitions
Expand Down
Loading

0 comments on commit 76243b8

Please sign in to comment.