1
0
Fork 0
mirror of https://code.naskya.net/repos/ndqEd synced 2025-03-20 15:14:54 +09:00
vervis/src/Vervis/Federation.hs

1194 lines
56 KiB
Haskell
Raw Normal View History

{- This file is part of Vervis.
-
- Written in 2019 by fr33domlover <fr33domlover@riseup.net>.
-
- Copying is an act of love. Please copy, reuse and share.
-
- The author(s) have dedicated all copyright and related and neighboring
- rights to this software to the public domain worldwide. This software is
- distributed without any warranty.
-
- You should have received a copy of the CC0 Public Domain Dedication along
- with this software. If not, see
- <http://creativecommons.org/publicdomain/zero/1.0/>.
-}
module Vervis.Federation
( handleInboxActivity
, fixRunningDeliveries
, handleOutboxNote
, retryOutboxDelivery
)
where
import Prelude
import Control.Applicative
import Control.Concurrent.MVar
import Control.Concurrent.STM.TVar
2019-04-11 13:44:44 +00:00
import Control.Exception hiding (Handler, try)
import Control.Monad
import Control.Monad.Logger.CallStack
import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Reader
import Data.Aeson (Object)
2019-04-11 13:44:44 +00:00
import Data.Bifunctor
import Data.ByteString (ByteString)
2019-04-11 13:44:44 +00:00
import Data.Either
import Data.Foldable
2019-04-11 13:44:44 +00:00
import Data.Function
import Data.List (sort, deleteBy, nub, union, unionBy)
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
import Data.Maybe
2019-04-11 13:44:44 +00:00
import Data.Semigroup
import Data.Text (Text)
import Data.Text.Encoding
import Data.Time.Clock
import Data.Traversable
2019-04-11 13:44:44 +00:00
import Data.Tuple
import Database.Persist hiding (deleteBy)
import Database.Persist.Sql hiding (deleteBy)
import Network.HTTP.Client
import Network.HTTP.Signature
import Network.HTTP.Types.Header
import Network.HTTP.Types.URI
import Network.TLS
2019-04-11 13:44:44 +00:00
import UnliftIO.Exception (try)
import Yesod.Core hiding (logError, logWarn, logInfo)
import Yesod.Persist.Core
2019-04-11 13:44:44 +00:00
import qualified Data.List.NonEmpty as NE
import qualified Data.List.Ordered as LO
import qualified Data.Text as T
import qualified Database.Esqueleto as E
import Network.HTTP.Signature
import Database.Persist.JSON
import Network.FedURI
import Web.ActivityPub
import Yesod.Auth.Unverified
import Yesod.FedURI
import Yesod.Hashids
import Yesod.MonadSite
import Data.Either.Local
2019-04-11 13:44:44 +00:00
import Data.List.Local
import Data.List.NonEmpty.Local
import Data.Maybe.Local
import Database.Persist.Local
import Vervis.ActorKey
import Vervis.Foundation
import Vervis.Model
import Vervis.Model.Ident
import Vervis.RemoteActorStore
import Vervis.Settings
hostIsLocal :: (MonadHandler m, HandlerSite m ~ App) => Text -> m Bool
hostIsLocal h = getsYesod $ (== h) . appInstanceHost . appSettings
verifyHostLocal
:: (MonadHandler m, HandlerSite m ~ App)
=> Text -> Text -> ExceptT Text m ()
verifyHostLocal h t = do
local <- hostIsLocal h
unless local $ throwE t
parseAudience :: Monad m => Audience -> Text -> ExceptT Text m FedURI
parseAudience (Audience to bto cc bcc aud) t =
case toSingleton to of
Just fu
| null bto && null cc && null bcc && null aud ->
return fu
_ -> throwE t
where
toSingleton v =
case v of
[x] -> Just x
_ -> Nothing
fromMaybeE :: Monad m => Maybe a -> Text -> ExceptT Text m a
fromMaybeE Nothing t = throwE t
fromMaybeE (Just x) _ = return x
requireEitherM
:: MonadIO m => Maybe a -> Maybe b -> String -> String -> m (Either a b)
requireEitherM mx my f t =
case requireEither mx my of
Left b -> liftIO $ throwIO $ userError $ if b then t else f
Right exy -> return exy
prependError :: Monad m => Text -> ExceptT Text m a -> ExceptT Text m a
prependError t a = do
r <- lift $ runExceptT a
case r of
Left e -> throwE $ t <> ": " <> e
Right x -> return x
parseProject :: Monad m => LocalURI -> ExceptT Text m (ShrIdent, PrjIdent)
parseProject luRecip = do
route <- case decodeRouteLocal luRecip of
Nothing -> throwE "Got Create Note with recipient that isn't a valid route"
Just r -> return r
case route of
ProjectR shr prj -> return (shr, prj)
_ -> throwE "Got Create Note with non-project recipient"
parseTicket :: Monad m => (ShrIdent, PrjIdent) -> LocalURI -> ExceptT Text m Int
parseTicket project luContext = do
route <- case decodeRouteLocal luContext of
Nothing -> throwE "Local context isn't a valid route"
Just r -> return r
case route of
TicketR shr prj num ->
if (shr, prj) == project
then return num
else throwE "Local context ticket doesn't belong to the recipient project"
_ -> throwE "Local context isn't a ticket route"
parseComment :: LocalURI -> ExceptT Text Handler (ShrIdent, LocalMessageId)
parseComment luParent = do
route <- case decodeRouteLocal luParent of
Nothing -> throwE "Not a local route"
Just r -> return r
case route of
MessageR shr hid -> (shr,) <$> decodeKeyHashidE hid "Non-existent local message hashid"
_ -> throwE "Not a local message route"
getLocalParentMessageId :: DiscussionId -> ShrIdent -> LocalMessageId -> ExceptT Text AppDB MessageId
getLocalParentMessageId did shr lmid = do
mlm <- lift $ get lmid
lm <- fromMaybeE mlm "Local parent: no such lmid"
p <- lift $ getJust $ localMessageAuthor lm
s <- lift $ getJust $ personIdent p
unless (shr == sharerIdent s) $ throwE "Local parent: No such message, lmid mismatches sharer"
let mid = localMessageRest lm
m <- lift $ getJust mid
unless (messageRoot m == did) $
throwE "Local parent belongs to a different discussion"
return mid
-- | Handle an activity that came to our inbox. Return a description of what we
-- did, and whether we stored the activity or not (so that we can decide
-- whether to log it for debugging).
handleInboxActivity :: Object -> Text -> InstanceId -> RemoteActorId -> Activity -> Handler (Text, Bool)
handleInboxActivity raw hActor iidActor rsidActor (Activity _id _luActor audience specific) =
case specific of
CreateActivity (Create note) -> do
result <- runExceptT $ handleCreate iidActor hActor rsidActor raw audience note
case result of
Left e -> logWarn e >> return ("Create Note: " <> e, False)
Right (uNew, luTicket) ->
return
( T.concat
[ "Inserted remote comment <"
, renderFedURI uNew
, "> into discussion of local ticket <"
, luriPath luTicket
, ">."
]
, True
)
_ -> return ("Unsupported activity type", False)
where
verifyLocal fu t = do
let (h, lu) = f2l fu
local <- hostIsLocal h
if local
then return lu
else throwE t
parseParent :: LocalURI -> FedURI -> ExceptT Text Handler (Maybe (Either (ShrIdent, LocalMessageId) (Text, LocalURI)))
parseParent luContext uParent = do
let (hParent, luParent) = f2l uParent
local <- hostIsLocal hParent
if local
then if luParent == luContext
then return Nothing
else prependError "Local parent" $ Just . Left <$> parseComment luParent
else return $ Just $ Right (hParent, luParent)
selectOrphans uNote did op =
E.select $ E.from $ \ (rm `E.InnerJoin` m) -> do
E.on $ rm E.^. RemoteMessageRest E.==. m E.^. MessageId
E.where_ $
rm E.^. RemoteMessageLostParent E.==. E.just (E.val uNote) E.&&.
m E.^. MessageRoot `op` E.val did
return (rm E.^. RemoteMessageId, m E.^. MessageId)
handleCreate iidActor hActor rsidActor raw audience (Note mluNote _luAttrib _aud muParent muContext mpublished content) = do
luNote <- fromMaybeE mluNote "Got Create Note without note id"
(shr, prj) <- do
(hRecip, luRecip) <- f2l <$> parseAudience audience "Got a Create Note with a not-just-single-to audience"
verifyHostLocal hRecip "Non-local recipient"
parseProject luRecip
luContext <- do
uContext <- fromMaybeE muContext "Got a Create Note without context"
verifyLocal uContext "Got a Create Note with non-local context"
num <- parseTicket (shr, prj) luContext
mparent <- do
uParent <- fromMaybeE muParent "Got a Create Note without inReplyTo"
parseParent luContext uParent
published <- fromMaybeE mpublished "Got Create Note without 'published' field"
ExceptT $ runDB $ runExceptT $ do
mrmid <- lift $ getKeyBy $ UniqueRemoteMessageIdent iidActor luNote
for_ mrmid $ \ rmid ->
throwE $
"Got a Create Note with a note ID we already have, \
\RemoteMessageId " <> T.pack (show rmid)
mdid <- lift $ runMaybeT $ do
sid <- MaybeT $ getKeyBy $ UniqueSharer shr
jid <- MaybeT $ getKeyBy $ UniqueProject prj sid
t <- MaybeT $ getValBy $ UniqueTicket jid num
return $ ticketDiscuss t
did <- fromMaybeE mdid "Got Create Note on non-existent ticket"
meparent <- for mparent $ \ parent ->
case parent of
Left (shrParent, lmid) -> Left <$> getLocalParentMessageId did shrParent lmid
Right (hParent, luParent) -> do
mrm <- lift $ runMaybeT $ do
iid <- MaybeT $ getKeyBy $ UniqueInstance hParent
MaybeT $ getValBy $ UniqueRemoteMessageIdent iid luParent
case mrm of
Nothing -> do
logWarn "Got Create Note replying to a remote message we don't have"
return $ Right $ l2f hParent luParent
Just rm -> do
let mid = remoteMessageRest rm
m <- lift $ getJust mid
unless (messageRoot m == did) $
throwE "Got Create Note replying to remote message which belongs to a different discussion"
return $ Left mid
now <- liftIO getCurrentTime
rroid <- lift $ insert $ RemoteRawObject (PersistJSON raw) now
mid <- lift $ insert Message
{ messageCreated = published
, messageContent = content
, messageParent =
case meparent of
Just (Left midParent) -> Just midParent
_ -> Nothing
, messageRoot = did
}
lift $ insert_ RemoteMessage
{ remoteMessageAuthor = rsidActor
, remoteMessageInstance = iidActor
, remoteMessageIdent = luNote
, remoteMessageRest = mid
, remoteMessageRaw = rroid
, remoteMessageLostParent =
case meparent of
Just (Right uParent) -> Just uParent
_ -> Nothing
}
-- Now we need to check orphans. These are RemoteMessages whose
-- associated Message doesn't have a parent, but the original Note
-- does have an inReplyTo which isn't the same as the context. It's
-- possible that this new activity we just got, this new Note, is
-- exactly that lost parent.
let uNote = l2f hActor luNote
related <- lift $ selectOrphans uNote did (E.==.)
lift $ for_ related $ \ (E.Value rmidOrphan, E.Value midOrphan) -> do
logWarn $ T.concat
[ "Found parent for related orphan RemoteMessage #"
, T.pack (show rmidOrphan)
, ", setting its parent now to Message #"
, T.pack (show mid)
]
update rmidOrphan [RemoteMessageLostParent =. Nothing]
update midOrphan [MessageParent =. Just mid]
unrelated <- lift $ selectOrphans uNote did (E.!=.)
for_ unrelated $ \ (E.Value rmidOrphan, E.Value _midOrphan) ->
logWarn $ T.concat
[ "Found parent for unrelated orphan RemoteMessage #"
, T.pack (show rmidOrphan)
, ", NOT settings its parent to Message #"
, T.pack (show mid)
, " because they have different DiscussionId!"
]
return (uNote, luContext)
fixRunningDeliveries :: (MonadIO m, MonadLogger m, IsSqlBackend backend) => ReaderT backend m ()
fixRunningDeliveries = do
c <- updateWhereCount [UnlinkedDeliveryRunning ==. True] [UnlinkedDeliveryRunning =. False]
unless (c == 0) $ logWarn $ T.concat
[ "fixRunningDeliveries fixed "
, T.pack (show c)
, " linked deliveries"
]
c' <- updateWhereCount [DeliveryRunning ==. True] [DeliveryRunning =. False]
unless (c' == 0) $ logWarn $ T.concat
[ "fixRunningDeliveries fixed "
, T.pack (show c)
, " unlinked deliveries"
]
2019-04-11 13:44:44 +00:00
data LocalTicketRecipient = LocalTicketParticipants | LocalTicketTeam
deriving (Eq, Ord)
data LocalProjectRecipient
= LocalProject
| LocalTicketRelated Int LocalTicketRecipient
deriving (Eq, Ord)
data LocalSharerRecipient
= LocalSharer
| LocalProjectRelated PrjIdent LocalProjectRecipient
deriving (Eq, Ord)
data LocalRecipient = LocalSharerRelated ShrIdent LocalSharerRecipient
deriving (Eq, Ord)
data LocalTicketRelatedSet
= OnlyTicketParticipants
| OnlyTicketTeam
| BothTicketParticipantsAndTeam
data LocalProjectRelatedSet = LocalProjectRelatedSet
{ localRecipProject :: Bool
, localRecipTicketRelated :: [(Int, LocalTicketRelatedSet)]
}
data LocalSharerRelatedSet = LocalSharerRelatedSet
{ localRecipSharer :: Bool
, localRecipProjectRelated :: [(PrjIdent, LocalProjectRelatedSet)]
}
type LocalRecipientSet = [(ShrIdent, LocalSharerRelatedSet)]
newtype FedError = FedError Text deriving Show
instance Exception FedError
getHttpSign
:: (MonadSite m, SiteEnv m ~ App) => m (ByteString -> (KeyId, Signature))
getHttpSign = do
(akey1, akey2, new1) <- liftIO . readTVarIO =<< asksSite appActorKeys
renderUrl <- askUrlRender
let (keyID, akey) =
if new1
then (renderUrl ActorKey1R, akey1)
else (renderUrl ActorKey2R, akey2)
return $ \ b -> (KeyId $ encodeUtf8 keyID, actorKeySign akey b)
deliverHttp
:: (MonadSite m, SiteEnv m ~ App)
=> (ByteString -> (KeyId, Signature))
-> Doc Activity
-> Text
-> LocalURI
-> m (Either APPostError (Response ()))
deliverHttp sign doc h luInbox = do
manager <- asksSite appHttpManager
let inbox = l2f h luInbox
headers = hRequestTarget :| [hHost, hDate, hActivityPubActor]
httpPostAP manager inbox headers sign docActor doc
where
docActor = renderFedURI $ l2f (docHost doc) (activityActor $ docValue doc)
isInstanceErrorHttp (InvalidUrlException _ _) = False
isInstanceErrorHttp (HttpExceptionRequest _ hec) =
case hec of
ResponseTimeout -> True
ConnectionTimeout -> True
InternalException se ->
case fromException se of
Just (HandshakeFailed _) -> True
_ -> False
_ -> False
isInstanceErrorP (APPostErrorSig _) = False
isInstanceErrorP (APPostErrorHTTP he) = isInstanceErrorHttp he
isInstanceErrorG Nothing = False
isInstanceErrorG (Just e) =
case e of
APGetErrorHTTP he -> isInstanceErrorHttp he
APGetErrorJSON _ -> False
APGetErrorContentType _ -> False
2019-04-11 13:44:44 +00:00
-- | Handle a Note submitted by a local user to their outbox. It can be either
-- a comment on a local ticket, or a comment on some remote context. Return an
-- error message if the Note is rejected, otherwise the new 'LocalMessageId'.
handleOutboxNote :: Text -> Note -> Handler (Either Text LocalMessageId)
handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished content) = runExceptT $ do
verifyHostLocal host "Attributed to non-local actor"
verifyNothing mluNote "Note specifies an id"
verifyNothing mpublished "Note specifies published"
uContext <- fromMaybeE muContext "Note without context"
recips <- nonEmptyE (concatRecipients aud) "Note without recipients"
(mparent, localRecips, mticket, remoteRecips) <- parseRecipsContextParent recips uContext muParent
federation <- getsYesod $ appFederation . appSettings
unless (federation || null remoteRecips) $
throwE "Federation disabled, but remote recipients specified"
2019-04-11 13:44:44 +00:00
result <- lift $ try $ runDB $ (either abort return =<<) . runExceptT $ do
(pid, shrUser) <- verifyIsLoggedInUser luAttrib "Note attributed to different actor"
(did, meparent, mcollections) <- case mticket of
Just (shr, prj, num) -> do
mt <- lift $ runMaybeT $ do
sid <- MaybeT $ getKeyBy $ UniqueSharer shr
jid <- MaybeT $ getKeyBy $ UniqueProject prj sid
t <- MaybeT $ getValBy $ UniqueTicket jid num
return (sid, t)
(sid, t) <- fromMaybeE mt "Context: No such local ticket"
let did = ticketDiscuss t
mmidParent <- for mparent $ \ parent ->
case parent of
Left (shrParent, lmidParent) -> getLocalParentMessageId did shrParent lmidParent
Right (hParent, luParent) -> do
mrm <- lift $ runMaybeT $ do
iid <- MaybeT $ getKeyBy $ UniqueInstance hParent
MaybeT $ getValBy $ UniqueRemoteMessageIdent iid luParent
rm <- fromMaybeE mrm "Remote parent unknown locally"
let mid = remoteMessageRest rm
m <- lift $ getJust mid
unless (messageRoot m == did) $
throwE "Remote parent belongs to a different discussion"
return mid
return (did, Left <$> mmidParent, Just (sid, ticketFollowers t))
Nothing -> do
(rd, rdnew) <- lift $ do
2019-04-11 13:44:44 +00:00
let (hContext, luContext) = f2l uContext
iid <- either entityKey id <$> insertBy' (Instance hContext)
mrd <- getValBy $ UniqueRemoteDiscussionIdent iid luContext
2019-04-11 13:44:44 +00:00
case mrd of
Just rd -> return (rd, False)
Nothing -> do
did <- insert Discussion
let rd = RemoteDiscussion iid luContext did
erd <- insertBy' rd
case erd of
Left (Entity _ rd') -> do
delete did
return (rd', False)
Right _ -> return (rd, True)
2019-04-11 13:44:44 +00:00
let did = remoteDiscussionDiscuss rd
meparent <- for mparent $ \ parent ->
case parent of
Left (shrParent, lmidParent) -> do
when rdnew $ throwE "Local parent inexistent, RemoteDiscussion is new"
Left <$> getLocalParentMessageId did shrParent lmidParent
Right (hParent, luParent) -> do
mrm <- lift $ runMaybeT $ do
iid <- MaybeT $ getKeyBy $ UniqueInstance hParent
MaybeT $ getValBy $ UniqueRemoteMessageIdent iid luParent
case mrm of
Nothing -> return $ Right $ l2f hParent luParent
Just rm -> Left <$> do
let mid = remoteMessageRest rm
m <- lift $ getJust mid
unless (messageRoot m == did) $
throwE "Remote parent belongs to a different discussion"
return mid
return (did, meparent, Nothing)
(lmid, obid, doc) <- lift $ insertMessage luAttrib shrUser pid uContext did muParent meparent content
moreRemotes <- deliverLocal obid localRecips mcollections
unless (federation || null moreRemotes) $
throwE "Federation disabled but remote collection members found"
remotesHttp <- lift $ deliverRemoteDB obid remoteRecips moreRemotes
return (lmid, obid, doc, remotesHttp)
(lmid, obid, doc, remotesHttp) <- case result of
2019-04-11 13:44:44 +00:00
Left (FedError t) -> throwE t
Right r -> return r
let handleDeliveryError e = logError $ "Outbox POST handler: delivery failed! " <> T.pack (displayException e)
lift $ forkHandler handleDeliveryError $ deliverRemoteHttp obid doc remotesHttp
2019-04-11 13:44:44 +00:00
return lmid
where
verifyNothing :: Monad m => Maybe a -> e -> ExceptT e m ()
verifyNothing Nothing _ = return ()
verifyNothing (Just _) e = throwE e
concatRecipients :: Audience -> [FedURI]
concatRecipients (Audience to bto cc bcc gen) = concat [to, bto, cc, bcc, gen]
nonEmptyE :: Monad m => [a] -> e -> ExceptT e m (NonEmpty a)
nonEmptyE l e =
case nonEmpty l of
Nothing -> throwE e
Just ne -> return ne
parseRecipsContextParent
:: NonEmpty FedURI
-> FedURI
-> Maybe FedURI
-> ExceptT Text Handler
( Maybe (Either (ShrIdent, LocalMessageId) (Text, LocalURI))
, [ShrIdent]
, Maybe (ShrIdent, PrjIdent, Int)
, [FedURI]
)
parseRecipsContextParent recips uContext muParent = do
(locals, remotes) <- lift $ splitRecipients recips
let (localsParsed, localsRest) = parseLocalRecipients locals
unless (null localsRest) $
throwE "Note has invalid local recipients"
let localsSet = groupLocalRecipients localsParsed
(hContext, luContext) = f2l uContext
parent <- parseParent uContext muParent
local <- hostIsLocal hContext
if local
then do
ticket <- parseContextTicket luContext
shrs <- verifyTicketRecipients ticket localsSet
return (parent, shrs, Just ticket, remotes)
else do
shrs <- verifyOnlySharers localsSet
return (parent, shrs, Nothing, remotes)
where
-- First step: Split into remote and local:
splitRecipients :: NonEmpty FedURI -> Handler ([LocalURI], [FedURI])
splitRecipients recips = do
home <- getsYesod $ appInstanceHost . appSettings
let (local, remote) = NE.partition ((== home) . furiHost) recips
return (map (snd . f2l) local, remote)
-- Parse the local recipients
parseLocalRecipients :: [LocalURI] -> ([LocalRecipient], [Either LocalURI (Route App)])
parseLocalRecipients = swap . partitionEithers . map decide
where
parseLocalRecipient (SharerR shr) = Just $ LocalSharerRelated shr LocalSharer
parseLocalRecipient (ProjectR shr prj) =
Just $ LocalSharerRelated shr $ LocalProjectRelated prj LocalProject
parseLocalRecipient (TicketParticipantsR shr prj num) =
Just $ LocalSharerRelated shr $ LocalProjectRelated prj $ LocalTicketRelated num LocalTicketParticipants
parseLocalRecipient (TicketTeamR shr prj num) =
Just $ LocalSharerRelated shr $ LocalProjectRelated prj $ LocalTicketRelated num LocalTicketTeam
parseLocalRecipient _ = Nothing
decide lu =
case decodeRouteLocal lu of
Nothing -> Left $ Left lu
Just route ->
case parseLocalRecipient route of
Nothing -> Left $ Right route
Just lr -> Right lr
-- Group local recipients
groupLocalRecipients :: [LocalRecipient] -> LocalRecipientSet
groupLocalRecipients
= map
( second
$ uncurry LocalSharerRelatedSet
. bimap
(not . null)
( map
( second
$ uncurry LocalProjectRelatedSet
. bimap
(not . null)
( map (second ltrs2ltrs)
. groupWithExtract fst snd
)
. partitionEithers
. NE.toList
)
. groupWithExtract fst (lpr2e . snd)
)
. partitionEithers
. NE.toList
)
. groupWithExtract
(\ (LocalSharerRelated shr _) -> shr)
(\ (LocalSharerRelated _ lsr) -> lsr2e lsr)
. sort
where
lsr2e LocalSharer = Left ()
lsr2e (LocalProjectRelated prj lpr) = Right (prj, lpr)
lpr2e LocalProject = Left ()
lpr2e (LocalTicketRelated num ltr) = Right (num, ltr)
ltrs2ltrs (LocalTicketParticipants :| l) =
if LocalTicketTeam `elem` l
then BothTicketParticipantsAndTeam
else OnlyTicketParticipants
ltrs2ltrs (LocalTicketTeam :| l) =
if LocalTicketParticipants `elem` l
then BothTicketParticipantsAndTeam
else OnlyTicketTeam
parseParent :: FedURI -> Maybe FedURI -> ExceptT Text Handler (Maybe (Either (ShrIdent, LocalMessageId) (Text, LocalURI)))
parseParent _ Nothing = return Nothing
parseParent uContext (Just uParent) =
if uParent == uContext
then return Nothing
else Just <$> do
let (hParent, luParent) = f2l uParent
parentLocal <- hostIsLocal hParent
if parentLocal
then Left <$> parseComment luParent
else return $ Right (hParent, luParent)
parseContextTicket :: Monad m => LocalURI -> ExceptT Text m (ShrIdent, PrjIdent, Int)
parseContextTicket luContext = do
route <- case decodeRouteLocal luContext of
Nothing -> throwE "Local context isn't a valid route"
Just r -> return r
case route of
TicketR shr prj num -> return (shr, prj, num)
_ -> throwE "Local context isn't a ticket route"
atMostSharer :: e -> (ShrIdent, LocalSharerRelatedSet) -> ExceptT e Handler (Maybe ShrIdent)
atMostSharer _ (shr, LocalSharerRelatedSet s []) = return $ if s then Just shr else Nothing
atMostSharer e (_ , LocalSharerRelatedSet _ _ ) = throwE e
verifyTicketRecipients :: (ShrIdent, PrjIdent, Int) -> LocalRecipientSet -> ExceptT Text Handler [ShrIdent]
verifyTicketRecipients (shr, prj, num) recips = do
lsrSet <- fromMaybeE (lookupSorted shr recips) "Note with local context: No required recipients"
(prj', lprSet) <- verifySingleton (localRecipProjectRelated lsrSet) "Note project-related recipient sets"
unless (prj == prj') $ throwE "Note project recipients mismatch context's project"
unless (localRecipProject lprSet) $ throwE "Note context's project not addressed"
(num', ltrSet) <- verifySingleton (localRecipTicketRelated lprSet) "Note ticket-related recipient sets"
unless (num == num') $ throwE "Note project recipients mismatch context's ticket number"
case ltrSet of
OnlyTicketParticipants -> throwE "Note ticket participants not addressed"
OnlyTicketTeam -> throwE "Note ticket team not addressed"
BothTicketParticipantsAndTeam -> return ()
let rest = deleteBy ((==) `on` fst) (shr, lsrSet) recips
orig = if localRecipSharer lsrSet then Just shr else Nothing
catMaybes . (orig :) <$> traverse (atMostSharer "Note with unrelated non-sharer recipients") rest
where
verifySingleton :: Monad m => [a] -> Text -> ExceptT Text m a
verifySingleton [] t = throwE $ t <> ": expected 1, got 0"
verifySingleton [x] _ = return x
verifySingleton l t = throwE $ t <> ": expected 1, got " <> T.pack (show $ length l)
verifyOnlySharers :: LocalRecipientSet -> ExceptT Text Handler [ShrIdent]
verifyOnlySharers lrs = catMaybes <$> traverse (atMostSharer "Note with remote context but local project-related recipients") lrs
abort :: Text -> AppDB a
abort = liftIO . throwIO . FedError
verifyIsLoggedInUser :: LocalURI -> Text -> ExceptT Text AppDB (PersonId, ShrIdent)
verifyIsLoggedInUser lu t = do
Entity pid p <- requireVerifiedAuth
s <- lift $ getJust $ personIdent p
route2local <- getEncodeRouteLocal
let shr = sharerIdent s
if route2local (SharerR shr) == lu
then return (pid, shr)
else throwE t
insertMessage
:: LocalURI
-> ShrIdent
-> PersonId
-> FedURI
-> DiscussionId
-> Maybe FedURI
-> Maybe (Either MessageId FedURI)
-> Text
-> AppDB (LocalMessageId, OutboxItemId, Doc Activity)
insertMessage luAttrib shrUser pid uContext did muParent meparent content = do
now <- liftIO getCurrentTime
mid <- insert Message
{ messageCreated = now
, messageContent = content
, messageParent =
case meparent of
Just (Left midParent) -> Just midParent
_ -> Nothing
, messageRoot = did
}
lmid <- insert LocalMessage
{ localMessageAuthor = pid
, localMessageRest = mid
, localMessageUnlinkedParent =
case meparent of
Just (Right uParent) -> Just uParent
_ -> Nothing
}
route2local <- getEncodeRouteLocal
lmhid <- encodeKeyHashid lmid
let activity luAct = Doc host Activity
{ activityId = luAct
, activityActor = luAttrib
, activityAudience = aud
, activitySpecific = CreateActivity Create
{ createObject = Note
{ noteId = Just $ route2local $ MessageR shrUser lmhid
, noteAttrib = luAttrib
, noteAudience = aud
, noteReplyTo = Just $ fromMaybe uContext muParent
, noteContext = Just uContext
, notePublished = Just now
, noteContent = content
}
}
}
obid <- insert OutboxItem
{ outboxItemPerson = pid
, outboxItemActivity = PersistJSON $ activity $ LocalURI "" ""
, outboxItemPublished = now
}
obhid <- encodeKeyHashid obid
let luAct = route2local $ OutboxItemR shrUser obhid
doc = activity luAct
update obid [OutboxItemActivity =. PersistJSON doc]
return (lmid, obid, doc)
-- | Merge 2 lists ordered on fst, concatenating snd values when
-- multiple identical fsts occur. The resulting list is ordered on fst,
-- and each fst value appears only once.
--
-- >>> mergeWith (+) [('a',3), ('a',1), ('b',5)] [('a',2), ('c',4)]
-- [('a',6), ('b',5), ('c',4)]
mergeConcat :: (Ord a, Semigroup b) => [(a, b)] -> [(a, b)] -> [(a, b)]
mergeConcat xs ys = map (second sconcat) $ groupWithExtract fst snd $ LO.mergeBy (compare `on` fst) xs ys
fst3 :: (a, b, c) -> a
fst3 (x, _, _) = x
thd3 :: (a, b, c) -> c
thd3 (_, _, z) = z
2019-04-11 13:44:44 +00:00
-- Deliver to local recipients. For local users, find in DB and deliver.
-- For local collections, expand them, deliver to local users, and return a
-- list of remote actors found in them.
deliverLocal
:: OutboxItemId
-> [ShrIdent]
-> Maybe (SharerId, FollowerSetId)
-> ExceptT Text AppDB [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))]
2019-04-11 13:44:44 +00:00
deliverLocal obid recips mticket = do
recipPids <- traverse getPersonId $ nub recips
(morePids, remotes) <-
lift $ case mticket of
Nothing -> return ([], [])
Just (sid, fsid) -> do
(teamPids, teamRemotes) <- getTicketTeam sid
(fsPids, fsRemotes) <- getFollowers fsid
return
( union teamPids fsPids
-- TODO this is inefficient! The way this combines
-- same-host sharer lists is:
--
-- (1) concatenate them
-- (2) nubBy fst to remove duplicates
--
-- But we have knowledge that:
--
-- (1) in each of the 2 lists we're combining, each
-- instance occurs only once
-- (2) in each actor list, each actor occurs only
-- once
--
-- So we can improve this code by:
--
-- (1) Not assume arbitrary number of consecutive
-- repetition of the same instance, we may only
-- have repetition if the same instance occurs
-- in both lists
-- (2) Don't <> the lists, instead apply unionBy or
-- something better (unionBy assumes one list
-- may have repetition, but removes repetition
-- from the other; we know both lists have no
-- repetition, can we use that to do this
-- faster than unionBy?)
--
-- Also, if we ask the DB to sort by actor, then in
-- the (2) point above, instead of unionBy we can use
-- the knowledge the lists are sorted, and apply
-- LO.unionBy instead. Or even better, because
-- LO.unionBy doesn't assume no repetitions (possibly
-- though it still does it the fastest way).
--
-- So, in mergeConcat, don't start with merging,
-- because we lose the knowledge that each list's
-- instances aren't repeated. Use a custom merge
-- where we can unionBy or LO.unionBy whenever both
-- lists have the same instance.
, map (second $ NE.nubBy ((==) `on` fst3)) $ mergeConcat teamRemotes fsRemotes
2019-04-11 13:44:44 +00:00
)
lift $ for_ (union recipPids morePids) $ \ pid -> insert_ $ InboxItemLocal pid obid
return remotes
where
getPersonOrGroupId :: SharerId -> AppDB (Either PersonId GroupId)
getPersonOrGroupId sid = do
mpid <- getKeyBy $ UniquePersonIdent sid
mgid <- getKeyBy $ UniqueGroup sid
requireEitherM mpid mgid
"Found sharer that is neither person nor group"
"Found sharer that is both person and group"
getPersonId :: ShrIdent -> ExceptT Text AppDB PersonId
getPersonId shr = do
msid <- lift $ getKeyBy $ UniqueSharer shr
sid <- fromMaybeE msid "Local Note addresses nonexistent local sharer"
id_ <- lift $ getPersonOrGroupId sid
case id_ of
Left pid -> return pid
Right _gid -> throwE "Local Note addresses a local group"
groupRemotes :: [(InstanceId, Text, RemoteActorId, LocalURI, Maybe UTCTime)] -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))]
groupRemotes = groupWithExtractBy ((==) `on` fst) fst snd . map toTuples
2019-04-11 13:44:44 +00:00
where
toTuples (iid, h, rsid, lu, ms) = ((iid, h), (rsid, lu, ms))
getTicketTeam :: SharerId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))])
2019-04-11 13:44:44 +00:00
getTicketTeam sid = do
id_ <- getPersonOrGroupId sid
(,[]) <$> case id_ of
Left pid -> return [pid]
Right gid ->
map (groupMemberPerson . entityVal) <$>
selectList [GroupMemberGroup ==. gid] []
getFollowers :: FollowerSetId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))])
2019-04-11 13:44:44 +00:00
getFollowers fsid = do
local <- selectList [FollowTarget ==. fsid] []
remote <- E.select $ E.from $ \ (rf `E.InnerJoin` rs `E.InnerJoin` i) -> do
E.on $ rs E.^. RemoteActorInstance E.==. i E.^. InstanceId
E.on $ rf E.^. RemoteFollowActor E.==. rs E.^. RemoteActorId
2019-04-11 13:44:44 +00:00
E.where_ $ rf E.^. RemoteFollowTarget E.==. E.val fsid
E.orderBy [E.asc $ i E.^. InstanceId]
return
( i E.^. InstanceId
, i E.^. InstanceHost
, rs E.^. RemoteActorId
, rs E.^. RemoteActorInbox
, rs E.^. RemoteActorErrorSince
2019-04-11 13:44:44 +00:00
)
return
( map (followPerson . entityVal) local
, groupRemotes $
map (\ (E.Value iid, E.Value h, E.Value rsid, E.Value luInbox, E.Value msince) ->
(iid, h, rsid, luInbox, msince)
2019-04-11 13:44:44 +00:00
)
remote
)
-- Deliver to a local sharer, if they exist as a user account
deliverToLocalSharer :: OutboxItemId -> ShrIdent -> ExceptT Text AppDB ()
deliverToLocalSharer obid shr = do
msid <- lift $ getKeyBy $ UniqueSharer shr
sid <- fromMaybeE msid "Local Note addresses nonexistent local sharer"
mpid <- lift $ getKeyBy $ UniquePersonIdent sid
mgid <- lift $ getKeyBy $ UniqueGroup sid
id_ <-
requireEitherM mpid mgid
"Found sharer that is neither person nor group"
"Found sharer that is both person and group"
case id_ of
Left pid -> lift $ insert_ $ InboxItemLocal pid obid
Right _gid -> throwE "Local Note addresses a local group"
deliverRemoteDB
:: OutboxItemId
-> [FedURI]
-> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))]
-> AppDB
( [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, DeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
)
deliverRemoteDB obid recips known = do
2019-04-11 13:44:44 +00:00
recips' <- for (groupByHost recips) $ \ (h, lus) -> do
let lus' = NE.nub lus
(iid, inew) <- idAndNew <$> insertBy' (Instance h)
if inew
then return ((iid, h), (Nothing, Nothing, Just lus'))
2019-04-11 13:44:44 +00:00
else do
es <- for lus' $ \ lu -> do
ma <- runMaybeT
$ Left <$> MaybeT (getBy $ UniqueRemoteActor iid lu)
<|> Right <$> MaybeT (getBy $ UniqueUnfetchedRemoteActor iid lu)
2019-04-11 13:44:44 +00:00
return $
case ma of
Nothing -> Left lu
Just e ->
Right $ case e of
Left (Entity raid ra) -> Left (raid, remoteActorInbox ra, remoteActorErrorSince ra)
Right (Entity uraid ura) -> Right (uraid, unfetchedRemoteActorIdent ura, unfetchedRemoteActorSince ura)
let (unknown, newKnown) = partitionEithers $ NE.toList es
(fetched, unfetched) = partitionEithers newKnown
return ((iid, h), (nonEmpty fetched, nonEmpty unfetched, nonEmpty unknown))
let moreKnown = mapMaybe (\ (i, (f, _, _)) -> (i,) <$> f) recips'
unfetched = mapMaybe (\ (i, (_, uf, _)) -> (i,) <$> uf) recips'
stillUnknown = mapMaybe (\ (i, (_, _, uk)) -> (i,) <$> uk) recips'
2019-04-11 13:44:44 +00:00
-- TODO see the earlier TODO about merge, it applies here too
allFetched = map (second $ NE.nubBy ((==) `on` fst3)) $ mergeConcat known moreKnown
fetchedDeliv <- for allFetched $ \ (i, rs) ->
(i,) <$> insertMany' (\ (raid, _, msince) -> Delivery raid obid $ isNothing msince) rs
unfetchedDeliv <- for unfetched $ \ (i, rs) ->
(i,) <$> insertMany' (\ (uraid, _, msince) -> UnlinkedDelivery uraid obid $ isNothing msince) rs
unknownDeliv <- for stillUnknown $ \ (i, lus) -> do
-- TODO maybe for URA insertion we should do insertUnique?
rs <- insertMany' (\ lu -> UnfetchedRemoteActor (fst i) lu Nothing) lus
(i,) <$> insertMany' (\ (_, uraid) -> UnlinkedDelivery uraid obid True) rs
return
( takeNoError fetchedDeliv
, takeNoError unfetchedDeliv
, map
(second $ NE.map $ \ ((lu, ak), dlk) -> (ak, lu, dlk))
unknownDeliv
)
2019-04-11 13:44:44 +00:00
where
groupByHost :: [FedURI] -> [(Text, NonEmpty LocalURI)]
groupByHost = groupAllExtract furiHost (snd . f2l)
insertMany' mk xs = zip' xs <$> insertMany (NE.toList $ mk <$> xs)
where
zip' x y =
case nonEmpty y of
Just y' | length x == length y' -> NE.zip x y'
_ -> error "insertMany' returned different length!"
takeNoError = mapMaybe $ \ (i, rs) -> (i,) <$> nonEmpty (mapMaybe noError $ NE.toList rs)
where
noError ((ak, lu, Nothing), dlk) = Just (ak, lu, dlk)
noError ((_ , _ , Just _ ), _ ) = Nothing
deliverRemoteHttp
:: OutboxItemId
-> Doc Activity
-> ( [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, DeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
)
-> Handler ()
deliverRemoteHttp obid doc (fetched, unfetched, unknown) = do
sign <- getHttpSign
let deliver = deliverHttp sign doc
now <- liftIO getCurrentTime
traverse_ (fork . deliverFetched deliver now) fetched
traverse_ (fork . deliverUnfetched deliver now) unfetched
traverse_ (fork . deliverUnfetched deliver now) unknown
where
fork = forkHandler $ \ e -> logError $ "Outbox POST handler: delivery failed! " <> T.pack (displayException e)
deliverFetched deliver now ((_, h), recips@(r :| rs)) = do
let (raid, luInbox, dlid) = r
e <- deliver h luInbox
let e' = case e of
Left err ->
if isInstanceErrorP err
then Nothing
else Just False
Right _resp -> Just True
case e' of
Nothing -> runDB $ do
let recips' = NE.toList recips
updateWhere [RemoteActorId <-. map fst3 recips', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
updateWhere [DeliveryId <-. map thd3 recips'] [DeliveryRunning =. False]
Just success -> do
runDB $
if success
then delete dlid
else do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
update dlid [DeliveryRunning =. False]
for_ rs $ \ (raid, luInbox, dlid) ->
fork $ do
e <- deliver h luInbox
runDB $
case e of
Left _err -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
update dlid [DeliveryRunning =. False]
Right _resp -> delete dlid
deliverUnfetched deliver now ((iid, h), recips@(r :| rs)) = do
let (uraid, luActor, udlid) = r
e <- fetchRemoteActor iid h luActor
let e' = case e of
Left err -> Just Nothing
Right (Left err) ->
if isInstanceErrorG err
then Nothing
else Just Nothing
Right (Right era) -> Just $ Just era
case e' of
Nothing -> runDB $ do
let recips' = NE.toList recips
updateWhere [UnfetchedRemoteActorId <-. map fst3 recips', UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
updateWhere [UnlinkedDeliveryId <-. map thd3 recips'] [UnlinkedDeliveryRunning =. False]
Just mera -> do
for_ rs $ \ (uraid, luActor, udlid) ->
fork $ do
e <- fetchRemoteActor iid h luActor
case e of
Right (Right (Entity raid ra)) -> do
e' <- deliver h $ remoteActorInbox ra
runDB $
case e' of
Left _ -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
delete udlid
insert_ $ Delivery raid obid False
Right _ -> delete udlid
_ -> runDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False]
case mera of
Nothing -> runDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False]
Just (Entity raid ra) -> do
e'' <- deliver h $ remoteActorInbox ra
runDB $
case e'' of
Left _ -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
delete udlid
insert_ $ Delivery raid obid False
Right _ -> delete udlid
retryOutboxDelivery :: Worker ()
retryOutboxDelivery = do
now <- liftIO $ getCurrentTime
(udls, dls) <- runSiteDB $ do
-- Get all unlinked deliveries which aren't running already in outbox
-- post handlers
unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` i `E.LeftOuterJoin` ra) -> do
E.on $ E.just (ura E.^. UnfetchedRemoteActorInstance) E.==. ra E.?. RemoteActorInstance
E.&&. E.just (ura E.^. UnfetchedRemoteActorIdent) E.==. ra E.?. RemoteActorIdent
E.on $ ura E.^. UnfetchedRemoteActorInstance E.==. i E.^. InstanceId
E.on $ udl E.^. UnlinkedDeliveryRecipient E.==. ura E.^. UnfetchedRemoteActorId
E.on $ udl E.^. UnlinkedDeliveryActivity E.==. ob E.^. OutboxItemId
E.where_ $ udl E.^. UnlinkedDeliveryRunning E.==. E.val False
E.orderBy [E.asc $ ura E.^. UnfetchedRemoteActorInstance, E.asc $ ura E.^. UnfetchedRemoteActorId]
return
( i E.^. InstanceId
, i E.^. InstanceHost
, ura E.^. UnfetchedRemoteActorId
, ura E.^. UnfetchedRemoteActorIdent
, ura E.^. UnfetchedRemoteActorSince
, udl E.^. UnlinkedDeliveryId
, udl E.^. UnlinkedDeliveryActivity
, ob E.^. OutboxItemActivity
, ra E.?. RemoteActorId
)
-- Strip the E.Value wrappers and organize the records for the
-- filtering and grouping we'll need to do
let unlinked = map adaptUnlinked unlinked'
-- Split into found (recipient has been reached) and lonely (recipient
-- hasn't been reached
(found, lonely) = partitionMaybes unlinked
-- Turn the found ones into linked deliveries
deleteWhere [UnlinkedDeliveryId <-. map (unlinkedID . snd) found]
insertMany_ $ map toLinked found
-- We're left with the lonely ones. We'll check which actors have been
-- unreachable for too long, and we'll delete deliveries for them. The
-- rest of the actors we'll try to reach by HTTP.
dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings
let (lonelyOld, lonelyNew) = partitionEithers $ map (decideBySinceUDL dropAfter now) lonely
deleteWhere [UnlinkedDeliveryId <-. lonelyOld]
-- Now let's grab the linked deliveries, and similarly delete old ones
-- and return the rest for HTTP delivery.
linked <- E.select $ E.from $ \ (dl `E.InnerJoin` ra `E.InnerJoin` i `E.InnerJoin` ob) -> do
E.on $ dl E.^. DeliveryActivity E.==. ob E.^. OutboxItemId
E.on $ ra E.^. RemoteActorInstance E.==. i E.^. InstanceId
E.on $ dl E.^. DeliveryRecipient E.==. ra E.^. RemoteActorId
E.where_ $ dl E.^. DeliveryRunning E.==. E.val False
E.orderBy [E.asc $ ra E.^. RemoteActorInstance, E.asc $ ra E.^. RemoteActorId]
return
( i E.^. InstanceId
, i E.^. InstanceHost
, ra E.^. RemoteActorId
, ra E.^. RemoteActorInbox
, ra E.^. RemoteActorErrorSince
, dl E.^. DeliveryId
, ob E.^. OutboxItemActivity
)
let (linkedOld, linkedNew) = partitionEithers $ map (decideBySinceDL dropAfter now . adaptLinked) linked
deleteWhere [DeliveryId <-. linkedOld]
return (groupUnlinked lonelyNew, groupLinked linkedNew)
sign <- getHttpSign
let deliver = deliverHttp sign
waitsDL <- traverse (fork . deliverLinked deliver now) dls
waitsUDL <- traverse (fork . deliverUnlinked deliver now) udls
resultsDL <- sequence waitsDL
unless (and resultsDL) $ logError "Periodic delivery DL error"
resultsUDL <- sequence waitsUDL
unless (and resultsUDL) $ logError "Periodic delivery UDL error"
where
adaptUnlinked
(E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value act, E.Value mraid) =
( mraid
, ( ( (iid, h)
, ((uraid, luRecip), (udlid, obid, persistJSONValue act))
)
, since
)
)
unlinkedID ((_, (_, (udlid, _, _))), _) = udlid
toLinked (raid, ((_, (_, (_, obid, _))), _)) = Delivery raid obid False
relevant dropAfter now since = addUTCTime dropAfter since > now
decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _))), msince) =
case msince of
Nothing -> Right udl
Just since ->
if relevant dropAfter now since
then Right udl
else Left udlid
groupUnlinked
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
adaptLinked
(E.Value iid, E.Value h, E.Value raid, E.Value inbox, E.Value since, E.Value dlid, E.Value act) =
( ( (iid, h)
, ((raid, inbox), (dlid, persistJSONValue act))
)
, since
)
decideBySinceDL dropAfter now (dl@(_, (_, (dlid, _))), msince) =
case msince of
Nothing -> Right dl
Just since ->
if relevant dropAfter now since
then Right dl
else Left dlid
groupLinked
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
fork action = do
wait <- asyncSite action
return $ do
result <- wait
case result of
Left e -> do
logError $ "Periodic delivery error! " <> T.pack (displayException e)
return False
Right success -> return success
deliverLinked deliver now ((_, h), recips) = do
waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do
waitsD <- for delivs $ \ (dlid, doc) -> fork $ do
e <- deliver doc h inbox
case e of
Left _err -> return False
Right _resp -> do
runSiteDB $ delete dlid
return True
results <- sequence waitsD
runSiteDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic DL delivery error for host " <> h
return True
deliverUnlinked deliver now ((iid, h), recips) = do
waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do
e <- fetchRemoteActor iid h luRecip
case e of
Right (Right (Entity raid ra)) -> do
waitsD <- for delivs $ \ (udlid, obid, doc) -> fork $ do
e' <- deliver doc h $ remoteActorInbox ra
case e' of
Left _err -> do
runSiteDB $ do
delete udlid
insert_ $ Delivery raid obid False
return False
Right _resp -> do
runSiteDB $ delete udlid
return True
results <- sequence waitsD
runSiteDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
_ -> runSiteDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic UDL delivery error for host " <> h
return True