Skip to content

Commit

Permalink
Metadata tutorial
Browse files Browse the repository at this point in the history
  • Loading branch information
edsko committed Oct 23, 2024
1 parent 9537d2c commit e305540
Show file tree
Hide file tree
Showing 16 changed files with 438 additions and 26 deletions.
1 change: 1 addition & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ packages:
, ./tutorials/quickstart
, ./tutorials/basics
, ./tutorials/lowlevel
, ./tutorials/metadata

package grapesy
tests: True
Expand Down
2 changes: 1 addition & 1 deletion grapesy/demo-server/Demo/Server/Service/Greeter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ sayHelloStreamReply = mkRpcHandlerNoInitialMetadata $ \call -> do
setResponseInitialMetadata call $ SayHelloMetadata (Just "initial-md-value")

-- The client expects the metadata well before the first output
_ <- initiateResponse call
initiateResponse call

req <- recvFinalInput call

Expand Down
2 changes: 1 addition & 1 deletion grapesy/interop/Interop/Server/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ constructResponseMetadata call = do

-- Send initial metadata
setResponseInitialMetadata call initMeta
_initiated <- initiateResponse call
initiateResponse call

-- Return the final metadata to be sent at the end of the call
return trailMeta
Expand Down
31 changes: 17 additions & 14 deletions grapesy/src/Network/GRPC/Client/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ data Call rpc = SupportsClientRpc rpc => Call {
-- If there are still /inbound/ messages upon leaving the scope of 'withRPC' no
-- exception is raised (but the call is nonetheless still closed, and the server
-- handler will be informed that the client has disappeared).
withRPC :: forall m rpc a.
withRPC :: forall rpc m a.
(MonadMask m, MonadIO m, SupportsClientRpc rpc, HasCallStack)
=> Connection -> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a
withRPC conn callParams proxy k = fmap fst $
Expand Down Expand Up @@ -386,7 +386,7 @@ sendInputWithMeta Call{callChannel} msg = liftIO $ do
-- 'GrpcStatus' here: a status of 'GrpcOk' carries no information, and any other
-- status will result in a 'GrpcException'. Calling 'recvOutput' again after
-- receiving the trailers is a bug and results in a 'RecvAfterFinal' exception.
recvOutput :: forall m rpc.
recvOutput :: forall rpc m.
(MonadIO m, HasCallStack)
=> Call rpc
-> m (StreamElem (ResponseTrailingMetadata rpc) (Output rpc))
Expand Down Expand Up @@ -444,8 +444,10 @@ recvOutputWithMeta = recvBoth
-- returns any replies.
-- * The response metadata /will/ be available before the first output from the
-- server, and may indeed be available /well/ before.
recvResponseMetadata :: forall rpc. Call rpc -> IO (ResponseMetadata rpc)
recvResponseMetadata call@Call{} =
recvResponseMetadata :: forall rpc m.
MonadIO m
=> Call rpc -> m (ResponseMetadata rpc)
recvResponseMetadata call@Call{} = liftIO $
recvInitialResponse call >>= aux
where
aux ::
Expand All @@ -472,12 +474,13 @@ recvResponseMetadata call@Call{} =
-- rather than thrown as an exception.
--
-- Most applications will never need to use this function.
recvInitialResponse ::
Call rpc
-> IO ( Either (TrailersOnly' HandledSynthesized)
recvInitialResponse :: forall rpc m.
MonadIO m
=> Call rpc
-> m ( Either (TrailersOnly' HandledSynthesized)
(ResponseHeaders' HandledSynthesized)
)
recvInitialResponse Call{callChannel} =
recvInitialResponse Call{callChannel} = liftIO $
fmap inbHeaders <$> Session.getInboundHeaders callChannel

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -516,7 +519,7 @@ sendEndOfInput call = sendInput call $ NoMoreElems NoMetadata
-- of \"Trailers-Only\" amounts to a protocol error; if the server /does/ use
-- \"Trailers-Only\", this throws a 'ProtoclException'
-- ('UnexpectedTrailersOnly').
recvResponseInitialMetadata :: forall m rpc.
recvResponseInitialMetadata :: forall rpc m.
MonadIO m
=> Call rpc
-> m (ResponseInitialMetadata rpc)
Expand All @@ -534,7 +537,7 @@ recvResponseInitialMetadata call@Call{} = liftIO $ do
-- | Receive the next output
--
-- Throws 'ProtocolException' if there are no more outputs.
recvNextOutput :: forall m rpc.
recvNextOutput :: forall rpc m.
(MonadIO m, HasCallStack)
=> Call rpc -> m (Output rpc)
recvNextOutput call@Call{} = liftIO $ do
Expand All @@ -555,7 +558,7 @@ recvNextOutput call@Call{} = liftIO $ do
--
-- NOTE: If the first output we receive from the server is not marked as final,
-- we will block until we receive the end-of-stream indication.
recvFinalOutput :: forall m rpc.
recvFinalOutput :: forall rpc m.
(MonadIO m, HasCallStack)
=> Call rpc
-> m (Output rpc, ResponseTrailingMetadata rpc)
Expand All @@ -577,7 +580,7 @@ recvFinalOutput call@Call{} = liftIO $ do
-- | Receive trailers
--
-- Throws 'ProtocolException' if we received an output.
recvTrailers :: forall m rpc.
recvTrailers :: forall rpc m.
(MonadIO m, HasCallStack)
=> Call rpc -> m (ResponseTrailingMetadata rpc)
recvTrailers call@Call{} = liftIO $ do
Expand All @@ -594,7 +597,7 @@ recvTrailers call@Call{} = liftIO $ do
Internal auxiliary: deal with final message
-------------------------------------------------------------------------------}

recvBoth :: forall m rpc.
recvBoth :: forall rpc m.
(HasCallStack, MonadIO m)
=> Call rpc
-> m (StreamElem ProperTrailers' (InboundMeta, Output rpc))
Expand All @@ -612,7 +615,7 @@ recvBoth Call{callChannel} = liftIO $
flatten (Right streamElem) =
streamElem

recvEither :: forall m rpc.
recvEither :: forall rpc m.
(HasCallStack, MonadIO m)
=> Call rpc
-> m (Either ProperTrailers' (InboundMeta, Output rpc))
Expand Down
2 changes: 1 addition & 1 deletion grapesy/src/Network/GRPC/Common/StreamElem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ whileNext_ f g = go
FinalElem a b -> g a >> return b
NoMoreElems b -> return b

-- | Invoke the callback until it returns 'NoNextElem', collecting results
-- | Invoke the callback until 'FinalElem' or 'NoMoreElems', collecting results
collect :: forall m b a. Monad m => m (StreamElem b a) -> m ([a], b)
collect f =
first reverse . swap <$> flip runStateT [] aux
Expand Down
18 changes: 10 additions & 8 deletions grapesy/src/Network/GRPC/Server/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,8 @@ sendOutputWithMeta :: forall rpc.
-> StreamElem (ResponseTrailingMetadata rpc) (OutboundMeta, Output rpc)
-> IO ()
sendOutputWithMeta call@Call{callChannel} msg = do
_updated <- initiateResponse call
msg' <- bitraverse mkTrailers return msg
initiateResponse call
msg' <- bitraverse mkTrailers return msg
Session.send callChannel msg'

-- This /must/ be called before leaving the scope of 'runHandler' (or we
Expand Down Expand Up @@ -480,14 +480,16 @@ setResponseInitialMetadata Call{ callResponseMetadata
Low-level API
-------------------------------------------------------------------------------}

-- | Initiate the response
-- | Initiate the response.
--
-- This will cause the initial response metadata to be sent
-- (see also 'setResponseMetadata').
-- This will cause the initial response metadata to be sent (see also
-- 'setResponseMetadata').
--
-- Returns 'False' if the response was already initiated.
initiateResponse :: HasCallStack => Call rpc -> IO Bool
initiateResponse Call{callResponseKickoff} =
-- Does nothing if the response was already initated (that is, the response
-- headers, or trailers in the case of 'sendTrailersOnly', have already been
-- sent).
initiateResponse :: HasCallStack => Call rpc -> IO ()
initiateResponse Call{callResponseKickoff} = void $
atomically $ tryPutTMVar callResponseKickoff $ KickoffRegular callStack

-- | Use the gRPC @Trailers-Only@ case for non-error responses
Expand Down
2 changes: 1 addition & 1 deletion grapesy/test-grapesy/Test/Driver/Dialogue/Execution.hs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ serverLocal clock call = \(LocalSteps steps) -> do
case action of
Initiate metadata -> liftIO $ do
Server.setResponseInitialMetadata call metadata
void $ Server.initiateResponse call
Server.initiateResponse call
return True
Send x -> do
peerHealth <- get
Expand Down
10 changes: 10 additions & 0 deletions tutorials/lowlevel/app/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ methods db =
$ RawMethod (mkRpcHandler $ routeChat )
$ NoMoreMethods


-- Alternative way to define the handlers, avoiding 'fromMethods'
_handlers :: DB -> [SomeRpcHandler IO]
_handlers db = [
someRpcHandler . mkRpcHandler $ getFeature db
, someRpcHandler . mkRpcHandler $ listFeatures db
, someRpcHandler . mkRpcHandler $ recordRoute db
, someRpcHandler . mkRpcHandler $ routeChat
]

main :: IO ()
main = do
db <- getDB
Expand Down
31 changes: 31 additions & 0 deletions tutorials/metadata/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
Copyright (c) 2023-2024, Well-Typed LLP and Anduril Industries Inc.

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.

* Neither the name of Well-Typed LLP, the name of Anduril
Industries Inc., nor the names of other contributors may be
used to endorse or promote products derived from this software
without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
1 change: 1 addition & 0 deletions tutorials/metadata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Metadata tutorial
3 changes: 3 additions & 0 deletions tutorials/metadata/Setup.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import Data.ProtoLens.Setup

main = defaultMainGeneratingProtos "proto"
98 changes: 98 additions & 0 deletions tutorials/metadata/app/Client.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
module Client (main) where

import Control.Monad.Catch
import Control.Monad.State
import Crypto.Hash.SHA256 qualified as SHA256
import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.Text qualified as Text
import System.Environment
import System.IO

import Network.GRPC.Client
import Network.GRPC.Common
import Network.GRPC.Common.Protobuf
import Network.GRPC.Common.StreamElem qualified as StreamElem

import Proto.API.Fileserver

{-------------------------------------------------------------------------------
Very simple progress bar
-------------------------------------------------------------------------------}

newtype ProgressT m a = WrapProgressT {
unwrapProgressT :: StateT [Integer] m a
}
deriving newtype (Functor, Applicative, Monad, MonadTrans, MonadIO)

instance MonadState s m => MonadState s (ProgressT m) where
state = lift . state

runProgressT :: (MonadMask m, MonadIO m) => Integer -> ProgressT m a -> m a
runProgressT totalSize =
(`finally` liftIO (putStrLn " done"))
. flip evalStateT dots
. unwrapProgressT
where
numDots = 50
stepSize = totalSize `div` fromIntegral numDots
dots = take numDots $ iterate (+ stepSize) stepSize

updateProgressBar :: forall m. MonadIO m => Integer -> ProgressT m ()
updateProgressBar currentSize = do
WrapProgressT $ StateT go
liftIO $ hFlush stdout
where
go :: [Integer] -> m ((), [Integer])
go [] = return ((), [])
go (d:ds)
| d <= currentSize = liftIO (putChar '.') >> go ds
| otherwise = return ((), d:ds)

{-------------------------------------------------------------------------------
RPC
-------------------------------------------------------------------------------}

processPartial ::
Handle
-> Proto Partial
-> ProgressT (StateT (SHA256.Ctx, Integer) IO) ()
processPartial h partial = do
liftIO $ BS.hPut h chunk
accSize <- state $ \(ctx, currentSize) ->
let currentSize' = currentSize + fromIntegral (BS.length chunk)
in (currentSize', (SHA256.update ctx chunk, currentSize'))
updateProgressBar accSize
where
chunk :: ByteString
chunk = partial ^. #chunk

download :: Connection -> String -> String -> IO ()
download conn inp out = do
withRPC conn def (Proxy @(Protobuf Fileserver "download")) $ \call -> do
sendFinalInput call $ defMessage & #name .~ Text.pack inp

-- Wait for initial metadata, telling us how big the file is
DownloadStart{downloadSize} <- recvResponseInitialMetadata call

-- Process each chunk
(DownloadDone{downloadHash = theirHash}, (ourHash, _)) <-
withFile out WriteMode $ \h ->
flip runStateT (SHA256.init, 0) . runProgressT downloadSize $
StreamElem.whileNext_ (recvOutput call) (processPartial h)

-- Check hash
putStrLn $ "Hash match: " ++ show (theirHash == SHA256.finalize ourHash)

{-------------------------------------------------------------------------------
Main application
-------------------------------------------------------------------------------}

main :: IO ()
main = do
[inp, out] <- getArgs
withConnection def server $ \conn -> do
download conn inp out
where
server :: Server
server = ServerInsecure $ Address "127.0.0.1" defaultInsecurePort Nothing
Loading

0 comments on commit e305540

Please sign in to comment.