More early termination problems
Turns out, the grapesy test suite is Too Powerful and has in fact revealed an
STM bug 😱 See `atomically` for details.
edsko committed Nov 11, 2023
1 parent 0148d3d commit 882a0dd
Showing 15 changed files with 302 additions and 117 deletions.
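For context: `BlockedIndefinitelyOnSTM` is the exception that GHC's runtime throws at a thread it believes can never be woken up again. The claim in this commit is that the exception can also fire spuriously, for a thread that is not in fact stuck; the workaround lives in `Network.GRPC.Util.Concurrency` below. A minimal standalone illustration of the normal, non-spurious case (this program is not part of the commit, just a sketch of the mechanism being worked around):

```haskell
import Control.Concurrent.STM
import Control.Exception

main :: IO ()
main = do
  v <- newEmptyTMVarIO :: IO (TMVar ())
  -- Nothing will ever fill 'v', so the RTS eventually throws
  -- BlockedIndefinitelyOnSTM at the blocked transaction.
  r <- try (atomically (takeTMVar v))
  case r of
    Left BlockedIndefinitelyOnSTM -> putStrLn "deadlock detected"
    Right ()                      -> putStrLn "unreachable"
```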
14 changes: 10 additions & 4 deletions cabal.project
@@ -1,12 +1,18 @@
packages: .
packages:
.

package grapesy
tests: True
flags: +build-demo +build-stress-test

-- The version on Hackage also works, it just writes some (unnecessary)
-- exceptions to stderr
-- https://github.com/kazu-yamamoto/http2/pull/97
source-repository-package
type: git
location: https://github.com/edsko/http2.git
tag: c2a2994ed45a08998c2b6eb22a08f28b9eb36c3c
tag: d7c848f6a6e4b2d809af4304ea5f5fbcdc58cd52

-- https://github.com/kazu-yamamoto/http2-tls/pull/4
source-repository-package
type: git
location: https://github.com/edsko/http2-tls.git
tag: 4fdfe7c32177c5a483916623cb8f93525761cff6
4 changes: 3 additions & 1 deletion grapesy.cabal
@@ -195,6 +195,7 @@ test-suite test-grapesy
ghc-options:
-Wprepositive-qualified-module
-threaded
-rtsopts
type:
exitcode-stdio-1.0
hs-source-dirs:
@@ -232,7 +233,7 @@ test-suite test-grapesy
, pretty-show >= 1.10 && < 1.11
, QuickCheck >= 2.14 && < 2.15
, stm >= 2.5 && < 2.6
, tasty >= 1.4 && < 1.5
, tasty >= 1.4 && < 1.6
, tasty-hunit >= 0.10 && < 0.11
, tasty-quickcheck >= 0.10 && < 0.11
, text >= 1.2 && < 2.1
@@ -353,6 +354,7 @@ executable test-stress
, containers >= 0.6 && < 0.7
, contra-tracer >= 0.2 && < 0.3
, data-default >= 0.7 && < 0.8
, exceptions >= 0.10 && < 0.11
, http2
, optparse-applicative >= 0.16 && < 0.19
, pretty-show >= 1.10 && < 1.11
2 changes: 1 addition & 1 deletion src/Network/GRPC/Client/Call.hs
@@ -54,7 +54,7 @@ import Network.GRPC.Util.Session qualified as Session
-- when the body is run. If you want to be sure that the call has been setup,
-- you can call 'recvResponseMetadata'.
withRPC :: forall m rpc a.
(MonadMask m, MonadIO m, IsRPC rpc)
(MonadMask m, MonadIO m, IsRPC rpc, HasCallStack)
=> Connection -> CallParams -> Proxy rpc -> (Call rpc -> m a) -> m a
withRPC conn callParams proxy k =
(throwUnclean =<<) $
11 changes: 3 additions & 8 deletions src/Network/GRPC/Client/Connection.hs
@@ -470,14 +470,9 @@ stayConnected connParams server connVar connCanClose =

-- TODO: This is currently only used for the HTTP case, not HTTPS
clientConfig :: Authority -> Scheme -> HTTP2.Client.ClientConfig
clientConfig auth scheme = HTTP2.Client.ClientConfig {
scheme = rawScheme serverPseudoHeaders
, authority = rawAuthority serverPseudoHeaders

-- Docs describe this as "How many pushed responses are contained in
-- the cache". Since gRPC does not make use of HTTP2 server push, we
-- set it to 0.
, cacheLimit = 0
clientConfig auth scheme = HTTP2.Client.defaultClientConfig {
HTTP2.Client.scheme = rawScheme serverPseudoHeaders
, HTTP2.Client.authority = rawAuthority serverPseudoHeaders
}
where
serverPseudoHeaders :: RawServerHeaders
2 changes: 0 additions & 2 deletions src/Network/GRPC/Common.hs
@@ -34,7 +34,6 @@ module Network.GRPC.Common (
-- ** Low-level
, Session.ChannelClosed(..)
, Session.ChannelUncleanClose(..)
, STMException(..)
, Thread.ThreadCancelled(..)
, Thread.ThreadInterfaceUnavailable(..)
) where
@@ -43,7 +42,6 @@ import Control.Exception

import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Spec
import Network.GRPC.Util.Concurrency (STMException(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.Thread qualified as Thread
import Network.GRPC.Util.TLS
4 changes: 2 additions & 2 deletions src/Network/GRPC/Server/Call.hs
@@ -261,7 +261,7 @@ acceptCall params conn k = do
}
, outCompression = cOut
}
KickoffTrailersOnly trailers ->
KickoffTrailersOnly trailers -> do
return $ Session.FlowStartNoMessages trailers
where
inboundHeaders :: RequestHeaders
@@ -392,7 +392,7 @@ forwardException call err = do
--
-- We do not return trailers, since gRPC does not support sending trailers from
-- the client to the server (only from the server to the client).
recvInput :: HasCallStack => Call rpc -> IO (StreamElem NoMetadata (Input rpc))
recvInput :: forall rpc. HasCallStack => Call rpc -> IO (StreamElem NoMetadata (Input rpc))
recvInput call = atomically $ recvInputSTM call

-- | Send RPC output to the client
60 changes: 43 additions & 17 deletions src/Network/GRPC/Util/Concurrency.hs
@@ -11,15 +11,23 @@ module Network.GRPC.Util.Concurrency (
-- * @Control.Concurrent.STM@
, module ReexportSTM
, atomically
, STMException(..)
) where

import Prelude hiding (id)

import Control.Exception
import GHC.Stack

import Control.Concurrent as ReexportConcurrent hiding (forkIO, forkIOWithUnmask)
import Control.Concurrent.Async as ReexportAsync hiding (withAsync)
import Control.Concurrent.STM as ReexportSTM hiding (atomically)
import Control.Concurrent as ReexportConcurrent hiding (
forkIO
, forkIOWithUnmask
)
import Control.Concurrent.Async as ReexportAsync hiding (
withAsync
)
import Control.Concurrent.STM as ReexportSTM hiding (
atomically
)

import Control.Concurrent qualified as Concurrent
import Control.Concurrent.Async qualified as Async
@@ -43,26 +51,44 @@ withAsync :: HasCallStack => IO a -> (Async a -> IO b) -> IO b
withAsync = Async.withAsync . wrapThreadBody

wrapThreadBody :: HasCallStack => IO a -> IO a
wrapThreadBody = id
wrapThreadBody body = body
--wrapThreadBody body = do
-- tid <- myThreadId
-- writeFile ("tmp/" ++ show tid) $ prettyCallStack callStack
-- body

{-------------------------------------------------------------------------------
Wrap exceptions with a callstack
STM
-------------------------------------------------------------------------------}

data STMException = STMException CallStack SomeException
deriving stock (Show)
deriving anyclass (Exception)

-- | Rethrow STM exceptions with a callstack
-- | Run STM transaction
--
-- This is especially helpful to track down "blocked indefinitely" exceptions.
-- When running an STM transaction throws a "blocked indefinitely" exception,
-- there should be no point running it again; after all, it is blocked
-- /indefinitely/. It seems however that this is not in fact guaranteed by the
-- STM implementation, and the grapesy test suite manages to trigger a case
-- where the test fails with this exception even though the thread definitely
-- is /not/ blocked indefinitely, and indeed simply trying again is enough.
--
-- Implementation note: To catch such exceptions, we /must/ have the exception
-- handler /outside/ of the STM transaction.
atomically :: HasCallStack => STM a -> IO a
atomically action =
STM.atomically action `catch` (throwIO . STMException callStack )
-- (The documentation of @Control.Concurrent@ at
-- <https://hackage.haskell.org/package/base-4.19.0.0/docs/Control-Concurrent.html#g:14>
-- mentions an exceptional case related to finalizers
-- (see also <https://gitlab.haskell.org/ghc/ghc/-/issues/11001>)
-- but that is not our case here.
--
-- This is obviously a huge hack and has got to be one of the worst functions I have
-- ever written (and I have written some pretty horrible functions in my past).
--
-- TODO: Fix this properly.
-- TODO: Is 1 guarded attempt enough..? Do we need more..? Unlimited..?
atomically :: forall a. STM a -> IO a
atomically stm = go 1
where
go ::
Int -- ^ Number of guarded attempts left
-> IO a
go 0 = run
go n = run `catch` \BlockedIndefinitelyOnSTM{} -> go (n - 1)

run :: IO a
run = STM.atomically stm
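The TODO above asks whether one guarded attempt is enough. A possible generalization, purely a sketch and not part of the commit, keeps the handler outside the transaction (as the comment requires) and parameterizes the retry budget; `atomicallyWithBudget` is a hypothetical name:

```haskell
import Control.Exception (BlockedIndefinitelyOnSTM (..), catch)
import Control.Monad.STM (STM)
import qualified Control.Monad.STM as STM

-- Hypothetical variant: tolerate up to 'budget' spurious
-- BlockedIndefinitelyOnSTM exceptions before letting one propagate.
atomicallyWithBudget :: Int -> STM a -> IO a
atomicallyWithBudget budget stm = go budget
  where
    go 0 = STM.atomically stm
    go n = STM.atomically stm `catch` \BlockedIndefinitelyOnSTM -> go (n - 1)
```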
36 changes: 25 additions & 11 deletions src/Network/GRPC/Util/Session/Channel.hs
@@ -35,7 +35,7 @@ module Network.GRPC.Util.Session.Channel (
, DebugMsg(..)
) where

import Control.Exception
import Control.Exception hiding (try)
import Control.Monad.Catch
import Control.Tracer
import Data.Bifunctor
@@ -163,6 +163,13 @@ data RegularFlowState flow = RegularFlowState {
, flowTerminated :: TMVar (Trailers flow)
}

-- | 'Show' instance is useful in combination with @stm-debug@ only
deriving instance (
Show (Headers flow)
, Show (TMVar (StreamElem (Trailers flow) (Message flow)))
, Show (TMVar (Trailers flow))
) => Show (RegularFlowState flow)

{-------------------------------------------------------------------------------
Initialization
-------------------------------------------------------------------------------}
@@ -324,7 +331,11 @@ recv Channel{channelInbound, channelRecvFinal} = do
case readFinal of
Just cs -> throwSTM $ RecvAfterFinal cs
Nothing -> do
st <- readTMVar =<< getThreadInterface channelInbound
-- We get the TMVar in the same transaction as reading from it (below).
-- This means that /if/ the thread running 'recvMessageLoop' throws an
-- exception and is killed, the 'takeTMVar' below cannot block
-- indefinitely (because the transaction will be retried).
st <- readTMVar =<< getThreadInterface channelInbound
case st of
FlowStateRegular regular -> do
msg <- takeTMVar (flowMsg regular)
@@ -369,7 +380,7 @@ data RecvAfterFinal =
-- | Wait for the outbound thread to terminate
--
-- See 'forceClose' for discussion.
waitForOutbound :: Channel sess -> IO (FlowState (Outbound sess))
waitForOutbound :: HasCallStack => Channel sess -> IO (FlowState (Outbound sess))
waitForOutbound Channel{channelOutbound} = atomically $
readTMVar =<< waitForThread channelOutbound

@@ -448,14 +459,17 @@ data ChannelClosed =
-------------------------------------------------------------------------------}

-- | Send all messages to the node's peer
--
-- Should be called with exceptions masked.
sendMessageLoop :: forall sess.
IsSession sess
=> sess
-> (forall x. IO x -> IO x) -- ^ Unmask
-> Tracer IO (DebugMsg sess)
-> RegularFlowState (Outbound sess)
-> OutputStream
-> IO ()
sendMessageLoop sess tracer st stream =
sendMessageLoop sess unmask tracer st stream =
go $ buildMsg sess (flowHeaders st)
where
go :: (Message (Outbound sess) -> Builder) -> IO ()
Expand All @@ -466,7 +480,9 @@ sendMessageLoop sess tracer st stream =
loop :: IO (Trailers (Outbound sess))
loop = do
traceWith tracer $ NodeSendAwaitMsg
msg <- atomically $ takeTMVar (flowMsg st)
-- Technically the call to unmask is necessary here, as takeTMVar
-- is interruptible.
msg <- unmask $ atomically $ takeTMVar (flowMsg st)
traceWith tracer $ NodeSendMsg msg

case msg of
@@ -492,11 +508,12 @@ sendMessageLoop sess tracer st stream =
recvMessageLoop :: forall sess.
IsSession sess
=> sess
-> (forall x. IO x -> IO x) -- ^ Unmask
-> Tracer IO (DebugMsg sess)
-> RegularFlowState (Inbound sess)
-> InputStream
-> IO ()
recvMessageLoop sess tracer st stream =
recvMessageLoop sess unmask tracer st stream = do
go $ parseMsg sess (flowHeaders st)
where
go :: Parser (Message (Inbound sess)) -> IO ()
@@ -507,22 +524,19 @@ recvMessageLoop sess tracer st stream =
atomically $ putTMVar (flowMsg st) $ NoMoreElems trailers
where
loop :: Parser (Message (Inbound sess)) -> IO [HTTP.Header]
loop (ParserError err) =
loop (ParserError err) = do
throwIO $ PeerSentMalformedMessage err
loop (ParserDone x p') = do
traceWith tracer $ NodeRecvMsg (StreamElem x)
atomically $ putTMVar (flowMsg st) $ StreamElem x
loop p'
loop (ParserNeedsData acc p') = do
traceWith tracer $ NodeNeedsData acc
bs <- getChunk stream

bs <- unmask $ getChunk stream
if | not (BS.Strict.null bs) ->
loop $ p' bs

| not (BS.Lazy.null acc) ->
throwIO PeerSentIncompleteMessage

| otherwise ->
getTrailers stream

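The comment added to `recv` above leans on a general STM property: a transaction that blocks in `retry` is rerun whenever any variable it has already read changes. A rough standalone sketch of why reading the thread interface and the message in one transaction avoids blocking forever (the names `PeerDied`, `ifaceVar`, and `recvNext` are invented for illustration; the real `getThreadInterface` throws once the thread is marked as failed):

```haskell
import Control.Concurrent.STM
import Control.Exception (Exception)

data PeerDied = PeerDied deriving Show
instance Exception PeerDied

-- Hypothetical sketch of the pattern used in 'recv': read the thread
-- interface and the next message in ONE transaction.  If the peer thread
-- dies and updates 'ifaceVar', the transaction blocked on 'takeTMVar' is
-- rerun, sees the failure, and throws instead of blocking indefinitely.
recvNext :: TVar (Either PeerDied (TMVar a)) -> IO a
recvNext ifaceVar = atomically $ do
    st <- readTVar ifaceVar
    case st of
      Left err    -> throwSTM err    -- peer thread has failed
      Right inner -> takeTMVar inner -- blocks; reruns if ifaceVar changes
```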
9 changes: 4 additions & 5 deletions src/Network/GRPC/Util/Session/Client.hs
@@ -91,11 +91,10 @@ setupRequestChannel sess tracer ConnectionToServer{sendRequest} outboundStart =
(requestHeaders requestInfo)
$ \unmask write' flush' -> unmask $ do
threadBody (channelOutbound channel)
unmask
(newTMVarIO (FlowStateRegular regular))
$ \_stVar -> do
stream <- clientOutputStream write' flush'
sendMessageLoop sess tracer regular stream
sendMessageLoop sess unmask tracer regular stream
forkRequest channel req
FlowStartNoMessages trailers -> do
stVar <- newTMVarIO $ FlowStateNoMessages trailers
@@ -111,7 +110,7 @@ setupRequestChannel sess tracer ConnectionToServer{sendRequest} outboundStart =
where
forkRequest :: Channel sess -> Client.Request -> IO ()
forkRequest channel req =
forkThread (channelInbound channel) newEmptyTMVarIO $ \stVar -> do
forkThread (channelInbound channel) newEmptyTMVarIO $ \unmask stVar -> do
setup <- try $ sendRequest req $ \resp -> do
responseStatus <- case Client.responseStatus resp of
Just x -> return x
@@ -131,8 +130,8 @@ setupRequestChannel sess tracer ConnectionToServer{sendRequest} outboundStart =
regular <- initFlowStateRegular headers
stream <- clientInputStream resp
atomically $ putTMVar stVar $ FlowStateRegular regular
recvMessageLoop sess tracer regular stream
FlowStartNoMessages trailers ->
recvMessageLoop sess unmask tracer regular stream
FlowStartNoMessages trailers -> do
atomically $ putTMVar stVar $ FlowStateNoMessages trailers

case setup of
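`sendMessageLoop` and `recvMessageLoop` now receive the `unmask` function from the enclosing `threadBody`/`forkThread`, so the loops run with asynchronous exceptions masked except at their blocking operations. A minimal self-contained sketch of that pattern (`worker` and its `TMVar` are invented for illustration; the real code threads `unmask` through `threadBody`):

```haskell
import Control.Concurrent.STM
import Control.Exception (mask)
import Control.Monad (forever)

-- Hypothetical sketch of the masked-loop pattern: the loop body runs with
-- asynchronous exceptions masked, and only the blocking read is run under
-- 'unmask', so cancellation is delivered at that point rather than in the
-- middle of processing a message.
worker :: TMVar Int -> IO ()
worker v =
    mask $ \unmask ->
      forever $ do
        x <- unmask $ atomically $ takeTMVar v  -- interruptible point
        print x                                 -- runs masked
```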
13 changes: 7 additions & 6 deletions src/Network/GRPC/Util/Session/Server.hs
@@ -7,6 +7,7 @@ module Network.GRPC.Util.Session.Server (
import Control.Exception
import Control.Tracer
import Network.HTTP2.Server qualified as Server
import GHC.Stack

import Network.GRPC.Util.Concurrency
import Network.GRPC.Util.HTTP2 (fromHeaderTable)
@@ -33,7 +34,7 @@ data ConnectionToClient = ConnectionToClient {
--
-- The actual response will not immediately be initiated; see below.
setupResponseChannel :: forall sess.
AcceptSession sess
(AcceptSession sess, HasCallStack)
=> sess
-> Tracer IO (DebugMsg sess)
-> ConnectionToClient
@@ -66,17 +67,17 @@ setupResponseChannel sess tracer conn startOutbound = do
else
FlowStartRegular <$> parseRequestRegular sess requestInfo

forkThread (channelInbound channel) newEmptyTMVarIO $ \stVar ->
forkThread (channelInbound channel) newEmptyTMVarIO $ \unmask stVar -> do
case inboundStart of
FlowStartRegular headers -> do
regular <- initFlowStateRegular headers
stream <- serverInputStream (request conn)
atomically $ putTMVar stVar $ FlowStateRegular regular
recvMessageLoop sess tracer regular stream
FlowStartNoMessages trailers ->
recvMessageLoop sess unmask tracer regular stream
FlowStartNoMessages trailers -> do
atomically $ putTMVar stVar $ FlowStateNoMessages trailers

forkThread (channelOutbound channel) newEmptyTMVarIO $ \stVar -> do
forkThread (channelOutbound channel) newEmptyTMVarIO $ \unmask stVar -> do
outboundStart <- startOutbound inboundStart
let responseInfo = buildResponseInfo sess outboundStart
case outboundStart of
@@ -90,7 +91,7 @@ setupResponseChannel sess tracer conn startOutbound = do
(responseHeaders responseInfo)
$ \write' flush' -> do
stream <- serverOutputStream write' flush'
sendMessageLoop sess tracer regular stream
sendMessageLoop sess unmask tracer regular stream
respond conn resp
FlowStartNoMessages trailers -> do
atomically $ putTMVar stVar $ FlowStateNoMessages trailers