src/Network/Freddy.hs
{-# LANGUAGE OverloadedStrings #-}
module Network.Freddy (
connect,
disconnect,
Connection,
respondTo,
tapInto,
deliverWithResponse,
deliver,
cancelConsumer,
Consumer,
Delivery (..),
Error (..)
) where
import Control.Concurrent (forkIO)
import qualified Network.AMQP as AMQP
import Data.Text (Text, pack)
import Data.ByteString.Lazy.Char8 (ByteString)
import qualified Control.Concurrent.BroadcastChan as BC
import System.Timeout (timeout)
import Network.Freddy.ResultType (ResultType)
import qualified Network.Freddy.ResultType as ResultType
import qualified Network.Freddy.Request as Request
import Network.Freddy.CorrelationIdGenerator (CorrelationId, generateCorrelationId)
type Payload = ByteString
type QueueName = Text
type ReplyWith = Payload -> IO ()
type FailWith = Payload -> IO ()
type ResponseChannelEmitter = BC.BroadcastChan BC.In (Maybe AMQP.PublishError, AMQP.Message)
type ResponseChannelListener = BC.BroadcastChan BC.Out (Maybe AMQP.PublishError, AMQP.Message)
data Error = InvalidRequest Payload | TimeoutError deriving (Show, Eq)
type Response = Either Error Payload
data Delivery = Delivery Payload ReplyWith FailWith
data Reply = Reply QueueName AMQP.Message
data Connection = Connection {
amqpConnection :: AMQP.Connection,
amqpProduceChannel :: AMQP.Channel,
amqpResponseChannel :: AMQP.Channel,
responseQueueName :: Text,
eventChannel :: ResponseChannelEmitter
}
data Consumer = Consumer {
consumerTag :: AMQP.ConsumerTag,
consumerChannel :: AMQP.Channel
}
{-|
Creates a connection with the message queue.
__Example__:
@
connection <- Freddy.connect "127.0.0.1" "/" "guest" "guest"
@
-}
connect :: String -- ^ server host
-> Text -- ^ virtual host
-> Text -- ^ user name
-> Text -- ^ password
-> IO Connection
connect host vhost user pass = do
connection <- AMQP.openConnection host vhost user pass
produceChannel <- AMQP.openChannel connection
responseChannel <- AMQP.openChannel connection
AMQP.declareExchange produceChannel AMQP.newExchange {
AMQP.exchangeName = topicExchange,
AMQP.exchangeType = "topic",
AMQP.exchangeDurable = False
}
eventChannel <- BC.newBroadcastChan
(responseQueueName, _, _) <- declareQueue responseChannel ""
AMQP.consumeMsgs responseChannel responseQueueName AMQP.NoAck $ responseCallback eventChannel
AMQP.addReturnListener produceChannel (returnCallback eventChannel)
return $ Connection {
amqpConnection = connection,
amqpResponseChannel = responseChannel,
amqpProduceChannel = produceChannel,
responseQueueName = responseQueueName,
eventChannel = eventChannel
}
{-|
Closes the connection with the message queue.
__Example__:
@
connection <- Freddy.connect "127.0.0.1" "/" "guest" "guest"
Freddy.disconnect connection
@
-}
disconnect :: Connection -> IO ()
disconnect = AMQP.closeConnection . amqpConnection
{-|
Sends a message and waits for the response.
__Example__:
@
import qualified Network.Freddy as Freddy
import qualified Network.Freddy.Request as R
connection <- Freddy.connect "127.0.0.1" "/" "guest" "guest"
response <- Freddy.deliverWithResponse connection R.newReq {
R.queueName = "echo",
R.body = "{\\"msg\\": \\"what did you say?\\"}"
}
case response of
Right payload -> putStrLn "Received positive result"
Left (Freddy.InvalidRequest payload) -> putStrLn "Received error"
Left Freddy.TimeoutError -> putStrLn "Request timed out"
@
-}
deliverWithResponse :: Connection -> Request.Request -> IO Response
deliverWithResponse connection request = do
correlationId <- generateCorrelationId
let msg = AMQP.newMsg {
AMQP.msgBody = Request.body request,
AMQP.msgCorrelationID = Just correlationId,
AMQP.msgDeliveryMode = Just AMQP.NonPersistent,
AMQP.msgType = Just "request",
AMQP.msgReplyTo = Just $ responseQueueName connection,
AMQP.msgExpiration = Request.expirationInMs request
}
responseChannelListener <- BC.newBChanListener $ eventChannel connection
AMQP.publishMsg' (amqpProduceChannel connection) "" (Request.queueName request) True msg
AMQP.publishMsg (amqpProduceChannel connection) topicExchange (Request.queueName request) msg
responseBody <- timeout (Request.timeoutInMicroseconds request) $ do
let messageMatcher = matchingCorrelationId correlationId
waitForResponse responseChannelListener messageMatcher
case responseBody of
Just (Nothing, msg) -> return . createResponse $ msg
Just (Just error, _) -> return . Left . InvalidRequest $ "Publish Error"
Nothing -> return $ Left TimeoutError
createResponse :: AMQP.Message -> Either Error Payload
createResponse msg = do
let msgBody = AMQP.msgBody msg
case AMQP.msgType msg of
Just msgType ->
if (ResultType.fromText msgType) == ResultType.Success then
Right msgBody
else
Left . InvalidRequest $ msgBody
_ -> Left . InvalidRequest $ "No message type"
{-|
Send and forget type of delivery. It sends a message to given destination
without waiting for a response. This is useful when there are multiple
consumers that are using 'tapInto' or you just do not care about the
response.
__Example__:
@
import qualified Network.Freddy as Freddy
import qualified Network.Freddy.Request as R
connection <- Freddy.connect "127.0.0.1" "/" "guest" "guest"
Freddy.deliver connection R.newReq {
R.queueName = "notifications.user_signed_in",
R.body = "{\\"user_id\\": 1}"
}
@
-}
deliver :: Connection -> Request.Request -> IO ()
deliver connection request = do
let msg = AMQP.newMsg {
AMQP.msgBody = Request.body request,
AMQP.msgDeliveryMode = Just AMQP.NonPersistent,
AMQP.msgExpiration = Request.expirationInMs request
}
AMQP.publishMsg (amqpProduceChannel connection) "" (Request.queueName request) msg
AMQP.publishMsg (amqpProduceChannel connection) topicExchange (Request.queueName request) msg
return ()
{-|
Responds to messages on a given destination. It is useful for messages that
have to be processed once and then a result must be sent.
__Example__:
@
processMessage (Freddy.Delivery body replyWith failWith) = replyWith body
connection <- Freddy.connect "127.0.0.1" "/" "guest" "guest"
Freddy.respondTo connection "echo" processMessage
@
-}
respondTo :: Connection -> QueueName -> (Delivery -> IO ()) -> IO Consumer
respondTo connection queueName callback = do
let produceChannel = amqpProduceChannel connection
consumeChannel <- AMQP.openChannel . amqpConnection $ connection
declareQueue consumeChannel queueName
tag <- AMQP.consumeMsgs consumeChannel queueName AMQP.NoAck $
replyCallback callback produceChannel
return Consumer { consumerChannel = consumeChannel, consumerTag = tag }
{-|
Listens for messages on a given destination or destinations without
consuming them.
__Example__:
@
processMessage body = putStrLn body
connection <- Freddy.connect "127.0.0.1" "/" "guest" "guest"
Freddy.tapInto connection "notifications.*" processMessage
@
-}
tapInto :: Connection -> QueueName -> (Payload -> IO ()) -> IO Consumer
tapInto connection queueName callback = do
consumeChannel <- AMQP.openChannel . amqpConnection $ connection
declareExlusiveQueue consumeChannel queueName
AMQP.bindQueue consumeChannel "" topicExchange queueName
let consumer = callback . AMQP.msgBody .fst
tag <- AMQP.consumeMsgs consumeChannel queueName AMQP.NoAck consumer
return Consumer { consumerChannel = consumeChannel, consumerTag = tag }
{-|
Stops the consumer from listening new messages.
__Example__:
@
connection <- Freddy.connect "127.0.0.1" "/" "guest" "guest"
consumer <- Freddy.tapInto connection "notifications.*" processMessage
threadDelay tenMinutes $ Freddy.cancelConsumer consumer
@
-}
cancelConsumer :: Consumer -> IO ()
cancelConsumer consumer = do
AMQP.cancelConsumer (consumerChannel consumer) $ consumerTag consumer
AMQP.closeChannel (consumerChannel consumer)
returnCallback :: ResponseChannelEmitter -> (AMQP.Message, AMQP.PublishError) -> IO ()
returnCallback eventChannel (msg, error) =
BC.writeBChan eventChannel (Just error, msg)
responseCallback :: ResponseChannelEmitter -> (AMQP.Message, AMQP.Envelope) -> IO ()
responseCallback eventChannel (msg, _) =
BC.writeBChan eventChannel (Nothing, msg)
replyCallback :: (Delivery -> IO ()) -> AMQP.Channel -> (AMQP.Message, AMQP.Envelope) -> IO ()
replyCallback userCallback channel (msg, env) = do
let requestBody = AMQP.msgBody msg
let replyWith = sendReply msg channel ResultType.Success
let failWith = sendReply msg channel ResultType.Error
let delivery = Delivery requestBody replyWith failWith
forkIO . userCallback $ delivery
return ()
sendReply :: AMQP.Message -> AMQP.Channel -> ResultType -> Payload -> IO ()
sendReply originalMsg channel resType body =
case buildReply originalMsg resType body of
Just (Reply queueName message) -> do
AMQP.publishMsg channel "" queueName message
return ()
Nothing -> putStrLn "Could not reply"
buildReply :: AMQP.Message -> ResultType -> Payload -> Maybe Reply
buildReply originalMsg resType body = do
queueName <- AMQP.msgReplyTo originalMsg
let msg = AMQP.newMsg {
AMQP.msgBody = body,
AMQP.msgCorrelationID = AMQP.msgCorrelationID originalMsg,
AMQP.msgDeliveryMode = Just AMQP.NonPersistent,
AMQP.msgType = Just . ResultType.serializeResultType $ resType
}
Just $ Reply queueName msg
matchingCorrelationId :: CorrelationId -> AMQP.Message -> Bool
matchingCorrelationId correlationId msg =
case AMQP.msgCorrelationID msg of
Just msgCorrelationId -> msgCorrelationId == correlationId
Nothing -> False
topicExchange :: Text
topicExchange = "freddy-topic"
waitForResponse :: ResponseChannelListener -> (AMQP.Message -> Bool) -> IO (Maybe AMQP.PublishError, AMQP.Message)
waitForResponse eventChannelListener predicate = do
(error, msg) <- BC.readBChan eventChannelListener
if predicate msg then
return (error, msg)
else
waitForResponse eventChannelListener predicate
declareQueue :: AMQP.Channel -> QueueName -> IO (Text, Int, Int)
declareQueue channel queueName =
AMQP.declareQueue channel AMQP.newQueue {AMQP.queueName = queueName}
declareExlusiveQueue :: AMQP.Channel -> QueueName -> IO (Text, Int, Int)
declareExlusiveQueue channel queueName =
AMQP.declareQueue channel AMQP.newQueue {
AMQP.queueName = queueName,
AMQP.queueExclusive = True
}