From f11ace2eb3d4e53a6a5c996d976b9a265ad2b6a3 Mon Sep 17 00:00:00 2001
From: Edsko de Vries
Date: Fri, 29 Sep 2023 14:16:37 +0200
Subject: [PATCH] More early termination problems

Turns out, the grapesy test suite is Too Powerful and has in fact
revealed an STM bug :scream: See `atomically` for details.
---
 cabal.project                                  |  14 +-
 grapesy.cabal                                  |   9 +-
 src/Network/GRPC/Client/Call.hs                |   2 +-
 src/Network/GRPC/Client/Connection.hs          |  11 +-
 src/Network/GRPC/Common.hs                     |   2 -
 src/Network/GRPC/Server/Call.hs                |   4 +-
 src/Network/GRPC/Util/Concurrency.hs           |  60 +++++--
 src/Network/GRPC/Util/Session/Channel.hs       |  48 ++++--
 src/Network/GRPC/Util/Session/Client.hs        |   9 +-
 src/Network/GRPC/Util/Session/Server.hs        |  13 +-
 src/Network/GRPC/Util/Thread.hs                |  16 +-
 test-common/Test/Util/ClientServer.hs          |  74 ++++++---
 test-grapesy/Main.hs                           |   1 +
 test-grapesy/Test/Driver/Dialogue/Execution.hs | 149 ++++++++++++++----
 test-grapesy/Test/Prop/Dialogue.hs             |  27 +++-
 15 files changed, 319 insertions(+), 120 deletions(-)

diff --git a/cabal.project b/cabal.project
index 3f1d8bae..4f01dec8 100644
--- a/cabal.project
+++ b/cabal.project
@@ -1,12 +1,18 @@
-packages: .
+packages:
+  .

 package grapesy
   tests: True
   flags: +build-demo +build-stress-test

--- The version on Hackage also works, it just writes some (unnecessary)
--- exceptions to stderr
+-- https://github.com/kazu-yamamoto/http2/pull/97
 source-repository-package
   type: git
   location: https://github.com/edsko/http2.git
-  tag: c2a2994ed45a08998c2b6eb22a08f28b9eb36c3c
+  tag: d7c848f6a6e4b2d809af4304ea5f5fbcdc58cd52
+
+-- https://github.com/kazu-yamamoto/http2-tls/pull/4
+source-repository-package
+  type: git
+  location: https://github.com/edsko/http2-tls.git
+  tag: 4fdfe7c32177c5a483916623cb8f93525761cff6
diff --git a/grapesy.cabal b/grapesy.cabal
index 4dd4e35b..bb8e9865 100644
--- a/grapesy.cabal
+++ b/grapesy.cabal
@@ -89,7 +89,9 @@ library
       Network.GRPC.Server.Run
      Network.GRPC.Server.StreamType
      Network.GRPC.Spec
-  other-modules:
+
+  -- other-modules:
+
      Network.GRPC.Client.Call
      Network.GRPC.Client.Connection
      Network.GRPC.Client.Meta
@@ -195,6 +197,8 @@ test-suite test-grapesy
   ghc-options:
      -Wprepositive-qualified-module
      -threaded
+      -rtsopts
+      -debug
   type:
      exitcode-stdio-1.0
   hs-source-dirs:
@@ -232,7 +236,7 @@ test-suite test-grapesy
     , pretty-show      >= 1.10 && < 1.11
    , QuickCheck       >= 2.14 && < 2.15
    , stm              >= 2.5  && < 2.6
-    , tasty            >= 1.4  && < 1.5
+    , tasty            >= 1.4  && < 1.6
    , tasty-hunit      >= 0.10 && < 0.11
    , tasty-quickcheck >= 0.10 && < 0.11
    , text             >= 1.2  && < 2.1
@@ -353,6 +357,7 @@ executable test-stress
     , containers           >= 0.6  && < 0.7
    , contra-tracer        >= 0.2  && < 0.3
    , data-default         >= 0.7  && < 0.8
+    , exceptions           >= 0.10 && < 0.11
    , http2
    , optparse-applicative >= 0.16 && < 0.19
    , pretty-show          >= 1.10 && < 1.11
diff --git a/src/Network/GRPC/Client/Call.hs b/src/Network/GRPC/Client/Call.hs
index 586baa85..88acd92e 100644
--- a/src/Network/GRPC/Client/Call.hs
+++ b/src/Network/GRPC/Client/Call.hs
@@ -54,7 +54,7 @@ import Network.GRPC.Util.Session qualified as Session
 -- when the body is run. If you want to be sure that the call has been setup,
 -- you can call 'recvResponseMetadata'.
 withRPC :: forall m rpc a.
-     (MonadMask m, MonadIO m, IsRPC rpc)
+     (MonadMask m, MonadIO m, IsRPC rpc, HasCallStack)
   => Connection -> CallParams -> Proxy rpc -> (Call rpc -> m a) -> m a
 withRPC conn callParams proxy k = (throwUnclean =<<) $
diff --git a/src/Network/GRPC/Client/Connection.hs b/src/Network/GRPC/Client/Connection.hs
index 90031f73..84ea6a31 100644
--- a/src/Network/GRPC/Client/Connection.hs
+++ b/src/Network/GRPC/Client/Connection.hs
@@ -470,14 +470,9 @@ stayConnected connParams server connVar connCanClose =
     -- TODO: This is currently only used for the HTTP case, not HTTPS
     clientConfig :: Authority -> Scheme -> HTTP2.Client.ClientConfig
-    clientConfig auth scheme = HTTP2.Client.ClientConfig {
-          scheme    = rawScheme serverPseudoHeaders
-        , authority = rawAuthority serverPseudoHeaders
-
-          -- Docs describe this as "How many pushed responses are contained in
-          -- the cache". Since gRPC does not make use of HTTP2 server push, we
-          -- set it to 0.
-        , cacheLimit = 0
+    clientConfig auth scheme = HTTP2.Client.defaultClientConfig {
+          HTTP2.Client.scheme    = rawScheme serverPseudoHeaders
+        , HTTP2.Client.authority = rawAuthority serverPseudoHeaders
        }
      where
        serverPseudoHeaders :: RawServerHeaders
diff --git a/src/Network/GRPC/Common.hs b/src/Network/GRPC/Common.hs
index c8c0a336..f6602f74 100644
--- a/src/Network/GRPC/Common.hs
+++ b/src/Network/GRPC/Common.hs
@@ -34,7 +34,6 @@ module Network.GRPC.Common (
     -- ** Low-level
   , Session.ChannelClosed(..)
   , Session.ChannelUncleanClose(..)
-  , STMException(..)
   , Thread.ThreadCancelled(..)
   , Thread.ThreadInterfaceUnavailable(..)
   ) where
@@ -43,7 +42,6 @@ import Control.Exception

 import Network.GRPC.Common.StreamElem (StreamElem(..))
 import Network.GRPC.Spec
-import Network.GRPC.Util.Concurrency (STMException(..))
 import Network.GRPC.Util.Session qualified as Session
 import Network.GRPC.Util.Thread qualified as Thread
 import Network.GRPC.Util.TLS
diff --git a/src/Network/GRPC/Server/Call.hs b/src/Network/GRPC/Server/Call.hs
index 1e84fc13..01767e5f 100644
--- a/src/Network/GRPC/Server/Call.hs
+++ b/src/Network/GRPC/Server/Call.hs
@@ -261,7 +261,7 @@ acceptCall params conn k = do
                 }
            , outCompression = cOut
            }
-      KickoffTrailersOnly trailers ->
+      KickoffTrailersOnly trailers -> do
        return $ Session.FlowStartNoMessages trailers
   where
     inboundHeaders :: RequestHeaders
@@ -392,7 +392,7 @@ forwardException call err = do
 --
 -- We do not return trailers, since gRPC does not support sending trailers from
 -- the client to the server (only from the server to the client).
-recvInput :: HasCallStack => Call rpc -> IO (StreamElem NoMetadata (Input rpc))
+recvInput :: forall rpc. HasCallStack => Call rpc -> IO (StreamElem NoMetadata (Input rpc))
 recvInput call = atomically $ recvInputSTM call
diff --git a/src/Network/GRPC/Util/Concurrency.hs b/src/Network/GRPC/Util/Concurrency.hs
index 2152887d..55ab1505 100644
--- a/src/Network/GRPC/Util/Concurrency.hs
+++ b/src/Network/GRPC/Util/Concurrency.hs
@@ -11,15 +11,23 @@ module Network.GRPC.Util.Concurrency (
     -- * @Control.Concurrent.STM@
   , module ReexportSTM
   , atomically
-  , STMException(..)
   ) where

+import Prelude hiding (id)
+
 import Control.Exception
 import GHC.Stack

-import Control.Concurrent       as ReexportConcurrent hiding (forkIO, forkIOWithUnmask)
-import Control.Concurrent.Async as ReexportAsync      hiding (withAsync)
-import Control.Concurrent.STM   as ReexportSTM        hiding (atomically)
+import Control.Concurrent as ReexportConcurrent hiding (
+    forkIO
+  , forkIOWithUnmask
+  )
+import Control.Concurrent.Async as ReexportAsync hiding (
+    withAsync
+  )
+import Control.Concurrent.STM as ReexportSTM hiding (
+    atomically
+  )

 import Control.Concurrent qualified as Concurrent
 import Control.Concurrent.Async qualified as Async
@@ -43,26 +51,44 @@ withAsync :: HasCallStack => IO a -> (Async a -> IO b) -> IO b
 withAsync = Async.withAsync . wrapThreadBody

 wrapThreadBody :: HasCallStack => IO a -> IO a
-wrapThreadBody = id
+wrapThreadBody body = body
 --wrapThreadBody body = do
 --  tid <- myThreadId
 --  writeFile ("tmp/" ++ show tid) $ prettyCallStack callStack
 --  body

 {-------------------------------------------------------------------------------
-  Wrap exceptions with a callstack
+  STM
-------------------------------------------------------------------------------}

-data STMException = STMException CallStack SomeException
-  deriving stock (Show)
-  deriving anyclass (Exception)
-
--- | Rethrow STM exceptions with a callstakc
+-- | Run STM transaction
 --
--- This is especially helpful to track down "blocked indefinitely" exceptions.
+-- When running an STM transaction throws a "blocked indefinitely" exception,
+-- there should be no point running it again; after all, it is blocked
+-- /indefinitely/. It seems however that this is not in fact guaranteed by the
+-- STM implementation, and the grapesy test suite manages to trigger a case
+-- where the test fails with this exception even though the thread definitely
+-- is /not/ blocked indefinitely, and indeed simply trying again is enough.
 --
--- Implementation note: To catch such exceptions, we /must/ have the exception
--- handler /outside/ of the STM transaction.
-atomically :: HasCallStack => STM a -> IO a
-atomically action =
-    STM.atomically action `catch` (throwIO . STMException callStack)
+-- (The documentation of @Control.Concurrent@ at
+--
+-- mentions an exceptional case related to finalizers
+-- (see also )
+-- but that is not our case here.)
+--
+-- This is obviously a huge hack and has got to be one of the worst functions I
+-- have ever written (and I have written some pretty horrible functions in my
+-- past).
+--
+-- TODO: Fix this properly.
+-- TODO: Is 1 guarded attempt enough..? Do we need more..? Unlimited..?
+atomically :: forall a. STM a -> IO a
+atomically stm = go 1
+  where
+    go ::
+         Int -- ^ Number of guarded attempts left
+      -> IO a
+    go 0 = run
+    go n = run `catch` \BlockedIndefinitelyOnSTM{} -> go (n - 1)
+
+    run :: IO a
+    run = STM.atomically stm
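
As a standalone illustration of the workaround above (not part of the patch;
the function name and explicit retry budget are invented for the example), the
bounded-retry idea looks like this in isolation:

    import Control.Concurrent.STM (STM)
    import qualified Control.Concurrent.STM as STM
    import Control.Exception (BlockedIndefinitelyOnSTM (..), catch)

    -- Like 'STM.atomically', but if the RTS (spuriously) reports the
    -- transaction as blocked indefinitely, try again, up to @n@ times.
    atomicallyGuarded :: Int -> STM a -> IO a
    atomicallyGuarded n stm
      | n <= 0    = STM.atomically stm
      | otherwise =
          STM.atomically stm `catch` \BlockedIndefinitelyOnSTM{} ->
            atomicallyGuarded (n - 1) stm

With a budget of 1 this matches the @atomically@ defined above: one guarded
attempt, after which the exception propagates normally.
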
diff --git a/src/Network/GRPC/Util/Session/Channel.hs b/src/Network/GRPC/Util/Session/Channel.hs
index 7358b9d9..36e43ed1 100644
--- a/src/Network/GRPC/Util/Session/Channel.hs
+++ b/src/Network/GRPC/Util/Session/Channel.hs
@@ -35,7 +35,7 @@ module Network.GRPC.Util.Session.Channel (
   , DebugMsg(..)
   ) where

-import Control.Exception
+import Control.Exception hiding (try)
+import Control.Monad.Catch
 import Control.Tracer
 import Data.Bifunctor
@@ -163,6 +163,13 @@ data RegularFlowState flow = RegularFlowState {
     , flowTerminated :: TMVar (Trailers flow)
    }

+-- | 'Show' instance is useful in combination with @stm-debug@ only
+deriving instance (
+    Show (Headers flow)
+  , Show (TMVar (StreamElem (Trailers flow) (Message flow)))
+  , Show (TMVar (Trailers flow))
+  ) => Show (RegularFlowState flow)
+
 {-------------------------------------------------------------------------------
   Initialization
-------------------------------------------------------------------------------}
@@ -324,11 +331,16 @@ recv Channel{channelInbound, channelRecvFinal} = do
     case readFinal of
       Just cs -> throwSTM $ RecvAfterFinal cs
       Nothing -> do
-        st <- readTMVar =<< getThreadInterface channelInbound
+        -- We get the TMVar in the same transaction as reading from it (below).
+        -- This means that /if/ the thread running 'recvMessageLoop' throws an
+        -- exception and is killed, the 'takeTMVar' below cannot block
+        -- indefinitely.
+        iface <- getThreadInterface channelInbound
+        st    <- readTMVar iface
        case st of
          FlowStateRegular regular -> do
            msg <- takeTMVar (flowMsg regular)
            -- We update 'channelRecvFinal' in the same tx as the read, to
            -- atomically change from "there is a value" to "all values read".
            StreamElem.whenDefinitelyFinal msg $ \_trailers ->
              writeTVar channelRecvFinal $ Just callStack
@@ -369,7 +381,7 @@ data RecvAfterFinal =
 -- | Wait for the outbound thread to terminate
 --
 -- See 'forceClose' for discussion.
-waitForOutbound :: Channel sess -> IO (FlowState (Outbound sess))
+waitForOutbound :: HasCallStack => Channel sess -> IO (FlowState (Outbound sess))
 waitForOutbound Channel{channelOutbound} =
     atomically $ readTMVar =<< waitForThread channelOutbound
@@ -448,14 +460,17 @@ data ChannelClosed =
-------------------------------------------------------------------------------}

 -- | Send all messages to the node's peer
+--
+-- Should be called with exceptions masked.
 sendMessageLoop :: forall sess.
      IsSession sess
   => sess
+  -> (forall x. IO x -> IO x) -- ^ Unmask
   -> Tracer IO (DebugMsg sess)
   -> RegularFlowState (Outbound sess)
   -> OutputStream
   -> IO ()
-sendMessageLoop sess tracer st stream =
+sendMessageLoop sess unmask tracer st stream =
     go $ buildMsg sess (flowHeaders st)
   where
     go :: (Message (Outbound sess) -> Builder) -> IO ()
@@ -466,7 +481,9 @@ sendMessageLoop sess tracer st stream =
     loop :: IO (Trailers (Outbound sess))
     loop = do
         traceWith tracer $ NodeSendAwaitMsg
-        msg <- atomically $ takeTMVar (flowMsg st)
+        -- Technically the call to unmask is necessary here, as takeTMVar
+        -- is interruptible.
+        msg <- unmask $ atomically $ takeTMVar (flowMsg st)
        traceWith tracer $ NodeSendMsg msg
        case msg of
@@ -492,11 +509,12 @@
 recvMessageLoop :: forall sess.
      IsSession sess
   => sess
+  -> (forall x. IO x -> IO x) -- ^ Unmask
   -> Tracer IO (DebugMsg sess)
   -> RegularFlowState (Inbound sess)
   -> InputStream
   -> IO ()
-recvMessageLoop sess tracer st stream =
+recvMessageLoop sess unmask tracer st stream = do
     go $ parseMsg sess (flowHeaders st)
   where
     go :: Parser (Message (Inbound sess)) -> IO ()
     go parser = do
        trailers <- loop parser
        atomically $ putTMVar (flowMsg st) $ NoMoreElems trailers
      where
        loop :: Parser (Message (Inbound sess)) -> IO [HTTP.Header]
-        loop (ParserError err) =
+        loop (ParserError err) = do
            throwIO $ PeerSentMalformedMessage err
        loop (ParserDone x p') = do
            traceWith tracer $ NodeRecvMsg (StreamElem x)
            atomically $ putTMVar (flowMsg st) $ StreamElem x
            loop p'
        loop (ParserNeedsData acc p') = do
            traceWith tracer $ NodeNeedsData acc
-            bs <- getChunk stream
+            mbs <- unmask $ try $ getChunk stream
+            bs  <- case mbs of
+                     Left (err :: SomeException) -> do
+                       throwIO err
+                     Right bs ->
+                       return bs
+
-            if | not (BS.Strict.null bs) ->
+            if | not (BS.Strict.null bs) -> do
                   loop $ p' bs
-               | not (BS.Lazy.null acc) ->
+               | not (BS.Lazy.null acc) -> do
                   throwIO PeerSentIncompleteMessage
-               | otherwise ->
+               | otherwise -> do
                   getTrailers stream

 outboundTrailersMaker :: forall sess.
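
The comment added to 'recv' is the crux of that hunk: the thread interface and
the message must be obtained in a single transaction. A simplified model of
why (illustrative only; 'ThreadState' here is a cut-down stand-in for the type
in Network.GRPC.Util.Thread):

    import Control.Concurrent.STM
    import Control.Exception (SomeException)

    data ThreadState iface =
        ThreadRunning iface
      | ThreadException SomeException

    -- Both reads happen in one transaction: if the producing thread dies,
    -- the state TVar changes, the transaction re-runs, and we throw instead
    -- of blocking forever in 'takeTMVar'.
    recvViaThread :: TVar (ThreadState (TMVar msg)) -> STM msg
    recvViaThread state = do
        st <- readTVar state
        case st of
          ThreadRunning msgVar -> takeTMVar msgVar
          ThreadException e    -> throwSTM e

Had the interface been fetched in a separate, earlier transaction, the
reference to the dead thread's TMVar would be stale and the later 'takeTMVar'
could block indefinitely.
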
diff --git a/src/Network/GRPC/Util/Session/Client.hs b/src/Network/GRPC/Util/Session/Client.hs
index 01e780ab..b4e974a4 100644
--- a/src/Network/GRPC/Util/Session/Client.hs
+++ b/src/Network/GRPC/Util/Session/Client.hs
@@ -91,11 +91,10 @@ setupRequestChannel sess tracer ConnectionToServer{sendRequest} outboundStart =
                     (requestHeaders requestInfo)
                  $ \unmask write' flush' -> unmask $ do
                threadBody (channelOutbound channel)
-                          unmask
                           (newTMVarIO (FlowStateRegular regular))
                         $ \_stVar -> do
                  stream <- clientOutputStream write' flush'
-                  sendMessageLoop sess tracer regular stream
+                  sendMessageLoop sess unmask tracer regular stream
              forkRequest channel req
          FlowStartNoMessages trailers -> do
            stVar <- newTMVarIO $ FlowStateNoMessages trailers
@@ -111,7 +110,7 @@ setupRequestChannel sess tracer ConnectionToServer{sendRequest} outboundStart =
   where
     forkRequest :: Channel sess -> Client.Request -> IO ()
     forkRequest channel req =
-        forkThread (channelInbound channel) newEmptyTMVarIO $ \stVar -> do
+        forkThread (channelInbound channel) newEmptyTMVarIO $ \unmask stVar -> do
          setup <- try $ sendRequest req $ \resp -> do
            responseStatus <- case Client.responseStatus resp of
              Just x  -> return x
@@ -131,8 +130,8 @@ setupRequestChannel sess tracer ConnectionToServer{sendRequest} outboundStart =
               regular <- initFlowStateRegular headers
              stream  <- clientInputStream resp
              atomically $ putTMVar stVar $ FlowStateRegular regular
-              recvMessageLoop sess tracer regular stream
-            FlowStartNoMessages trailers ->
+              recvMessageLoop sess unmask tracer regular stream
+            FlowStartNoMessages trailers -> do
              atomically $ putTMVar stVar $ FlowStateNoMessages trailers

          case setup of
diff --git a/src/Network/GRPC/Util/Session/Server.hs b/src/Network/GRPC/Util/Session/Server.hs
index 7ceaba24..8f2dd2b9 100644
--- a/src/Network/GRPC/Util/Session/Server.hs
+++ b/src/Network/GRPC/Util/Session/Server.hs
@@ -7,6 +7,7 @@ module Network.GRPC.Util.Session.Server (
 import Control.Exception
 import Control.Tracer
 import Network.HTTP2.Server qualified as Server
+import GHC.Stack

 import Network.GRPC.Util.Concurrency
 import Network.GRPC.Util.HTTP2 (fromHeaderTable)
@@ -33,7 +34,7 @@ data ConnectionToClient = ConnectionToClient {
 --
 -- The actual response will not immediately be initiated; see below.
 setupResponseChannel :: forall sess.
-     AcceptSession sess
+     (AcceptSession sess, HasCallStack)
   => sess
   -> Tracer IO (DebugMsg sess)
   -> ConnectionToClient
@@ -66,17 +67,17 @@ setupResponseChannel sess tracer conn startOutbound = do
           else FlowStartRegular <$> parseRequestRegular sess requestInfo

-    forkThread (channelInbound channel) newEmptyTMVarIO $ \stVar ->
+    forkThread (channelInbound channel) newEmptyTMVarIO $ \unmask stVar -> do
      case inboundStart of
        FlowStartRegular headers -> do
          regular <- initFlowStateRegular headers
          stream  <- serverInputStream (request conn)
          atomically $ putTMVar stVar $ FlowStateRegular regular
-          recvMessageLoop sess tracer regular stream
-        FlowStartNoMessages trailers ->
+          recvMessageLoop sess unmask tracer regular stream
+        FlowStartNoMessages trailers -> do
          atomically $ putTMVar stVar $ FlowStateNoMessages trailers

-    forkThread (channelOutbound channel) newEmptyTMVarIO $ \stVar -> do
+    forkThread (channelOutbound channel) newEmptyTMVarIO $ \unmask stVar -> do
      outboundStart <- startOutbound inboundStart
      let responseInfo = buildResponseInfo sess outboundStart
      case outboundStart of
@@ -90,7 +91,7 @@ setupResponseChannel sess tracer conn startOutbound = do
                  (responseHeaders responseInfo)
                $ \write' flush' -> do
              stream <- serverOutputStream write' flush'
-              sendMessageLoop sess tracer regular stream
+              sendMessageLoop sess unmask tracer regular stream
          respond conn resp
        FlowStartNoMessages trailers -> do
          atomically $ putTMVar stVar $ FlowStateNoMessages trailers
diff --git a/src/Network/GRPC/Util/Thread.hs b/src/Network/GRPC/Util/Thread.hs
index fb148562..5a976d8c 100644
--- a/src/Network/GRPC/Util/Thread.hs
+++ b/src/Network/GRPC/Util/Thread.hs
@@ -47,6 +47,7 @@ data ThreadState a =
   -- | Thread terminated with an exception
   | ThreadException SomeException
+  deriving (Show)

 {-------------------------------------------------------------------------------
   Creating threads
-------------------------------------------------------------------------------}

 newThreadState :: IO (TVar (ThreadState a))
 newThreadState = newTVarIO ThreadNotStarted

 forkThread ::
-     TVar (ThreadState a)
+     HasCallStack
+  => TVar (ThreadState a)
   -> IO a         -- ^ Initialize the thread (runs with exceptions masked)
-  -> (a -> IO ()) -- ^ Main thread body
+  -> ((forall x. IO x -> IO x) -> a -> IO ()) -- ^ Main thread body
   -> IO ()
 forkThread state initThread body =
     void $ mask_ $ forkIOWithUnmask $ \unmask ->
-      threadBody state unmask initThread body
+      threadBody state initThread $ body unmask

 -- | Wrap the thread body
 --
 -- This should be wrapped around the body of the thread, and should be called
--- with exceptions masked.
+-- with exceptions masked. The body itself is responsible for unmasking
+-- exceptions.
 --
 -- This is intended for integration with existing libraries (such as @http2@),
 -- which might do the forking under the hood.
 --
 -- this function terminates immediately.
 threadBody ::
      TVar (ThreadState a)
-  -> (forall x. IO x -> IO x) -- ^ Unmask exceptions
   -> IO a         -- ^ Initialize the thread (runs with exceptions masked)
   -> (a -> IO ()) -- ^ Main thread body
   -> IO ()
-threadBody state unmask initThread body = do
+threadBody state initThread body = do
     tid <- myThreadId
     shouldStart <- atomically $ do
       st <- readTVar state
     when shouldStart $ do
       iface <- initThread
      atomically $ writeTVar state $ ThreadRunning tid iface
-      res <- try $ unmask $ body iface
+      res <- try $ body iface
      atomically $ writeTVar state $
        case res of
          Left e  -> ThreadException e
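
The calling convention that these Thread.hs hunks establish — the fork site
masks, the body receives the unmask function — can be seen in isolation in the
following sketch (names are simplified stand-ins, not the real API; the
'ThreadState' bookkeeping is reduced to comments):

    {-# LANGUAGE RankNTypes #-}

    import Control.Concurrent (forkIOWithUnmask)
    import Control.Exception (SomeException, mask_, try)
    import Control.Monad (void)

    -- The fork site masks; the body receives the unmask function and decides
    -- which of its actions are interruptible. The bookkeeping around the body
    -- therefore cannot be interrupted halfway.
    forkGuarded :: ((forall x. IO x -> IO x) -> IO ()) -> IO ()
    forkGuarded body =
        void $ mask_ $ forkIOWithUnmask $ \unmask -> do
          result <- try (body unmask) :: IO (Either SomeException ())
          case result of
            Left  _e -> return () -- the real code records 'ThreadException'
            Right () -> return () -- ... and the final 'ThreadState' here

This is why 'sendMessageLoop' and 'recvMessageLoop' now take unmask
explicitly: interruptible operations such as 'takeTMVar' run unmasked, while
the state updates around them stay masked.
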
diff --git a/test-common/Test/Util/ClientServer.hs b/test-common/Test/Util/ClientServer.hs
index 16fd2a8c..8fe0e3ba 100644
--- a/test-common/Test/Util/ClientServer.hs
+++ b/test-common/Test/Util/ClientServer.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE CPP #-}
 {-# LANGUAGE OverloadedStrings #-}

 module Test.Util.ClientServer (
@@ -24,15 +25,17 @@ module Test.Util.ClientServer (
 import Control.Concurrent
 import Control.Concurrent.Async
 import Control.Concurrent.STM
-import Control.Exception
 import Control.Monad
+import Control.Monad.Catch
 import Control.Monad.IO.Class
 import Control.Tracer
+import Data.Bifunctor
 import Data.Default
 import Data.Map qualified as Map
 import Data.Set qualified as Set
 import Data.Text qualified as Text
 import GHC.Generics qualified as GHC
+import Network.HTTP2.Internal qualified as HTTP2
 import Network.HTTP2.Server qualified as HTTP2
 import Network.TLS
 import Text.Show.Pretty
@@ -132,6 +135,7 @@ data ExpectedException e =
   | ExpectedExceptionCompressionNegotationFailed CompressionNegotationFailed
   | ExpectedExceptionDouble (DoubleException (ExpectedException e))
   | ExpectedExceptionCustom e
+  | ExpectedExceptionHttp2 (Maybe SomeException)
   deriving stock (Show, GHC.Generic)
   deriving anyclass (PrettyVal)
@@ -222,8 +226,12 @@ isExpectedException cfg assessCustomException topLevel =
           , doubleExceptionAnnotation = ()
          }

-      | Just (STMException _stack err') <- fromException err
+#if MIN_VERSION_stm(9,9,9)
+      -- http://github.com/edsko/stm-debug
+      | Just (STMException _stack err') <- fromException err
      = go err'
+#endif
+
      | Just (ThreadInterfaceUnavailable _stack err') <- fromException err
      = go err'
      | Just (ThreadCancelled _stack err') <- fromException err
      = go err'
@@ -233,6 +241,21 @@ isExpectedException cfg assessCustomException topLevel =
       | Just (ChannelUncleanClose err') <- fromException err
      = go err'

+      --
+      -- HTTP2
+      --
+      -- HTTP2 will kill handlers when they are no longer required. We expect
+      -- these when one part of a test throws an exception and we exit the test
+      -- prematurely.
+      --
+      -- TODO: We could try and make these more precise, and expect them only
+      -- if indeed another part of the test has failed, but we'd really be
+      -- testing http2 rather than grapesy itself.
+      --
+
+      | Just (HTTP2.KilledByHttp2ThreadManager e) <- fromException err
+      = Right (ExpectedExceptionHttp2 e)
+
      --
      -- Custom exceptions
      --
@@ -254,30 +277,45 @@
 {-------------------------------------------------------------------------------
   Server handler lock
-
-  We don't want to terminate the test when some of the server handlers are
-  still running; this will result in those handlers being killed with a
-  'KilledByHttp2ThreadManager' exception, confusing the test results.
-------------------------------------------------------------------------------}

-newtype ServerHandlerLock = ServerHandlerLock (TVar Int)
+newtype ServerHandlerLock = ServerHandlerLock (TVar (Either SomeException Int))

 newServerHandlerLock :: IO ServerHandlerLock
-newServerHandlerLock = ServerHandlerLock <$> newTVarIO 0
+newServerHandlerLock = ServerHandlerLock <$> newTVarIO (Right 0)

 waitForHandlerTermination :: ServerHandlerLock -> STM ()
 waitForHandlerTermination (ServerHandlerLock lock) = do
-    activeHandlers <- readTVar lock
-    unless (activeHandlers == 0) $ retry
+    mActiveHandlers <- readTVar lock
+    case mActiveHandlers of
+      Left err -> throwSTM err
+      Right 0  -> return ()
+      Right _  -> retry

 serverHandlerLockHook :: ServerHandlerLock -> HTTP2.Server -> HTTP2.Server
 serverHandlerLockHook (ServerHandlerLock lock) server request aux respond =
-    bracket_ register unregister $
-      server request aux respond
+    mask $ \unmask -> do
+      register
+      result <- try $ unmask $ server request aux respond
+      -- We don't rethrow the exception (instead 'waitForHandlerTermination' will)
+      unregister result
   where
-    register, unregister :: IO ()
-    register   = atomically $ modifyTVar lock (\x -> x + 1)
-    unregister = atomically $ modifyTVar lock (\x -> x - 1)
+    register :: IO ()
+    register = atomically $ modifyTVar lock $ bimap id succ
+
+    unregister :: Either SomeException () -> IO ()
+    unregister result = atomically $ modifyTVar lock $ either Left $
+        case result of
+          Right () ->
+            Right . pred
+          Left e ->
+            case fromException e of
+              Just (HTTP2.KilledByHttp2ThreadManager _) ->
+                -- If we are shutting down because of a test failure, we don't
+                -- want to get confused by any other handlers being shut down
+                Right . pred
+              Nothing ->
+                const (Left e)

 {-------------------------------------------------------------------------------
   Server
-------------------------------------------------------------------------------}
@@ -481,17 +519,17 @@ runTestClientServer cfg clientRun serverHandlers = do
       (Right (), Right a) ->
        return a
      (Left serverErr, Right _) ->
-        throwIO $ ServerException {
+        throwM $ ServerException {
            serverException     = serverErr
          , serverExceptionLogs = logMsgs
          }
      (Right (), Left clientErr) ->
-        throwIO $ ClientException {
+        throwM $ ClientException {
            clientException     = clientErr
          , clientExceptionLogs = logMsgs
          }
      (Left serverErr, Left clientErr) ->
-        throwIO $ DoubleException {
+        throwM $ DoubleException {
            doubleExceptionServer     = serverErr
          , doubleExceptionClient     = clientErr
          , doubleExceptionAnnotation = logMsgs
          }
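
The reworked lock folds failure into the counter: the 'TVar' now holds either
the first real handler failure or the number of live handlers. A pure model of
the state transitions (illustrative only; the names here are invented, and as
the hunk above shows, the real hook additionally treats
'KilledByHttp2ThreadManager' as a normal exit):

    import Control.Exception (SomeException)

    -- Either the first real failure, or the number of running handlers.
    type LockState = Either SomeException Int

    registerHandler :: LockState -> LockState
    registerHandler = fmap succ  -- no-op once we have already failed

    unregisterHandler :: Either SomeException () -> LockState -> LockState
    unregisterHandler _          (Left e)  = Left e         -- failure is sticky
    unregisterHandler (Right ()) (Right n) = Right (n - 1)  -- clean exit
    unregisterHandler (Left e)   (Right _) = Left e         -- record failure

'waitForHandlerTermination' then retries until it sees @Right 0@, or rethrows
the recorded exception — which is how a handler failure now reaches the test
driver instead of being swallowed.
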
diff --git a/test-grapesy/Main.hs b/test-grapesy/Main.hs
index 668c5063..8d503c6b 100644
--- a/test-grapesy/Main.hs
+++ b/test-grapesy/Main.hs
@@ -20,3 +20,4 @@ main = defaultMain $ testGroup "grapesy" [
       , Dialogue.tests
      ]
    ]
+
diff --git a/test-grapesy/Test/Driver/Dialogue/Execution.hs b/test-grapesy/Test/Driver/Dialogue/Execution.hs
index cf012a87..df33d986 100644
--- a/test-grapesy/Test/Driver/Dialogue/Execution.hs
+++ b/test-grapesy/Test/Driver/Dialogue/Execution.hs
@@ -128,6 +128,95 @@ instance Monad (Health e) where
 ifAlive :: (a -> Health e b) -> Health e a -> Health e b
 ifAlive = (=<<)

+{-------------------------------------------------------------------------------
+  Execution mode
+-------------------------------------------------------------------------------}
+
+-- | Execution mode
+--
+-- Each test involves one or more pairs of a client and a server engaged in an
+-- RPC call. Each call consists of a series of actions ("client sends initial
+-- metadata", "client sends a message", "server sends a message", etc.). We
+-- categorize such actions as either " passive " (such as receiving a message)
+-- or " active " (such as sending a message).
+--
+-- As part of the test generation, each action is assigned a (test) clock tick.
+-- This imposes a specific (but randomly generated) ordering; this matches the
+-- formal definition of the behaviour of concurrent systems: "for every
+-- interleaving, ...". Active actions wait until the test clock (essentially
+-- just an @MVar Int@) reaches their assigned tick before proceeding. The
+-- /advancement/ of the test clock depends on the mode (see below).
+data ExecutionMode =
+    -- | Conservative mode (used when tests involve early termination)
+    --
+    -- == Ordering
+    --
+    -- Consider a test that looks something like:
+    --
+    -- > clock tick 1: client sends message
+    -- > clock tick 2: client throws exception
+    --
+    -- We have to be careful to preserve ordering here: the exception thrown by
+    -- the client may or may not " overtake " the earlier send: the message may
+    -- or may not be sent to the server, thereby making the tests flaky. In
+    -- conservative mode, therefore, the /passive/ participant is the one that
+    -- advances the test clock; in the example above, the /server/ advances the
+    -- test clock when it receives the message. The downside of this approach
+    -- is that we are excluding some valid behaviour from the tests: we're
+    -- effectively making every operation synchronous.
+    --
+    -- == Connection isolation
+    --
+    -- The tests assume that different calls are independent from each other.
+    -- This is mostly true, but not completely: when a client or a server
+    -- terminates early, the entire connection (supporting potentially many
+    -- calls) is reset. It's not entirely clear why; it feels like an
+    -- unnecessary limitation in @http2@. (TODO: Actually, this might be related
+    -- to an exception being thrown in 'outboundTrailersMaker'?)
+    --
+    -- Ideally, we would either (1) randomly assign connections to calls and
+    -- then test that an early termination only affects calls using the same
+    -- connection, or better yet, (2), remove this limitation from @http2@.
+    --
+    -- For now, we do neither: /if/ a test includes early termination, we give
+    -- each call its own connection, thereby regaining independence.
+    Conservative
+
+    -- | Aggressive mode (used when tests do not involve early termination)
+    --
+    -- == Ordering
+    --
+    -- Consider a test containing:
+    --
+    -- > clock tick 1: client sends message A
+    -- > clock tick 2: client sends message B
+    --
+    -- In aggressive mode it's the /active/ participant that advances the clock.
+    -- This means that we may well reach clock tick 2 before the server has
+    -- received message A, thus testing the asynchronous nature of the
+    -- operations that @grapesy@ offers.
+    --
+    -- == Connection isolation
+    --
+    -- In aggressive mode all calls from the client to the server share the
+    -- same connection.
+  | Aggressive
+  deriving (Show)
+
+determineExecutionMode :: GlobalSteps -> ExecutionMode
+determineExecutionMode steps =
+    if hasEarlyTermination steps
+      then Conservative
+      else Aggressive
+
+ifConservative :: Applicative m => ExecutionMode -> m () -> m ()
+ifConservative Conservative k = k
+ifConservative Aggressive   _ = pure ()
+
+ifAggressive :: Applicative m => ExecutionMode -> m () -> m ()
+ifAggressive Aggressive    k = k
+ifAggressive Conservative  _ = pure ()
+
 {-------------------------------------------------------------------------------
   Client-side interpretation
-------------------------------------------------------------------------------}
@@ -135,10 +224,11 @@ ifAlive = (=<<)
 clientLocal ::
      HasCallStack
   => TestClock
+  -> ExecutionMode
   -> Client.Call (BinaryRpc meth srv)
   -> LocalSteps
   -> IO ()
-clientLocal testClock call = \(LocalSteps steps) ->
+clientLocal testClock mode call = \(LocalSteps steps) ->
     flip evalStateT (Alive ()) $ go steps
   where
     go :: [(TestClockTick, LocalStep)] -> StateT (ServerHealth ()) IO ()
@@ -148,9 +238,9 @@
       case step of
        ClientAction action -> do
          liftIO $ waitForTestClockTick testClock tick
-          continue <- clientAct action
-                        `finally`
-                      liftIO (advanceTestClock testClock)
+          continue <-
+            clientAct action `finally`
+              liftIO (ifAggressive mode $ advanceTestClock testClock)
          if continue then go steps else do
@@ -163,7 +253,9 @@
           advanceTestClockAtTimes testClock $ mapMaybe ourStep steps
        ServerAction action -> do
-          reactToServer action
+          reactToServer action `finally`
+            liftIO (ifConservative mode $ advanceTestClock testClock)
+
          go steps

     -- Client action
@@ -257,25 +349,14 @@
 clientGlobal ::
      TestClock
+  -> ExecutionMode
   -> (forall a. (Client.Connection -> IO a) -> IO a)
   -> GlobalSteps
   -> IO ()
-clientGlobal testClock withConn = \steps@(GlobalSteps globalSteps) ->
-    -- TODO: The tests assume that different calls are independent from each
-    -- other. This is mostly true, but not completely: when a client or a server
-    -- terminates early, the entire connection (supporting potentially many
-    -- calls) is reset. It's not entirely clear why; it feels like an
-    -- unnecessary limitation in @http2@.
-    --
-    -- Ideally, we would either (1) randomly assign connections to calls and
-    -- then test that an early termination only affects calls using the same
-    -- connection, or better yet, (2), remove this limitation from @http2@.
-    --
-    -- For now, we do neither: /if/ a test includes early termination, we give
-    -- each call its own connection, thereby regaining independence.
-    if hasEarlyTermination steps
-      then go Nothing [] globalSteps
-      else withConn $ \conn -> go (Just conn) [] globalSteps
+clientGlobal testClock mode withConn = \(GlobalSteps globalSteps) ->
+    case mode of
+      Aggressive   -> withConn $ \conn -> go (Just conn) [] globalSteps
+      Conservative -> go Nothing [] globalSteps
   where
     go :: Maybe Client.Connection -> [Async ()] -> [LocalSteps] -> IO ()
     go _ threads [] = do
@@ -318,7 +399,7 @@
               -- a class of correct behaviour: the server might not respond
              -- with that initial metadata until the client has sent some
              -- messages.
- clientLocal testClock call (LocalSteps steps') + clientLocal testClock mode call (LocalSteps steps') _otherwise -> error $ "clientGlobal: expected Initiate, got " ++ show steps @@ -373,9 +454,10 @@ _waitForEnabled label = liftIO $ do serverLocal :: TestClock + -> ExecutionMode -> Server.Call (BinaryRpc serv meth) -> LocalSteps -> IO () -serverLocal testClock call = \(LocalSteps steps) -> do +serverLocal testClock mode call = \(LocalSteps steps) -> do flip evalStateT (Alive ()) $ go steps where go :: [(TestClockTick, LocalStep)] -> StateT (ClientHealth ()) IO () @@ -385,9 +467,9 @@ serverLocal testClock call = \(LocalSteps steps) -> do case step of ServerAction action -> do liftIO $ waitForTestClockTick testClock tick - continue <- serverAct action - `finally` - liftIO (advanceTestClock testClock) + continue <- + serverAct action `finally` + liftIO (ifAggressive mode $ advanceTestClock testClock) if continue then go steps else do @@ -402,7 +484,8 @@ serverLocal testClock call = \(LocalSteps steps) -> do advanceTestClockAtTimes testClock $ mapMaybe ourStep steps ClientAction action -> do - reactToClient action + reactToClient action `finally` + liftIO (ifConservative mode $ advanceTestClock testClock) go steps -- Server action @@ -495,6 +578,7 @@ serverLocal testClock call = \(LocalSteps steps) -> do serverGlobal :: HasCallStack => TestClock + -> ExecutionMode -> MVar GlobalSteps -- ^ Unlike in the client case, the grapesy infrastructure spawns a new -- thread for each incoming connection. To know which part of the test this @@ -503,7 +587,7 @@ serverGlobal :: -- thread, the order of these incoming requests is deterministic. -> Server.Call (BinaryRpc serv meth) -> IO () -serverGlobal testClock globalStepsVar call = do +serverGlobal testClock mode globalStepsVar call = do steps <- modifyMVar globalStepsVar (getNextSteps . getGlobalSteps) -- See discussion in clientGlobal (runLocalSteps) advanceTestClock testClock @@ -519,7 +603,7 @@ serverGlobal testClock globalStepsVar call = do -- exception. receivedMetadata <- Server.getRequestMetadata call expect (== metadata) $ Set.fromList receivedMetadata - serverLocal testClock call $ LocalSteps steps' + serverLocal testClock mode call $ LocalSteps steps' _otherwise -> error "serverGlobal: expected ClientInitiateRequest" where @@ -549,10 +633,10 @@ execGlobalSteps steps k = do IsRPC (BinaryRpc serv meth) => Proxy (BinaryRpc serv meth) -> Server.RpcHandler IO handler rpc = Server.mkRpcHandler rpc $ \call -> - serverGlobal testClock globalStepsVar call + serverGlobal testClock mode globalStepsVar call mRes :: Either SomeException a <- try $ k $ def { - client = \conn -> clientGlobal testClock conn steps + client = \conn -> clientGlobal testClock mode conn steps , server = [ handler (Proxy @TestRpc1) , handler (Proxy @TestRpc2) , handler (Proxy @TestRpc3) @@ -562,6 +646,9 @@ execGlobalSteps steps k = do Left err -> throwM err Right a -> return a where + mode :: ExecutionMode + mode = determineExecutionMode steps + -- For 'clientGlobal' the order doesn't matter, because it spawns a thread -- for each 'LocalSteps'. 
diff --git a/test-grapesy/Test/Prop/Dialogue.hs b/test-grapesy/Test/Prop/Dialogue.hs
index 5d2825e2..f8417008 100644
--- a/test-grapesy/Test/Prop/Dialogue.hs
+++ b/test-grapesy/Test/Prop/Dialogue.hs
@@ -35,6 +35,7 @@ tests = testGroup "Test.Prop.Dialogue" [
         , testCaseInfo "earlyTermination3" $ regression earlyTermination3
        , testCaseInfo "earlyTermination4" $ regression earlyTermination4
        , testCaseInfo "earlyTermination5" $ regression earlyTermination5
+        , testCaseInfo "earlyTermination6" $ regression earlyTermination6
        ]
    , testGroup "Setup" [
          testProperty "shrinkingWellFounded" prop_shrinkingWellFounded
@@ -74,13 +75,13 @@ propDialogue dialogue =
     propClientServer assessCustomException $ execGlobalSteps globalSteps
   where
     globalSteps :: GlobalSteps
-    globalSteps = dialogueGlobalSteps dialogue
+    globalSteps = dialogueGlobalSteps $ dialogue

 regression :: Dialogue -> IO String
 regression dialogue = do
     handle annotate $
-      testClientServer assessCustomException $
-        execGlobalSteps globalSteps
+      (testClientServer assessCustomException $
+        execGlobalSteps globalSteps)
   where
     globalSteps :: GlobalSteps
     globalSteps = dialogueGlobalSteps dialogue
@@ -316,10 +317,26 @@ earlyTermination4 = Dialogue [
 -- 'clientGlobal'.
 earlyTermination5 :: Dialogue
 earlyTermination5 = Dialogue [
-      (1, ClientAction $ Initiate (Set.fromList [],RPC1))
+      (1, ClientAction $ Initiate (Set.fromList [], RPC1))
    , (1, ServerAction $ Terminate Nothing)
-    , (0, ClientAction $ Initiate (Set.fromList [],RPC1))
+    , (0, ClientAction $ Initiate (Set.fromList [], RPC1))
    , (0, ClientAction $ Send (NoMoreElems NoMetadata))
    , (0, ServerAction $ Send (NoMoreElems (Set.fromList [])))
    , (1, ClientAction $ Send (NoMoreElems NoMetadata))
    ]
+
+-- | Variation where the client does send some messages before throwing an
+-- exception
+--
+-- This is mostly a check on the test infrastructure itself. In a test case
+-- like this where a message is enqueued and then an exception is thrown, the
+-- exception might " overtake " that message and the server will never
+-- receive it. This motivates the " conservative " test mode where we test
+-- each operation in a synchronous manner.
+earlyTermination6 :: Dialogue
+earlyTermination6 = Dialogue [
+      (0, ClientAction $ Initiate (Set.fromList [], RPC1))
+    , (0, ClientAction $ Send (StreamElem 0))
+    , (0, ClientAction $ Terminate (Just (ExceptionId 0)))
+    , (0, ServerAction $ Send (NoMoreElems (Set.fromList [])))
+    ]
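
The race that earlyTermination6 guards against can be reproduced outside the
test suite. In this standalone sketch (illustrative only; the queue stands in
for grapesy's outbound message buffer), whether "sent 0" is ever printed
depends on scheduling — exactly the nondeterminism that Conservative mode
eliminates:

    import Control.Concurrent (forkIO)
    import Control.Concurrent.STM
    import Control.Exception (throwIO)
    import Control.Monad (forever)

    main :: IO ()
    main = do
        queue <- newTQueueIO
        _ <- forkIO $ forever $ do
          msg <- atomically $ readTQueue queue
          putStrLn ("sent " ++ show (msg :: Int))  -- stands in for the send
        atomically $ writeTQueue queue 0           -- "client sends message"
        throwIO (userError "early termination")    -- may overtake the send
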