Skip to content

Commit

Permalink
More early termination problems
Browse files Browse the repository at this point in the history
Turns out, the grapesy test suite is Too Powerful and has in fact revealed an
STM bug 😱 See `atomically` for details.
  • Loading branch information
edsko committed Nov 11, 2023
1 parent 0148d3d commit f11ace2
Show file tree
Hide file tree
Showing 15 changed files with 319 additions and 120 deletions.
14 changes: 10 additions & 4 deletions cabal.project
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
packages: .
packages:
.

package grapesy
tests: True
flags: +build-demo +build-stress-test

-- The version on Hackage also works, it just writes some (unnecessary)
-- exceptions to stderr
-- https://github.com/kazu-yamamoto/http2/pull/97
source-repository-package
type: git
location: https://github.com/edsko/http2.git
tag: c2a2994ed45a08998c2b6eb22a08f28b9eb36c3c
tag: d7c848f6a6e4b2d809af4304ea5f5fbcdc58cd52

-- https://github.com/kazu-yamamoto/http2-tls/pull/4
source-repository-package
type: git
location: https://github.com/edsko/http2-tls.git
tag: 4fdfe7c32177c5a483916623cb8f93525761cff6
9 changes: 7 additions & 2 deletions grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ library
Network.GRPC.Server.Run
Network.GRPC.Server.StreamType
Network.GRPC.Spec
other-modules:

-- other-modules:

Network.GRPC.Client.Call
Network.GRPC.Client.Connection
Network.GRPC.Client.Meta
Expand Down Expand Up @@ -195,6 +197,8 @@ test-suite test-grapesy
ghc-options:
-Wprepositive-qualified-module
-threaded
-rtsopts
-debug
type:
exitcode-stdio-1.0
hs-source-dirs:
Expand Down Expand Up @@ -232,7 +236,7 @@ test-suite test-grapesy
, pretty-show >= 1.10 && < 1.11
, QuickCheck >= 2.14 && < 2.15
, stm >= 2.5 && < 2.6
, tasty >= 1.4 && < 1.5
, tasty >= 1.4 && < 1.6
, tasty-hunit >= 0.10 && < 0.11
, tasty-quickcheck >= 0.10 && < 0.11
, text >= 1.2 && < 2.1
Expand Down Expand Up @@ -353,6 +357,7 @@ executable test-stress
, containers >= 0.6 && < 0.7
, contra-tracer >= 0.2 && < 0.3
, data-default >= 0.7 && < 0.8
, exceptions >= 0.10 && < 0.11
, http2
, optparse-applicative >= 0.16 && < 0.19
, pretty-show >= 1.10 && < 1.11
Expand Down
2 changes: 1 addition & 1 deletion src/Network/GRPC/Client/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import Network.GRPC.Util.Session qualified as Session
-- when the body is run. If you want to be sure that the call has been setup,
-- you can call 'recvResponseMetadata'.
withRPC :: forall m rpc a.
(MonadMask m, MonadIO m, IsRPC rpc)
(MonadMask m, MonadIO m, IsRPC rpc, HasCallStack)
=> Connection -> CallParams -> Proxy rpc -> (Call rpc -> m a) -> m a
withRPC conn callParams proxy k =
(throwUnclean =<<) $
Expand Down
11 changes: 3 additions & 8 deletions src/Network/GRPC/Client/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -470,14 +470,9 @@ stayConnected connParams server connVar connCanClose =

-- TODO: This is currently only used for the HTTP case, not HTTPS
clientConfig :: Authority -> Scheme -> HTTP2.Client.ClientConfig
clientConfig auth scheme = HTTP2.Client.ClientConfig {
scheme = rawScheme serverPseudoHeaders
, authority = rawAuthority serverPseudoHeaders

-- Docs describe this as "How many pushed responses are contained in
-- the cache". Since gRPC does not make use of HTTP2 server push, we
-- set it to 0.
, cacheLimit = 0
clientConfig auth scheme = HTTP2.Client.defaultClientConfig {
HTTP2.Client.scheme = rawScheme serverPseudoHeaders
, HTTP2.Client.authority = rawAuthority serverPseudoHeaders
}
where
serverPseudoHeaders :: RawServerHeaders
Expand Down
2 changes: 0 additions & 2 deletions src/Network/GRPC/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ module Network.GRPC.Common (
-- ** Low-level
, Session.ChannelClosed(..)
, Session.ChannelUncleanClose(..)
, STMException(..)
, Thread.ThreadCancelled(..)
, Thread.ThreadInterfaceUnavailable(..)
) where
Expand All @@ -43,7 +42,6 @@ import Control.Exception

import Network.GRPC.Common.StreamElem (StreamElem(..))
import Network.GRPC.Spec
import Network.GRPC.Util.Concurrency (STMException(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.Thread qualified as Thread
import Network.GRPC.Util.TLS
Expand Down
4 changes: 2 additions & 2 deletions src/Network/GRPC/Server/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ acceptCall params conn k = do
}
, outCompression = cOut
}
KickoffTrailersOnly trailers ->
KickoffTrailersOnly trailers -> do
return $ Session.FlowStartNoMessages trailers
where
inboundHeaders :: RequestHeaders
Expand Down Expand Up @@ -392,7 +392,7 @@ forwardException call err = do
--
-- We do not return trailers, since gRPC does not support sending trailers from
-- the client to the server (only from the server to the client).
recvInput :: HasCallStack => Call rpc -> IO (StreamElem NoMetadata (Input rpc))
recvInput :: forall rpc. HasCallStack => Call rpc -> IO (StreamElem NoMetadata (Input rpc))
recvInput call = atomically $ recvInputSTM call

-- | Send RPC output to the client
Expand Down
60 changes: 43 additions & 17 deletions src/Network/GRPC/Util/Concurrency.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,23 @@ module Network.GRPC.Util.Concurrency (
-- * @Control.Concurrent.STM@
, module ReexportSTM
, atomically
, STMException(..)
) where

import Prelude hiding (id)

import Control.Exception
import GHC.Stack

import Control.Concurrent as ReexportConcurrent hiding (forkIO, forkIOWithUnmask)
import Control.Concurrent.Async as ReexportAsync hiding (withAsync)
import Control.Concurrent.STM as ReexportSTM hiding (atomically)
import Control.Concurrent as ReexportConcurrent hiding (
forkIO
, forkIOWithUnmask
)
import Control.Concurrent.Async as ReexportAsync hiding (
withAsync
)
import Control.Concurrent.STM as ReexportSTM hiding (
atomically
)

import Control.Concurrent qualified as Concurrent
import Control.Concurrent.Async qualified as Async
Expand All @@ -43,26 +51,44 @@ withAsync :: HasCallStack => IO a -> (Async a -> IO b) -> IO b
withAsync = Async.withAsync . wrapThreadBody

wrapThreadBody :: HasCallStack => IO a -> IO a
wrapThreadBody = id
wrapThreadBody body = body
--wrapThreadBody body = do
-- tid <- myThreadId
-- writeFile ("tmp/" ++ show tid) $ prettyCallStack callStack
-- body

{-------------------------------------------------------------------------------
Wrap exceptions with a callstack
STM
-------------------------------------------------------------------------------}

data STMException = STMException CallStack SomeException
deriving stock (Show)
deriving anyclass (Exception)

-- | Rethrow STM exceptions with a callstack
-- | Run STM transaction
--
-- This is especially helpful to track down "blocked indefinitely" exceptions.
-- When running an STM transaction throws a "blocked indefinitely" exception,
-- there should be no point running it again; after all, it is blocked
-- /indefinitely/. It seems however that this is not in fact guaranteed by the
-- STM implementation, and the grapesy test suite manages to trigger a case
-- where the test fails with this exception even though the thread definitely
-- is /not/ blocked indefinitely, and indeed simply trying again is enough.
--
-- Implementation note: To catch such exceptions, we /must/ have the exception
-- handler /outside/ of the STM transaction.
atomically :: HasCallStack => STM a -> IO a
atomically action =
STM.atomically action `catch` (throwIO . STMException callStack )
-- (The documentation of @Control.Concurrent@ at
-- <https://hackage.haskell.org/package/base-4.19.0.0/docs/Control-Concurrent.html#g:14>
-- mentions an exceptional case related to finalizers
-- (see also <https://gitlab.haskell.org/ghc/ghc/-/issues/11001>)
-- but that is not our case here.
--
-- This is obviously a huge hack and has got to be one of the worst functions I
-- ever written (and I have written some pretty horrible functions in my past).
--
-- TODO: Fix this properly.
-- TODO: Is 1 guarded attempt enough..? Do we need more..? Unlimited..?
atomically :: forall a. STM a -> IO a
atomically stm = attempt guardedAttempts
  where
    -- Number of times we are willing to retry after a (spurious)
    -- 'BlockedIndefinitelyOnSTM' before letting the exception propagate
    guardedAttempts :: Int
    guardedAttempts = 1

    -- Run the transaction; while guarded attempts remain, catch
    -- 'BlockedIndefinitelyOnSTM' (handler /outside/ the transaction)
    -- and simply try again. Once the guarded attempts are exhausted we
    -- run the transaction unguarded, so a real deadlock still surfaces.
    attempt :: Int -> IO a
    attempt remaining
      | remaining <= 0 = STM.atomically stm
      | otherwise      = STM.atomically stm `catch` \BlockedIndefinitelyOnSTM{} ->
                           attempt (remaining - 1)
48 changes: 36 additions & 12 deletions src/Network/GRPC/Util/Session/Channel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ module Network.GRPC.Util.Session.Channel (
, DebugMsg(..)
) where

import Control.Exception
import Control.Exception hiding (try)
import Control.Monad.Catch
import Control.Tracer
import Data.Bifunctor
Expand Down Expand Up @@ -163,6 +163,13 @@ data RegularFlowState flow = RegularFlowState {
, flowTerminated :: TMVar (Trailers flow)
}

-- | 'Show' instance is useful in combination with @stm-debug@ only
deriving instance (
Show (Headers flow)
, Show (TMVar (StreamElem (Trailers flow) (Message flow)))
, Show (TMVar (Trailers flow))
) => Show (RegularFlowState flow)

{-------------------------------------------------------------------------------
Initialization
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -324,11 +331,16 @@ recv Channel{channelInbound, channelRecvFinal} = do
case readFinal of
Just cs -> throwSTM $ RecvAfterFinal cs
Nothing -> do
st <- readTMVar =<< getThreadInterface channelInbound
-- We get the TMVar in the same transaction as reading from it (below).
-- This means that /if/ the thread running 'recvMessageLoop' throws an
-- exception and is killed, the 'takeTMVar' below cannot block
-- indefinitely.
iface <- getThreadInterface channelInbound
st <- readTMVar iface
case st of
FlowStateRegular regular -> do
msg <- takeTMVar (flowMsg regular)
-- We update 'channelRecvFinal' in the same tx as the read, to
-- We update 'channelRecvFinal' in the same tx as the read, to
-- atomically change from "there is a value" to "all values read".
StreamElem.whenDefinitelyFinal msg $ \_trailers ->
writeTVar channelRecvFinal $ Just callStack
Expand Down Expand Up @@ -369,7 +381,7 @@ data RecvAfterFinal =
-- | Wait for the outbound thread to terminate
--
-- See 'forceClose' for discussion.
waitForOutbound :: Channel sess -> IO (FlowState (Outbound sess))
waitForOutbound :: HasCallStack => Channel sess -> IO (FlowState (Outbound sess))
waitForOutbound Channel{channelOutbound} = atomically $
readTMVar =<< waitForThread channelOutbound

Expand Down Expand Up @@ -448,14 +460,17 @@ data ChannelClosed =
-------------------------------------------------------------------------------}

-- | Send all messages to the node's peer
--
-- Should be called with exceptions masked.
sendMessageLoop :: forall sess.
IsSession sess
=> sess
-> (forall x. IO x -> IO x) -- ^ Unmask
-> Tracer IO (DebugMsg sess)
-> RegularFlowState (Outbound sess)
-> OutputStream
-> IO ()
sendMessageLoop sess tracer st stream =
sendMessageLoop sess unmask tracer st stream =
go $ buildMsg sess (flowHeaders st)
where
go :: (Message (Outbound sess) -> Builder) -> IO ()
Expand All @@ -466,7 +481,9 @@ sendMessageLoop sess tracer st stream =
loop :: IO (Trailers (Outbound sess))
loop = do
traceWith tracer $ NodeSendAwaitMsg
msg <- atomically $ takeTMVar (flowMsg st)
-- Technically the call to unmask is necessary here, as takeTMVar
-- is interruptible.
msg <- unmask $ atomically $ takeTMVar (flowMsg st)
traceWith tracer $ NodeSendMsg msg

case msg of
Expand All @@ -492,11 +509,12 @@ sendMessageLoop sess tracer st stream =
recvMessageLoop :: forall sess.
IsSession sess
=> sess
-> (forall x. IO x -> IO x) -- ^ Unmask
-> Tracer IO (DebugMsg sess)
-> RegularFlowState (Inbound sess)
-> InputStream
-> IO ()
recvMessageLoop sess tracer st stream =
recvMessageLoop sess unmask tracer st stream = do
go $ parseMsg sess (flowHeaders st)
where
go :: Parser (Message (Inbound sess)) -> IO ()
Expand All @@ -507,23 +525,29 @@ recvMessageLoop sess tracer st stream =
atomically $ putTMVar (flowMsg st) $ NoMoreElems trailers
where
loop :: Parser (Message (Inbound sess)) -> IO [HTTP.Header]
loop (ParserError err) =
loop (ParserError err) = do
throwIO $ PeerSentMalformedMessage err
loop (ParserDone x p') = do
traceWith tracer $ NodeRecvMsg (StreamElem x)
atomically $ putTMVar (flowMsg st) $ StreamElem x
loop p'
loop (ParserNeedsData acc p') = do
traceWith tracer $ NodeNeedsData acc
bs <- getChunk stream
mbs <- unmask $ try $ getChunk stream
bs <- case mbs of
Left (err :: SomeException) -> do
throwIO err
Right bs ->
return bs


if | not (BS.Strict.null bs) ->
if | not (BS.Strict.null bs) -> do
loop $ p' bs

| not (BS.Lazy.null acc) ->
| not (BS.Lazy.null acc) -> do
throwIO PeerSentIncompleteMessage

| otherwise ->
| otherwise -> do
getTrailers stream

outboundTrailersMaker :: forall sess.
Expand Down
9 changes: 4 additions & 5 deletions src/Network/GRPC/Util/Session/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,10 @@ setupRequestChannel sess tracer ConnectionToServer{sendRequest} outboundStart =
(requestHeaders requestInfo)
$ \unmask write' flush' -> unmask $ do
threadBody (channelOutbound channel)
unmask
(newTMVarIO (FlowStateRegular regular))
$ \_stVar -> do
stream <- clientOutputStream write' flush'
sendMessageLoop sess tracer regular stream
sendMessageLoop sess unmask tracer regular stream
forkRequest channel req
FlowStartNoMessages trailers -> do
stVar <- newTMVarIO $ FlowStateNoMessages trailers
Expand All @@ -111,7 +110,7 @@ setupRequestChannel sess tracer ConnectionToServer{sendRequest} outboundStart =
where
forkRequest :: Channel sess -> Client.Request -> IO ()
forkRequest channel req =
forkThread (channelInbound channel) newEmptyTMVarIO $ \stVar -> do
forkThread (channelInbound channel) newEmptyTMVarIO $ \unmask stVar -> do
setup <- try $ sendRequest req $ \resp -> do
responseStatus <- case Client.responseStatus resp of
Just x -> return x
Expand All @@ -131,8 +130,8 @@ setupRequestChannel sess tracer ConnectionToServer{sendRequest} outboundStart =
regular <- initFlowStateRegular headers
stream <- clientInputStream resp
atomically $ putTMVar stVar $ FlowStateRegular regular
recvMessageLoop sess tracer regular stream
FlowStartNoMessages trailers ->
recvMessageLoop sess unmask tracer regular stream
FlowStartNoMessages trailers -> do
atomically $ putTMVar stVar $ FlowStateNoMessages trailers

case setup of
Expand Down
Loading

0 comments on commit f11ace2

Please sign in to comment.