mirror of
https://code.naskya.net/repos/ndqEd
synced 2025-01-28 12:57:51 +09:00
1267 lines
54 KiB
Haskell
1267 lines
54 KiB
Haskell
{- This file is part of Vervis.
|
|
-
|
|
- Written in 2019, 2020, 2021, 2022 by fr33domlover <fr33domlover@riseup.net>.
|
|
-
|
|
- ♡ Copying is an act of love. Please copy, reuse and share.
|
|
-
|
|
- The author(s) have dedicated all copyright and related and neighboring
|
|
- rights to this software to the public domain worldwide. This software is
|
|
- distributed without any warranty.
|
|
-
|
|
- You should have received a copy of the CC0 Public Domain Dedication along
|
|
- with this software. If not, see
|
|
- <http://creativecommons.org/publicdomain/zero/1.0/>.
|
|
-}
|
|
|
|
-- These are for Barbie-related generated instances for ForwarderBy
|
|
{-# LANGUAGE DeriveAnyClass #-}
|
|
{-# LANGUAGE DeriveGeneric #-}
|
|
|
|
module Vervis.Delivery
|
|
( deliverHttp
|
|
, deliverHttpBL
|
|
, deliverRemoteDB_D
|
|
, deliverRemoteDB_L
|
|
, deliverRemoteDB_P
|
|
, deliverRemoteDB_R
|
|
, deliverRemoteHTTP_D
|
|
, deliverRemoteHTTP_L
|
|
, deliverRemoteHTTP_P
|
|
, deliverRemoteHTTP_R
|
|
, deliverRemoteDB'
|
|
, deliverRemoteDB''
|
|
, deliverRemoteHttp
|
|
, deliverRemoteHttp'
|
|
, deliverLocal'
|
|
, deliverLocal
|
|
, insertRemoteActivityToLocalInboxes
|
|
, fixRunningDeliveries
|
|
, retryOutboxDelivery
|
|
)
|
|
where
|
|
|
|
import Control.Applicative
|
|
import Control.Exception hiding (Handler, try)
|
|
import Control.Monad
|
|
import Control.Monad.IO.Class
|
|
import Control.Monad.IO.Unlift
|
|
import Control.Monad.Logger.CallStack
|
|
import Control.Monad.Trans.Class
|
|
import Control.Monad.Trans.Except
|
|
import Control.Monad.Trans.Maybe
|
|
import Control.Monad.Trans.Reader
|
|
import Data.Barbie
|
|
import Data.Bifunctor
|
|
import Data.Bitraversable
|
|
import Data.ByteString (ByteString)
|
|
import Data.Either
|
|
import Data.Foldable
|
|
import Data.Function
|
|
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
|
|
import Data.Maybe
|
|
import Data.Semigroup
|
|
import Data.Text (Text)
|
|
import Data.Text.Encoding
|
|
import Data.Time.Clock
|
|
import Data.Traversable
|
|
import Database.Persist
|
|
import Database.Persist.Sql
|
|
import GHC.Generics
|
|
import Network.HTTP.Client
|
|
import Network.TLS -- hiding (SHA256)
|
|
import Text.Blaze.Html (preEscapedToHtml)
|
|
import Text.Blaze.Html.Renderer.Text
|
|
import UnliftIO.Exception (try)
|
|
import Yesod.Core hiding (logError, logWarn, logInfo, logDebug)
|
|
import Yesod.Core.Handler
|
|
import Yesod.Persist.Core
|
|
|
|
import qualified Data.ByteString.Lazy as BL
|
|
import qualified Data.CaseInsensitive as CI
|
|
import qualified Data.List.NonEmpty as NE
|
|
import qualified Data.List as L
|
|
import qualified Data.List.Ordered as LO
|
|
import qualified Data.Text as T
|
|
import qualified Data.Text.Lazy as TL
|
|
import qualified Database.Esqueleto as E
|
|
|
|
import Yesod.HttpSignature
|
|
|
|
import Database.Persist.JSON
|
|
import Network.FedURI
|
|
import Network.HTTP.Digest
|
|
import Yesod.ActivityPub
|
|
import Yesod.MonadSite
|
|
import Yesod.FedURI
|
|
import Yesod.Hashids
|
|
|
|
import qualified Web.ActivityPub as AP
|
|
|
|
import Control.Monad.Trans.Except.Local
|
|
import Data.Either.Local
|
|
import Data.List.NonEmpty.Local
|
|
import Data.Maybe.Local
|
|
import Data.Patch.Local hiding (Patch)
|
|
import Data.Tuple.Local
|
|
import Database.Persist.Local
|
|
|
|
import qualified Data.Patch.Local as P
|
|
|
|
import Vervis.ActivityPub
|
|
import Vervis.FedURI
|
|
import Vervis.Foundation
|
|
import Vervis.Model
|
|
import Vervis.Model.Ident
|
|
import Vervis.Recipient
|
|
import Vervis.RemoteActorStore
|
|
import Vervis.Settings
|
|
import Vervis.Time
|
|
|
|
-- | Send an activity document to an inbox on the given host.
--
-- The inbox URI is assembled from the host and the local inbox URI. If a
-- 'LocalURI' is given as the second argument, it is combined with the same
-- host and passed to 'deliverActivity' alongside the document.
deliverHttp
    :: (MonadSite m, SiteEnv m ~ App)
    => AP.Doc AP.Activity URIMode
    -> Maybe LocalURI
    -> Host
    -> LocalURI
    -> m (Either AP.APPostError (Response ()))
deliverHttp doc mfwd h luInbox = deliverActivity inbox forwarder doc
    where
    inbox = ObjURI h luInbox
    forwarder = fmap (ObjURI h) mfwd
|
|
|
|
-- | Like 'deliverHttp', but the activity is given as an already serialized
-- lazy 'BL.ByteString', which is handed to 'deliverActivityBL''.
deliverHttpBL
    :: (MonadSite m, SiteEnv m ~ App)
    => BL.ByteString
    -> Maybe LocalURI
    -> Host
    -> LocalURI
    -> m (Either AP.APPostError (Response ()))
deliverHttpBL body mfwd h luInbox = deliverActivityBL' inbox forwarder body
    where
    inbox = ObjURI h luInbox
    forwarder = fmap (ObjURI h) mfwd
|
|
|
|
-- | Insert forwarding records for a remote activity that a local actor is
-- about to forward to remote recipients.
--
-- For every recipient a 'Forwarding' row is inserted (holding the strict copy
-- of the body and the signature), followed by a forwarder row built with the
-- given constructor from the new 'ForwardingId' and the sender's key. The
-- 'Forwarding' row's last field is 'True' exactly when the recipient has no
-- error timestamp.
--
-- The result lists only the recipients without an error timestamp; instances
-- left with no such recipient are omitted.
deliverRemoteDB_
    :: (MonadIO m, PersistRecordBackend fwder SqlBackend)
    => (ForwardingId -> Key sender -> fwder)
    -> BL.ByteString
    -> RemoteActivityId
    -> Key sender
    -> ByteString
    -> [((InstanceId, Host), NonEmpty RemoteRecipient)]
    -> ReaderT SqlBackend m
        [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, Key fwder))]
deliverRemoteDB_ makeFwder body ractid senderKey sig recips =
    keepHealthy <$> traverse (bitraverse pure insertForInstance) recips
    where
    strictBody = BL.toStrict body

    toForwarding (RemoteRecipient raid _ _ msince) =
        Forwarding raid ractid strictBody sig (isNothing msince)

    -- First the Forwarding rows, then one forwarder row per Forwarding row
    insertForInstance instanceRecips = do
        forwardings <- insertMany' toForwarding instanceRecips
        insertMany' (\ (_, fwid) -> makeFwder fwid senderKey) forwardings

    keepHealthy = mapMaybe $ \ (inst, rows) ->
        (inst,) <$> nonEmpty (mapMaybe healthy $ NE.toList rows)

    healthy ((RemoteRecipient raid luActor luInbox msince, fwid), fwrid) =
        case msince of
            Nothing -> Just (raid, luActor, luInbox, fwid, fwrid)
            Just _  -> Nothing
|
|
|
|
-- | 'deliverRemoteDB_' for a deck sender: forwarder rows are built with
-- 'ForwarderDeck'.
deliverRemoteDB_D
    :: MonadIO m
    => BL.ByteString
    -> RemoteActivityId
    -> DeckId
    -> ByteString
    -> [((InstanceId, Host), NonEmpty RemoteRecipient)]
    -> ReaderT SqlBackend m
        [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderDeckId))]
deliverRemoteDB_D body ractid deckID sig recips =
    deliverRemoteDB_ ForwarderDeck body ractid deckID sig recips
|
|
|
|
-- | 'deliverRemoteDB_' for a loom sender: forwarder rows are built with
-- 'ForwarderLoom'.
deliverRemoteDB_L
    :: MonadIO m
    => BL.ByteString
    -> RemoteActivityId
    -> LoomId
    -> ByteString
    -> [((InstanceId, Host), NonEmpty RemoteRecipient)]
    -> ReaderT SqlBackend m
        [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderLoomId))]
deliverRemoteDB_L body ractid loomID sig recips =
    deliverRemoteDB_ ForwarderLoom body ractid loomID sig recips
|
|
|
|
-- | 'deliverRemoteDB_' for a person sender: forwarder rows are built with
-- 'ForwarderPerson'.
deliverRemoteDB_P
    :: MonadIO m
    => BL.ByteString
    -> RemoteActivityId
    -> PersonId
    -> ByteString
    -> [((InstanceId, Host), NonEmpty RemoteRecipient)]
    -> ReaderT SqlBackend m
        [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderPersonId))]
deliverRemoteDB_P body ractid personID sig recips =
    deliverRemoteDB_ ForwarderPerson body ractid personID sig recips
|
|
|
|
-- | 'deliverRemoteDB_' for a repo sender: forwarder rows are built with
-- 'ForwarderRepo'.
deliverRemoteDB_R
    :: MonadIO m
    => BL.ByteString
    -> RemoteActivityId
    -> RepoId
    -> ByteString
    -> [((InstanceId, Host), NonEmpty RemoteRecipient)]
    -> ReaderT SqlBackend m
        [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderRepoId))]
deliverRemoteDB_R body ractid repoID sig recips =
    deliverRemoteDB_ ForwarderRepo body ractid repoID sig recips
|
|
|
|
-- | Forward an activity body over HTTP to remote recipients grouped by
-- instance, forking one worker per instance.
--
-- Within an instance, the first recipient is attempted first:
--
-- * If that attempt fails with an error for which 'isInstanceErrorP' holds,
--   no other recipient of the instance is attempted. Every recipient of the
--   instance that has no error timestamp yet gets the given time, and all
--   their 'Forwarding' rows are marked as not running.
-- * Otherwise the first recipient's rows are updated according to the
--   outcome, and each remaining recipient is attempted in a worker of its
--   own.
--
-- A successful attempt deletes the forwarder row and the 'Forwarding' row. A
-- failed one sets the actor's error timestamp (if unset) and marks the
-- 'Forwarding' row as not running.
deliverRemoteHTTP'
    :: (MonadSite m, SiteEnv m ~ App, PersistRecordBackend fwder SqlBackend)
    => UTCTime
    -> LocalActor
    -> BL.ByteString
    -> ByteString
    -> [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, Key fwder))]
    -> m ()
deliverRemoteHTTP' now sender body sig fetched =
    traverse_ (fork . deliverFetched) fetched
    where
    fork = forkWorker "Inbox forwarding to remote members of local collections: delivery failed"

    deliver h inbox =
        forwardActivity (ObjURI h inbox) sig (renderLocalActor sender) body

    deliverFetched ((_, h), recips@((raid, _luActor, luInbox, fwid, forwarderKey) :| rs)) = do
        let -- Each of the remaining recipients gets a worker of its own
            deliverRest =
                for_ rs $ \ (raid', _luActor', luInbox', fwid', forwarderKey') ->
                    fork $ do
                        result <- deliver h luInbox'
                        runSiteDB $
                            case result of
                                Right _resp -> do
                                    delete forwarderKey'
                                    delete fwid'
                                Left _err -> do
                                    updateWhere [RemoteActorId ==. raid', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
                                    update fwid' [ForwardingRunning =. False]
        firstResult <- deliver h luInbox
        case firstResult of
            Left err
                -- Instance-level failure: give up on the whole instance
                | isInstanceErrorP err -> runSiteDB $ do
                    let everyone = NE.toList recips
                    updateWhere [RemoteActorId <-. map fst5 everyone, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
                    updateWhere [ForwardingId <-. map fourth5 everyone] [ForwardingRunning =. False]
                | otherwise -> do
                    runSiteDB $ do
                        updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
                        update fwid [ForwardingRunning =. False]
                    deliverRest
            Right _resp -> do
                runSiteDB $ do
                    delete forwarderKey
                    delete fwid
                deliverRest
|
|
|
|
-- | 'deliverRemoteHTTP'' with a deck, given by its hashid, as the sender.
deliverRemoteHTTP_D
    :: (MonadSite m, SiteEnv m ~ App)
    => UTCTime
    -> KeyHashid Deck
    -> BL.ByteString
    -> ByteString
    -> [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderDeckId))]
    -> m ()
deliverRemoteHTTP_D now dkhid body sig fetched =
    deliverRemoteHTTP' now (LocalActorDeck dkhid) body sig fetched
|
|
|
|
-- | 'deliverRemoteHTTP'' with a loom, given by its hashid, as the sender.
deliverRemoteHTTP_L
    :: (MonadSite m, SiteEnv m ~ App)
    => UTCTime
    -> KeyHashid Loom
    -> BL.ByteString
    -> ByteString
    -> [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderLoomId))]
    -> m ()
deliverRemoteHTTP_L now lkhid body sig fetched =
    deliverRemoteHTTP' now (LocalActorLoom lkhid) body sig fetched
|
|
|
|
-- | 'deliverRemoteHTTP'' with a person, given by their hashid, as the sender.
deliverRemoteHTTP_P
    :: (MonadSite m, SiteEnv m ~ App)
    => UTCTime
    -> KeyHashid Person
    -> BL.ByteString
    -> ByteString
    -> [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderPersonId))]
    -> m ()
deliverRemoteHTTP_P now pkhid body sig fetched =
    deliverRemoteHTTP' now (LocalActorPerson pkhid) body sig fetched
|
|
|
|
-- | 'deliverRemoteHTTP'' with a repo, given by its hashid, as the sender.
deliverRemoteHTTP_R
    :: (MonadSite m, SiteEnv m ~ App)
    => UTCTime
    -> KeyHashid Repo
    -> BL.ByteString
    -> ByteString
    -> [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderRepoId))]
    -> m ()
deliverRemoteHTTP_R now rkhid body sig fetched =
    deliverRemoteHTTP' now (LocalActorRepo rkhid) body sig fetched
|
|
|
|
-- | 'deliverRemoteDB''' with a single context host.
deliverRemoteDB'
    :: Host
    -> OutboxItemId
    -> [(Host, NonEmpty LocalURI)]
    -> [((InstanceId, Host), NonEmpty RemoteRecipient)]
    -> AppDB
        ( [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, DeliveryId))]
        , [((InstanceId, Host), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
        , [((InstanceId, Host), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
        )
deliverRemoteDB' hContext obid recips known =
    deliverRemoteDB'' [hContext] obid recips known
|
|
|
|
-- | What a remote object row turned out to be when looked up as a delivery
-- recipient in 'deliverRemoteDB'''.
data Recip
    = RecipRA (Entity RemoteActor)
        -- ^ A remote actor row exists for the object
    | RecipURA (Entity UnfetchedRemoteActor)
        -- ^ An unfetched remote actor row exists for the object
    | RecipRC (Entity RemoteCollection)
        -- ^ A remote collection row exists for the object
|
|
|
|
-- | Insert delivery records for the remote recipients of an outbox item.
--
-- Recipients arrive in two forms: URIs grouped by host, which are looked up
-- in the database here, and recipients already known to be remote actors.
-- Each URI is sorted into one of three groups:
--
-- * Fetched: a 'RemoteActor' row exists; a 'Delivery' row is inserted
-- * Unfetched: an 'UnfetchedRemoteActor' row exists; an 'UnlinkedDelivery'
--   row is inserted
-- * Unknown: no remote object row exists, or the instance row itself was
--   just inserted; 'RemoteObject' and 'UnfetchedRemoteActor' rows are
--   inserted and then an 'UnlinkedDelivery' row
--
-- URIs that resolve to a 'RemoteCollection' are dropped. A delivery row's
-- forwarding flag is set when the recipient's host is one of the given
-- context hosts.
--
-- The fetched and unfetched results list only recipients that carry no error
-- timestamp; the unknown result lists everything that was inserted.
deliverRemoteDB''
    :: MonadIO m
    => [Host]
    -> OutboxItemId
    -> [(Host, NonEmpty LocalURI)]
    -> [((InstanceId, Host), NonEmpty RemoteRecipient)]
    -> ReaderT SqlBackend m
        ( [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, DeliveryId))]
        , [((InstanceId, Host), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
        , [((InstanceId, Host), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
        )
deliverRemoteDB'' hContexts obid recips known = do
    sorted <- for recips $ \ (h, lus) -> do
        let distinctLus = NE.nub lus
        (iid, isNew) <- idAndNew <$> insertBy' (Instance h)
        buckets <-
            if isNew
                -- Nothing can be known yet about a brand new instance
                then return (Nothing, Nothing, Just distinctLus)
                else do
                    found <- for distinctLus $ \ lu ->
                        fmap (classify lu) $ runMaybeT $ do
                            Entity roid ro <- MaybeT $ getBy $ UniqueRemoteObject iid lu
                            recip <-
                                RecipRA <$> MaybeT (getBy $ UniqueRemoteActor roid)
                                    <|> RecipURA <$> MaybeT (getBy $ UniqueUnfetchedRemoteActor roid)
                                    <|> RecipRC <$> MaybeT (getBy $ UniqueRemoteCollection roid)
                            return (ro, recip)
                    let (unknownLus, knownRecips) =
                            partitionEithers $ catMaybes $ NE.toList found
                        (fetchedRecips, unfetchedRecips) =
                            partitionEithers knownRecips
                    return
                        ( nonEmpty fetchedRecips
                        , nonEmpty unfetchedRecips
                        , nonEmpty unknownLus
                        )
        return ((iid, h), buckets)
    let moreKnown = mapMaybe (\ (i, (f, _, _)) -> (i,) <$> f) sorted
        unfetched = mapMaybe (\ (i, (_, uf, _)) -> (i,) <$> uf) sorted
        stillUnknown = mapMaybe (\ (i, (_, _, uk)) -> (i,) <$> uk) sorted
        allFetched = unionRemotes known moreKnown
        isContext (_, h) = h `elem` hContexts
    fetchedDeliv <- for allFetched $ \ (i, rs) ->
        (i,) <$>
            insertMany'
                (\ (RemoteRecipient raid _ _ msince) ->
                    Delivery raid obid (isContext i) (isNothing msince)
                )
                rs
    unfetchedDeliv <- for unfetched $ \ (i, rs) ->
        (i,) <$>
            insertMany'
                (\ (uraid, _, msince) ->
                    UnlinkedDelivery uraid obid (isContext i) (isNothing msince)
                )
                rs
    unknownDeliv <- for stillUnknown $ \ (i, lus) -> do
        -- TODO maybe for URA insertion we should do insertUnique?
        ros <- insertMany' (RemoteObject (fst i)) lus
        uras <- insertMany' (\ (_lu, roid) -> UnfetchedRemoteActor roid Nothing) ros
        (i,) <$> insertMany' (\ (_, uraid) -> UnlinkedDelivery uraid obid (isContext i) True) uras
    return
        ( dropErrored healthyFetched fetchedDeliv
        , dropErrored healthyUnfetched unfetchedDeliv
        , map (second $ NE.map $ \ (((lu, _roid), ak), dlk) -> (ak, lu, dlk)) unknownDeliv
        )
    where
    -- Sort a lookup result: Left for unknown, Right (Left _) for fetched,
    -- Right (Right _) for unfetched, Nothing for collections
    classify lu Nothing = Just $ Left lu
    classify _ (Just (ro, RecipRA (Entity raid ra))) =
        Just $ Right $ Left $
            RemoteRecipient raid (remoteObjectIdent ro) (remoteActorInbox ra) (remoteActorErrorSince ra)
    classify _ (Just (ro, RecipURA (Entity uraid ura))) =
        Just $ Right $ Right (uraid, remoteObjectIdent ro, unfetchedRemoteActorSince ura)
    classify _ (Just (_, RecipRC _)) = Nothing

    -- Keep only the rows accepted by the given function, and drop instances
    -- left with no rows
    dropErrored keep = mapMaybe $ \ (i, rs) ->
        (i,) <$> nonEmpty (mapMaybe keep $ NE.toList rs)

    healthyUnfetched ((ak, lu, Nothing), dlk) = Just (ak, lu, dlk)
    healthyUnfetched ((_, _, Just _), _) = Nothing

    healthyFetched (RemoteRecipient ak luA luI Nothing, dlk) = Just (ak, luA, luI, dlk)
    healthyFetched (RemoteRecipient _ _ _ (Just _), _) = Nothing
|
|
|
|
-- | 'deliverRemoteHttp'' with a single context host.
deliverRemoteHttp
    :: Host
    -> OutboxItemId
    -> AP.Doc AP.Activity URIMode
    -> ( [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, DeliveryId))]
       , [((InstanceId, Host), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
       , [((InstanceId, Host), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
       )
    -> Worker ()
deliverRemoteHttp hContext obid doc recips =
    deliverRemoteHttp' [hContext] obid doc recips
|
|
|
|
-- | Deliver an outbox item over HTTP to remote recipients, given the delivery
-- rows inserted for them, which come in three groups: fetched, unfetched and
-- unknown. One worker is forked per instance in each group; the unfetched and
-- unknown groups are handled the same way.
--
-- Fetched recipients: the first recipient of the instance is attempted. If
-- that fails with an error for which 'isInstanceErrorP' holds, the whole
-- instance is given up on: actors without an error timestamp get one and all
-- the 'Delivery' rows are marked as not running. Otherwise the first
-- recipient's rows are updated and every other recipient is attempted in a
-- worker of its own. Success deletes the 'Delivery' row; failure sets the
-- actor's error timestamp (if unset) and marks the row as not running.
--
-- Unfetched recipients: the actor is fetched first with 'fetchRemoteActor'.
-- If fetching the first recipient fails with an error for which
-- 'isInstanceErrorG' holds, the whole instance is given up on in the same
-- manner. Otherwise each recipient is handled on its own:
--
-- * Fetching failed: the unfetched actor's timestamp is set (if unset) and
--   the 'UnlinkedDelivery' row is marked as not running
-- * Fetching yielded no actor: the 'UnlinkedDelivery' row is deleted
-- * Fetching yielded an actor: delivery to its inbox is attempted and the
--   'UnlinkedDelivery' row is deleted; if the attempt failed, a non-running
--   'Delivery' row is inserted in its place
--
-- The recipient's actor URI is passed to 'deliverHttp' as its optional
-- 'LocalURI' argument only for hosts listed in the given context hosts.
deliverRemoteHttp'
    :: [Host]
    -> OutboxItemId
    -> AP.Doc AP.Activity URIMode
    -> ( [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, DeliveryId))]
       , [((InstanceId, Host), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
       , [((InstanceId, Host), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
       )
    -> Worker ()
deliverRemoteHttp' hContexts obid doc (fetched, unfetched, unknown) = do
    logDebug' "Starting"
    let deliver fwd h inbox = do
            let fwd' = if h `elem` hContexts then Just fwd else Nothing
            -- Also report whether the forwarding argument was passed, for
            -- use as the flag of a Delivery row inserted on failure
            (isJust fwd',) <$> deliverHttp doc fwd' h inbox
    now <- liftIO getCurrentTime
    logDebug' $
        "Launching fetched " <> showHosts fetched
    traverse_ (fork . deliverFetched deliver now) fetched
    logDebug' $
        "Launching unfetched " <> showHosts unfetched
    traverse_ (fork . deliverUnfetched deliver now) unfetched
    logDebug' $
        "Launching unknown " <> showHosts unknown
    traverse_ (fork . deliverUnfetched deliver now) unknown
    logDebug' "Done (async delivery may still be running)"
    where
    showHosts = T.pack . show . map (renderAuthority . snd . fst)
    logDebug' t = logDebug $ prefix <> t
        where
        prefix =
            T.concat
                [ "Outbox POST handler: deliverRemoteHttp obid#"
                , T.pack $ show $ fromSqlKey obid
                , ": "
                ]
    fork = forkWorker "Outbox POST handler: HTTP delivery"
    deliverFetched deliver now ((_, h), recips@(r :| rs)) = do
        logDebug'' "Starting"
        let (raid, luActor, luInbox, dlid) = r
        (_, e) <- deliver luActor h luInbox
        -- Nothing: instance-level error; Just False: failure specific to
        -- this recipient; Just True: delivered
        e' <- case e of
                Left err -> do
                    logError $ T.concat
                        [ "Outbox DL delivery #", T.pack $ show dlid
                        , " error for <", renderObjURI $ ObjURI h luActor
                        , ">: ", T.pack $ displayException err
                        ]
                    return $
                        if isInstanceErrorP err
                            then Nothing
                            else Just False
                Right _resp -> return $ Just True
        case e' of
            Nothing -> runSiteDB $ do
                let recips' = NE.toList recips
                updateWhere [RemoteActorId <-. map fst4 recips', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
                updateWhere [DeliveryId <-. map fourth4 recips'] [DeliveryRunning =. False]
            Just success -> do
                runSiteDB $
                    if success
                        then delete dlid
                        else do
                            updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
                            update dlid [DeliveryRunning =. False]
                for_ rs $ \ (raid, luActor, luInbox, dlid) ->
                    fork $ do
                        (_, e) <- deliver luActor h luInbox
                        runSiteDB $
                            case e of
                                Left err -> do
                                    logError $ T.concat
                                        [ "Outbox DL delivery #", T.pack $ show dlid
                                        , " error for <", renderObjURI $ ObjURI h luActor
                                        , ">: ", T.pack $ displayException err
                                        ]
                                    updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
                                    update dlid [DeliveryRunning =. False]
                                Right _resp -> delete dlid
        where
        -- Separate the host from the message, so that they don't run
        -- together in the log line
        logDebug'' t = logDebug' $ T.concat ["deliverFetched ", renderAuthority h, ": ", t]
    deliverUnfetched deliver now ((iid, h), recips@(r :| rs)) = do
        logDebug'' "Starting"
        let (uraid, luActor, udlid) = r
        e <- fetchRemoteActor iid h luActor
        -- Nothing: instance-level error; Just Nothing: failure specific to
        -- this recipient; Just (Just mera): fetching worked
        let e' = case e of
                    Left _ -> Just Nothing
                    Right (Left err) ->
                        if isInstanceErrorG err
                            then Nothing
                            else Just Nothing
                    Right (Right mera) -> Just $ Just mera
        case e' of
            Nothing -> runSiteDB $ do
                let recips' = NE.toList recips
                updateWhere [UnfetchedRemoteActorId <-. map fst3 recips', UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
                updateWhere [UnlinkedDeliveryId <-. map thd3 recips'] [UnlinkedDeliveryRunning =. False]
            Just mmera -> do
                -- The other recipients of the instance are launched before
                -- the first recipient's own outcome is handled
                for_ rs $ \ (uraid, luActor, udlid) ->
                    fork $ do
                        e <- fetchRemoteActor iid h luActor
                        case e of
                            Right (Right mera) ->
                                case mera of
                                    Nothing -> runSiteDB $ delete udlid
                                    Just (Entity raid ra) -> do
                                        (fwd, e') <- deliver luActor h $ remoteActorInbox ra
                                        runSiteDB $
                                            case e' of
                                                Left _ -> do
                                                    updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
                                                    delete udlid
                                                    insert_ $ Delivery raid obid fwd False
                                                Right _ -> delete udlid
                            _ -> runSiteDB $ do
                                updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
                                update udlid [UnlinkedDeliveryRunning =. False]
                case mmera of
                    Nothing -> runSiteDB $ do
                        updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
                        update udlid [UnlinkedDeliveryRunning =. False]
                    Just mera ->
                        case mera of
                            Nothing -> runSiteDB $ delete udlid
                            Just (Entity raid ra) -> do
                                (fwd, e'') <- deliver luActor h $ remoteActorInbox ra
                                runSiteDB $
                                    case e'' of
                                        Left _ -> do
                                            updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
                                            delete udlid
                                            insert_ $ Delivery raid obid fwd False
                                        Right _ -> delete udlid
        where
        -- Separate the host from the message, so that they don't run
        -- together in the log line
        logDebug'' t = logDebug' $ T.concat ["deliverUnfetched ", renderAuthority h, ": ", t]
|
|
|
|
-- | Insert an activity into the inboxes of its local recipients, and collect
-- the remote followers to whom it still has to be delivered.
--
-- Given a set of local recipient routes, which may address actors as well as
-- follower collections of actors, tickets and cloths:
--
-- * An inbox item is inserted for every local actor that is addressed
--   directly, or that follows one of the addressed follower collections
-- * The remote followers of those collections are returned, grouped by
--   instance
insertActivityToLocalInboxes
    :: ( MonadSite m
       , YesodHashids (SiteEnv m)
       , PersistRecordBackend record SqlBackend
       )
    => (InboxId -> InboxItemId -> record)
    -- ^ Database record to insert as a new inbox item to each inbox
    -> Bool
    -- ^ Whether to deliver to collection only if owner actor is addressed
    -> Maybe LocalActor
    -- ^ An actor whose collections are excluded from requiring an owner, i.e.
    -- even if owner is required, this actor's collections will be delivered
    -- to, even if this actor isn't addressed. This is meant to be the
    -- activity's author.
    -> Maybe ActorId
    -- ^ An actor whose inbox to exclude from delivery, even if this actor is
    -- listed in the recipient set. This is meant to be the activity's
    -- author.
    -> RecipientRoutes
    -> ReaderT SqlBackend m [((InstanceId, Host), NonEmpty RemoteRecipient)]
insertActivityToLocalInboxes makeInboxItem requireOwner mauthor maidAuthor recips = do

    -- Decode the hashids of the addressed actors and of their work items;
    -- entries whose hashid doesn't decode are dropped
    people <- unhashKeys $ recipPeople recips
    groups <- unhashKeys $ recipGroups recips
    repos <- unhashKeys $ recipRepos recips
    decksAndTickets <- do
        decks <- unhashKeys $ recipDecks recips
        for decks $ \ (deckID, DeckFamilyRoutes deck tickets) -> do
            ticketKeys <- unhashKeys tickets
            return (deckID, (deck, ticketKeys))
    loomsAndCloths <- do
        looms <- unhashKeys $ recipLooms recips
        for looms $ \ (loomID, LoomFamilyRoutes loom cloths) -> do
            clothKeys <- unhashKeys cloths
            return (loomID, (loom, clothKeys))

    -- Keep the actors whose stages (i.e. collections) may be delivered to
    isAuthor <- getIsAuthor
    let allowStages'
            :: (famili -> routes)
            -> (routes -> Bool)
            -> (Key record -> LocalActorBy Key)
            -> (Key record, famili)
            -> Bool
        allowStages' = allowStages isAuthor

        peopleForStages =
            filter (allowStages' id routePerson LocalActorPerson) people
        groupsForStages =
            filter (allowStages' id routeGroup LocalActorGroup) groups
        reposForStages =
            filter (allowStages' id routeRepo LocalActorRepo) repos
        decksAndTicketsForStages =
            filter (allowStages' fst routeDeck LocalActorDeck) decksAndTickets
        loomsAndClothsForStages =
            filter (allowStages' fst routeLoom LocalActorLoom) loomsAndCloths

    -- The actors that are addressed themselves
    let personIDsForSelf = map fst $ filter (routePerson . snd) people
        groupIDsForSelf = map fst $ filter (routeGroup . snd) groups
        repoIDsForSelf = map fst $ filter (routeRepo . snd) repos
        deckIDsForSelf =
            map fst $ filter (routeDeck . fst . snd) decksAndTickets
        loomIDsForSelf =
            map fst $ filter (routeLoom . fst . snd) loomsAndCloths

    -- The actors whose followers are addressed
    let personIDsForFollowers =
            map fst $ filter (routePersonFollowers . snd) peopleForStages
        groupIDsForFollowers =
            map fst $ filter (routeGroupFollowers . snd) groupsForStages
        repoIDsForFollowers =
            map fst $ filter (routeRepoFollowers . snd) reposForStages
        deckIDsForFollowers =
            map fst $
                filter (routeDeckFollowers . fst . snd) decksAndTicketsForStages
        loomIDsForFollowers =
            map fst $
                filter (routeLoomFollowers . fst . snd) loomsAndClothsForStages

    -- The tickets and cloths whose followers are addressed, per tracker;
    -- trackers with no such work item are left out
    let ticketSetsForFollowers =
            mapMaybe
                (\ (deckID, (_, tickets)) ->
                    (deckID,) <$>
                        NE.nonEmpty
                            (map fst $ filter (routeTicketFollowers . snd) tickets)
                )
                decksAndTicketsForStages
        clothSetsForFollowers =
            mapMaybe
                (\ (loomID, (_, cloths)) ->
                    (loomID,) <$>
                        NE.nonEmpty
                            (map fst $ filter (routeClothFollowers . snd) cloths)
                )
                loomsAndClothsForStages

    -- Actor IDs of the directly addressed actors, as one ordered list
    actorIDsForSelf <- orderedUnion <$> sequenceA
        [ selectActorIDsOrdered personActor PersonActor personIDsForSelf
        , selectActorIDsOrdered groupActor GroupActor groupIDsForSelf
        , selectActorIDsOrdered repoActor RepoActor repoIDsForSelf
        , selectActorIDsOrdered deckActor DeckActor deckIDsForSelf
        , selectActorIDsOrdered loomActor LoomActor loomIDsForSelf
        ]

    -- FollowerSet IDs of the actors and work items whose followers are
    -- addressed
    followerSetIDs <- do
        actorIDs <- concat <$> sequenceA
            [ selectActorIDs personActor personIDsForFollowers
            , selectActorIDs groupActor groupIDsForFollowers
            , selectActorIDs repoActor repoIDsForFollowers
            , selectActorIDs deckActor deckIDsForFollowers
            , selectActorIDs loomActor loomIDsForFollowers
            ]
        deckTicketIDs <-
            traverse
                (selectTicketIDs ticketDeckTicket TicketDeckDeck)
                ticketSetsForFollowers
        loomTicketIDs <-
            traverse
                (selectTicketIDs ticketLoomTicket TicketLoomLoom)
                clothSetsForFollowers
        let ticketIDs = concat $ deckTicketIDs ++ loomTicketIDs
        actorSets <-
            map (actorFollowers . entityVal) <$>
                selectList [ActorId <-. actorIDs] []
        ticketSets <-
            map (ticketFollowers . entityVal) <$>
                selectList [TicketId <-. ticketIDs] []
        return $ actorSets ++ ticketSets

    -- Local and remote followers of those follower sets
    localFollowers <-
        map (followActor . entityVal) <$>
            selectList [FollowTarget <-. followerSetIDs] [Asc FollowActor]
    remoteFollowers <- getRemoteFollowers followerSetIDs

    -- Insert an inbox item for every local recipient, i.e. every local actor
    -- addressed directly or following an addressed stage, except for the
    -- excluded actor if one is given
    let allLocal = LO.union localFollowers actorIDsForSelf
        localRecipients =
            maybe allLocal (\ actorID -> LO.minus' allLocal [actorID]) maidAuthor
    inboxIDs <-
        map (actorInbox . entityVal) <$>
            selectList [ActorId <-. localRecipients] []
    now <- liftIO getCurrentTime
    inboxItemIDs <- insertMany $ map (const $ InboxItem True now) inboxIDs
    insertMany_ $ zipWith makeInboxItem inboxIDs inboxItemIDs

    -- The remote followers are left for the caller to deliver to
    return remoteFollowers
    where
    orderedUnion = foldl' LO.union []

    -- Decode the hashid of each entry, dropping entries that fail to decode
    unhashKeys
        :: ( MonadSite m
           , YesodHashids (SiteEnv m)
           , ToBackendKey SqlBackend record
           )
        => [(KeyHashid record, routes)]
        -> m [(Key record, routes)]
    unhashKeys actorSets = do
        unhash <- decodeKeyHashidPure <$> asksSite siteHashidsContext
        return $ mapMaybe (\ (hash, famili) -> (,famili) <$> unhash hash) actorSets

    -- A predicate telling whether a given local actor is the exempted
    -- author; it never holds if no author is given or its hashid fails to
    -- decode
    getIsAuthor =
        case mauthor of
            Nothing -> pure $ const False
            Just author -> maybe (const False) (==) <$> unhashLocalActor author

    -- An actor's stages are allowed if the actor itself is addressed, or
    -- addressing the owner isn't required, or the actor is the author
    allowStages
        :: (LocalActorBy Key -> Bool)
        -> (famili -> routes)
        -> (routes -> Bool)
        -> (Key record -> LocalActorBy Key)
        -> (Key record, famili)
        -> Bool
    allowStages isAuthor familyActor routeActor makeActor (actorID, famili)
        =  routeActor (familyActor famili)
        || not requireOwner
        || isAuthor (makeActor actorID)

    selectActorIDs
        :: (MonadIO m, PersistRecordBackend record SqlBackend)
        => (record -> ActorId)
        -> [Key record]
        -> ReaderT SqlBackend m [ActorId]
    selectActorIDs grabActor ids = do
        rows <- selectList [persistIdField <-. ids] []
        return $ map (grabActor . entityVal) rows

    -- Like 'selectActorIDs', with the result in ascending actor ID order
    selectActorIDsOrdered
        :: (MonadIO m, PersistRecordBackend record SqlBackend)
        => (record -> ActorId)
        -> EntityField record ActorId
        -> [Key record]
        -> ReaderT SqlBackend m [ActorId]
    selectActorIDsOrdered grabActor actorField ids = do
        rows <- selectList [persistIdField <-. ids] [Asc actorField]
        return $ map (grabActor . entityVal) rows

    -- Ticket IDs of the given work items, limited to those that belong to
    -- the given tracker; empty if the tracker isn't found
    selectTicketIDs
        :: ( MonadIO m
           , PersistRecordBackend tracker SqlBackend
           , PersistRecordBackend item SqlBackend
           )
        => (item -> TicketId)
        -> EntityField item (Key tracker)
        -> (Key tracker, NonEmpty (Key item))
        -> ReaderT SqlBackend m [TicketId]
    selectTicketIDs grabTicket trackerField (trackerID, workItemIDs) = do
        trackerExists <- isJust <$> get trackerID
        if trackerExists
            then
                map (grabTicket . entityVal) <$>
                    selectList [persistIdField <-. NE.toList workItemIDs, trackerField ==. trackerID] []
            else pure []

    -- Remote actors following any of the given follower sets, grouped by
    -- instance
    getRemoteFollowers
        :: MonadIO m
        => [FollowerSetId]
        -> ReaderT SqlBackend m
            [((InstanceId, Host), NonEmpty RemoteRecipient)]
    getRemoteFollowers fsids =
        fmap groupRemotes $
            E.select $ E.from $ \ (rf `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i) -> do
                E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
                E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId
                E.on $ rf E.^. RemoteFollowActor E.==. ra E.^. RemoteActorId
                E.where_ $ rf E.^. RemoteFollowTarget `E.in_` E.valList fsids
                E.orderBy [E.asc $ i E.^. InstanceId, E.asc $ ra E.^. RemoteActorId]
                return
                    ( i E.^. InstanceId
                    , i E.^. InstanceHost
                    , ra E.^. RemoteActorId
                    , ro E.^. RemoteObjectIdent
                    , ra E.^. RemoteActorInbox
                    , ra E.^. RemoteActorErrorSince
                    )
        where
        groupRemotes = groupWithExtractBy ((==) `on` fst) fst snd . map toTuples
            where
            toTuples (E.Value iid, E.Value h, E.Value raid, E.Value luA, E.Value luI, E.Value ms) =
                ((iid, h), RemoteRecipient raid luA luI ms)
|
|
|
|
-- | Insert a local activity, given by its outbox item, into the inboxes of
-- its local recipients, and return the remote followers of the addressed
-- collections.
--
-- This is 'insertActivityToLocalInboxes' with 'InboxItemLocal' records. The
-- given author is passed both as the actor exempted from the owner
-- requirement and as the actor whose own inbox is skipped.
deliverLocal'
    :: (MonadSite m, YesodHashids (SiteEnv m))
    => Bool -- ^ Whether to deliver to collection only if owner actor is addressed
    -> LocalActor
    -> ActorId
    -> OutboxItemId
    -> RecipientRoutes
    -> ReaderT SqlBackend m [((InstanceId, Host), NonEmpty RemoteRecipient)]
deliverLocal' requireOwner author aidAuthor obiid recips =
    insertActivityToLocalInboxes
        (\ ibid ibiid -> InboxItemLocal ibid obiid ibiid)
        requireOwner
        (Just author)
        (Just aidAuthor)
        recips
|
|
|
|
-- | Given a list of local recipients, which may include actors and
-- collections,
--
-- * Insert activity to inboxes of actors
-- * If the author's follower collection is listed, insert activity to the
--   local members and return the remote members
-- * Ignore other collections
--
-- The recipients are first passed through 'localRecipSieve' with a sieve that
-- lists only the author, and then handed to 'deliverLocal'' with the owner
-- requirement turned on.
deliverLocal
    :: KeyHashid Person
    -> ActorId
    -> OutboxItemId
    -> RecipientRoutes
    -> AppDB
        [ ( (InstanceId, Host)
          , NonEmpty RemoteRecipient
          )
        ]
deliverLocal authorHash aidAuthor obiid recips =
    deliverLocal' True author aidAuthor obiid sieved
    where
    author = LocalActorPerson authorHash
    sieved = localRecipSieve authorOnly True recips
    authorOnly =
        RecipientRoutes [(authorHash, PersonRoutes False True)] [] [] [] []
|
|
|
|
-- | Insert a remote activity into the inboxes of its local recipients, and
-- return the remote followers of the addressed collections.
--
-- This is 'insertActivityToLocalInboxes' with 'InboxItemRemote' records, no
-- actor exempted from the owner requirement and no inbox skipped.
insertRemoteActivityToLocalInboxes
    :: (MonadSite m, YesodHashids (SiteEnv m))
    => Bool
    -> RemoteActivityId
    -> RecipientRoutes
    -> ReaderT SqlBackend m [((InstanceId, Host), NonEmpty RemoteRecipient)]
insertRemoteActivityToLocalInboxes requireOwner ractid recips =
    insertActivityToLocalInboxes
        (\ ibid ibiid -> InboxItemRemote ibid ractid ibiid)
        requireOwner
        Nothing
        Nothing
        recips
|
|
|
|
-- | Clear the "running" flag on every unlinked, linked and forwarding
-- delivery row that still has it set.
--
-- For each of the three kinds, a warning with the number of rows changed is
-- logged, unless that number is zero.
fixRunningDeliveries :: (MonadIO m, MonadLogger m, IsSqlBackend backend) => ReaderT backend m ()
fixRunningDeliveries = do
    -- UnlinkedDelivery rows are the unlinked deliveries
    c <- updateWhereCount [UnlinkedDeliveryRunning ==. True] [UnlinkedDeliveryRunning =. False]
    unless (c == 0) $ logWarn $ T.concat
        [ "fixRunningDeliveries fixed "
        , T.pack (show c)
        , " unlinked deliveries"
        ]
    -- Delivery rows are the linked deliveries
    c' <- updateWhereCount [DeliveryRunning ==. True] [DeliveryRunning =. False]
    unless (c' == 0) $ logWarn $ T.concat
        [ "fixRunningDeliveries fixed "
        , T.pack (show c')
        , " linked deliveries"
        ]
    c'' <- updateWhereCount [ForwardingRunning ==. True] [ForwardingRunning =. False]
    unless (c'' == 0) $ logWarn $ T.concat
        [ "fixRunningDeliveries fixed "
        , T.pack (show c'')
        , " forwarding deliveries"
        ]
|
|
|
|
-- | A forwarder record of one of the five local actor types, wrapped in a
-- type constructor of choice.
data ForwarderBy f
    = FwderPerson (f ForwarderPerson)
    | FwderGroup  (f ForwarderGroup)
    | FwderRepo   (f ForwarderRepo)
    | FwderDeck   (f ForwarderDeck)
    | FwderLoom   (f ForwarderLoom)
    deriving (Generic, FunctorB, ConstraintsB)
|
|
|
|
-- | Split a list of forwarders into five lists, one per actor type.
--
-- The input is consumed left to right and each item is prepended to its
-- list, so within each result list the items appear in reverse input order.
partitionFwders
    :: [ForwarderBy f]
    -> ( [f ForwarderPerson]
       , [f ForwarderGroup]
       , [f ForwarderRepo]
       , [f ForwarderDeck]
       , [f ForwarderLoom]
       )
partitionFwders = foldl' step ([], [], [], [], [])
    where
    step (ps, gs, rs, ds, ls) (FwderPerson p) = (p : ps, gs, rs, ds, ls)
    step (ps, gs, rs, ds, ls) (FwderGroup g)  = (ps, g : gs, rs, ds, ls)
    step (ps, gs, rs, ds, ls) (FwderRepo r)   = (ps, gs, r : rs, ds, ls)
    step (ps, gs, rs, ds, ls) (FwderDeck d)   = (ps, gs, rs, d : ds, ls)
    step (ps, gs, rs, ds, ls) (FwderLoom l)   = (ps, gs, rs, ds, l : ls)
|
|
|
|
-- | Periodic retry pass over remote deliveries that aren't currently marked
-- as running.
--
-- The work happens in two phases:
--
-- 1. Inside a single 'runSiteDB' block: load the unlinked, linked and
--    forwarding deliveries whose @Running@ flag is 'False'. Unlinked
--    deliveries whose recipient now has a @RemoteActor@ record are turned
--    into linked deliveries; those whose recipient has a @RemoteCollection@
--    record are just deleted. Deliveries whose recipient has had an error
--    timestamp for at least the @appDropDeliveryAfter@ setting are deleted.
--    Whatever remains is grouped per instance and per recipient.
--
-- 2. Outside the DB block: fork asynchronous HTTP POSTs for the linked,
--    forwarding and unlinked groups (in that order), then wait for them in
--    the same order, logging an error for each kind that reports a failure.
retryOutboxDelivery :: Worker ()
retryOutboxDelivery = do
    logInfo "Periodic delivery starting"
    now <- liftIO getCurrentTime
    (unlinkedHttp, linkedHttp, forwardingHttp) <- runSiteDB $ do

        -- Get all unlinked deliveries which aren't running already in outbox
        -- post handlers
        --
        -- NOTE: the 'E.on' clauses are listed in reverse order of the joins;
        -- keep that order when editing the query
        unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` ro `E.InnerJoin` i `E.LeftOuterJoin` ra `E.LeftOuterJoin` rc) -> do
            E.on $ E.just (ro E.^. RemoteObjectId) E.==. rc E.?. RemoteCollectionIdent
            E.on $ E.just (ro E.^. RemoteObjectId) E.==. ra E.?. RemoteActorIdent
            E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
            E.on $ ura E.^. UnfetchedRemoteActorIdent E.==. ro E.^. RemoteObjectId
            E.on $ udl E.^. UnlinkedDeliveryRecipient E.==. ura E.^. UnfetchedRemoteActorId
            E.on $ udl E.^. UnlinkedDeliveryActivity E.==. ob E.^. OutboxItemId
            E.where_ $ udl E.^. UnlinkedDeliveryRunning E.==. E.val False
            E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ura E.^. UnfetchedRemoteActorId]
            return
                ( i E.^. InstanceId
                , i E.^. InstanceHost
                , ura E.^. UnfetchedRemoteActorId
                , ro E.^. RemoteObjectIdent
                , ura E.^. UnfetchedRemoteActorSince
                , udl E.^. UnlinkedDeliveryId
                , udl E.^. UnlinkedDeliveryActivity
                , udl E.^. UnlinkedDeliveryForwarding
                , ob E.^. OutboxItemActivity
                , ra E.?. RemoteActorId
                , rc E.?. RemoteCollectionId
                )

        -- Strip the E.Value wrappers and organize the records for the
        -- filtering and grouping we'll need to do
        let unlinked = map adaptUnlinked unlinked'

            -- Split into found (recipient has been reached) and lonely
            -- (recipient hasn't been reached)
            (found, lonely) = partitionMaybes unlinked

        -- Turn the found ones into linked deliveries. All found unlinked
        -- deliveries are deleted, but only those whose recipient is a remote
        -- actor get a linked delivery inserted (see 'toLinked')
        deleteWhere [UnlinkedDeliveryId <-. map (unlinkedID . snd) found]
        insertMany_ $ mapMaybe toLinked found

        -- We're left with the lonely ones. We'll check which actors have been
        -- unreachable for too long, and we'll delete deliveries for them. The
        -- rest of the actors we'll try to reach by HTTP.
        dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings
        let (lonelyOld, lonelyNew) =
                partitionEithers $ map (decideBySinceUDL dropAfter now) lonely
        deleteWhere [UnlinkedDeliveryId <-. lonelyOld]

        -- Now let's grab the linked deliveries, and similarly delete old ones
        -- and return the rest for HTTP delivery.
        linked <- E.select $ E.from $ \ (dl `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i `E.InnerJoin` ob) -> do
            E.on $ dl E.^. DeliveryActivity E.==. ob E.^. OutboxItemId
            E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
            E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId
            E.on $ dl E.^. DeliveryRecipient E.==. ra E.^. RemoteActorId
            E.where_ $ dl E.^. DeliveryRunning E.==. E.val False
            E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId]
            return
                ( i E.^. InstanceId
                , i E.^. InstanceHost
                , ra E.^. RemoteActorId
                , ro E.^. RemoteObjectIdent
                , ra E.^. RemoteActorInbox
                , ra E.^. RemoteActorErrorSince
                , dl E.^. DeliveryId
                , dl E.^. DeliveryForwarding
                , ob E.^. OutboxItemActivity
                )
        let (linkedOld, linkedNew) =
                partitionEithers $
                    map (decideBySinceDL dropAfter now . adaptLinked) linked
        deleteWhere [DeliveryId <-. linkedOld]

        -- Same for forwarding deliveries, which are always linked
        forwarding <- E.select $ E.from $
            \ (fw `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i
                `E.LeftOuterJoin` fwp
                `E.LeftOuterJoin` fwg
                `E.LeftOuterJoin` fwr
                `E.LeftOuterJoin` fwd
                `E.LeftOuterJoin` fwl
              ) -> do
                E.on $ E.just (fw E.^. ForwardingId) E.==. fwl E.?. ForwarderLoomTask
                E.on $ E.just (fw E.^. ForwardingId) E.==. fwd E.?. ForwarderDeckTask
                E.on $ E.just (fw E.^. ForwardingId) E.==. fwr E.?. ForwarderRepoTask
                E.on $ E.just (fw E.^. ForwardingId) E.==. fwg E.?. ForwarderGroupTask
                E.on $ E.just (fw E.^. ForwardingId) E.==. fwp E.?. ForwarderPersonTask
                E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
                E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId
                E.on $ fw E.^. ForwardingRecipient E.==. ra E.^. RemoteActorId
                E.where_ $ fw E.^. ForwardingRunning E.==. E.val False
                E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId]
                return (i, ra, fw, fwp, fwg, fwr, fwd, fwl)
        let (forwardingOld, forwardingNew) =
                partitionEithers $
                    map (decideBySinceFW dropAfter now . adaptForwarding)
                        forwarding
            (fwidsOld, fwdersOld) = unzip forwardingOld
            (fwpidsOld, fwgidsOld, fwridsOld, fwdidsOld, fwlidsOld) =
                partitionFwders fwdersOld
        -- The Forwarder* rows are deleted before the Forwarding rows they
        -- point at
        deleteWhere [ForwarderPersonId <-. fwpidsOld]
        deleteWhere [ForwarderGroupId <-. fwgidsOld]
        deleteWhere [ForwarderRepoId <-. fwridsOld]
        deleteWhere [ForwarderDeckId <-. fwdidsOld]
        deleteWhere [ForwarderLoomId <-. fwlidsOld]
        deleteWhere [ForwardingId <-. fwidsOld]

        return
            ( groupUnlinked lonelyNew
            , groupLinked linkedNew
            , groupForwarding forwardingNew
            )

    let deliver = deliverHttpBL
    logInfo "Periodic delivery prepared DB, starting async HTTP POSTs"

    logDebug $
        "Periodic delivery forking linked " <>
        T.pack (show $ map (renderAuthority . snd . fst) linkedHttp)
    waitsDL <- traverse (fork . deliverLinked deliver now) linkedHttp

    logDebug $
        "Periodic delivery forking forwarding " <>
        T.pack (show $ map (renderAuthority . snd . fst) forwardingHttp)
    waitsFW <- traverse (fork . deliverForwarding now) forwardingHttp

    logDebug $
        "Periodic delivery forking unlinked " <>
        T.pack (show $ map (renderAuthority . snd . fst) unlinkedHttp)
    waitsUDL <- traverse (fork . deliverUnlinked deliver now) unlinkedHttp

    logDebug $
        T.concat
            [ "Periodic delivery waiting for ", T.pack $ show $ length waitsDL
            , " linked"
            ]
    resultsDL <- sequence waitsDL
    unless (and resultsDL) $ logError "Periodic delivery DL error"

    logDebug $
        T.concat
            [ "Periodic delivery waiting for ", T.pack $ show $ length waitsFW
            , " forwarding"
            ]
    resultsFW <- sequence waitsFW
    unless (and resultsFW) $ logError "Periodic delivery FW error"

    logDebug $
        T.concat
            [ "Periodic delivery waiting for "
            , T.pack $ show $ length waitsUDL, " unlinked"
            ]
    resultsUDL <- sequence waitsUDL
    unless (and resultsUDL) $ logError "Periodic delivery UDL error"

    logInfo "Periodic delivery done"
    where
    -- Unwrap one row of the unlinked query into
    --
    -- > (Maybe (Either raid rcid), (((iid, h), ((uraid, lu), delivery)), since))
    --
    -- where the first component says whether the recipient is by now known as
    -- a remote actor ('Left') or a remote collection ('Right')
    adaptUnlinked
        ( E.Value iid, E.Value h, E.Value uraid, E.Value luRecip
        , E.Value since, E.Value udlid, E.Value obid, E.Value fwd
        , E.Value act, E.Value mraid, E.Value mrcid
        ) =
            ( Left <$> mraid <|> Right <$> mrcid
            , ( ( (iid, h)
                , ((uraid, luRecip), (udlid, fwd, obid, BL.fromStrict $ persistJSONBytes act))
                )
              , since
              )
            )

    -- Extract the unlinked delivery ID from an adapted (and stripped of its
    -- Maybe component) unlinked record
    unlinkedID ((_, (_, (udlid, _, _, _))), _) = udlid

    -- Build the linked delivery replacing a found unlinked one. Only remote
    -- actors get one; for a remote collection nothing is inserted
    toLinked (Left raid, ((_, (_, (_, fwd, obid, _))), _)) = Just $ Delivery raid obid fwd False
    toLinked (Right _  , _                               ) = Nothing

    -- Whether a recipient whose errors started at @since@ is still within the
    -- @dropAfter@ window at time @now@
    relevant dropAfter now since = addUTCTime dropAfter since > now

    -- 'Right' the delivery if it should still be attempted, 'Left' its ID if
    -- it should be deleted. A recipient without an error timestamp is always
    -- attempted
    decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _, _))), msince) =
        case msince of
            Nothing -> Right udl
            Just since ->
                if relevant dropAfter now since
                    then Right udl
                    else Left udlid

    -- Unwrap one row of the linked query into
    --
    -- > (((iid, h), ((raid, (ident, inbox)), (dlid, fwd, doc))), since)
    adaptLinked
        ( E.Value iid, E.Value h, E.Value raid, E.Value ident
        , E.Value inbox, E.Value since, E.Value dlid, E.Value fwd
        , E.Value act
        ) =
            ( ( (iid, h)
              , ((raid, (ident, inbox)), (dlid, fwd, BL.fromStrict $ persistJSONBytes act))
              )
            , since
            )

    -- Same decision as 'decideBySinceUDL', for linked deliveries
    decideBySinceDL dropAfter now (dl@(_, (_, (dlid, _, _))), msince) =
        case msince of
            Nothing -> Right dl
            Just since ->
                if relevant dropAfter now since
                    then Right dl
                    else Left dlid

    -- Unwrap one row of the forwarding query into
    --
    -- > (((iid, h), ((raid, inbox), (fwid, body, forwarder, sig))), since)
    --
    -- Exactly one of the five optional Forwarder* entities is expected to be
    -- present; any other combination calls 'error'
    adaptForwarding
        ( Entity iid (Instance h)
        , Entity raid (RemoteActor _ _ inbox _ since)
        , Entity fwid (Forwarding _ _ body sig _)
        , mfwp, mfwg, mfwr, mfwd, mfwl
        ) =
            ( ( (iid, h)
              , ( (raid, inbox)
                , ( fwid
                  , BL.fromStrict body
                  , case (mfwp, mfwg, mfwr, mfwd, mfwl) of
                        (Nothing, Nothing, Nothing, Nothing, Nothing) ->
                            error "Found fwid without a Forwarder* record"
                        (Just fwp, Nothing, Nothing, Nothing, Nothing) ->
                            FwderPerson fwp
                        (Nothing, Just fwg, Nothing, Nothing, Nothing) ->
                            FwderGroup fwg
                        (Nothing, Nothing, Just fwr, Nothing, Nothing) ->
                            FwderRepo fwr
                        (Nothing, Nothing, Nothing, Just fwd, Nothing) ->
                            FwderDeck fwd
                        (Nothing, Nothing, Nothing, Nothing, Just fwl) ->
                            FwderLoom fwl
                        _ -> error "Found fwid with multiple forwarders"
                  , sig
                  )
                )
              )
            , since
            )

    -- Same decision as 'decideBySinceUDL', for forwarding deliveries. The
    -- 'Left' side carries both the Forwarding ID and the key of its
    -- Forwarder* record, since both rows need to be deleted
    decideBySinceFW dropAfter now (fw@(_, (_, (fwid, _, fwder, _))), msince) =
        case msince of
            Nothing -> Right fw
            Just since ->
                if relevant dropAfter now since
                    then Right fw
                    else Left (fwid, bmap entityKey fwder)

    -- The three grouping helpers below are textually identical: group by the
    -- first component (the instance), then within each group by the first
    -- component again (the recipient).
    --
    -- NOTE(review): they're kept as three separate argument-less bindings,
    -- presumably so that each can be used at its own element type; confirm
    -- before merging them into one
    groupUnlinked
        = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
        . groupWithExtractBy ((==) `on` fst) fst snd

    groupLinked
        = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
        . groupWithExtractBy ((==) `on` fst) fst snd

    groupForwarding
        = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
        . groupWithExtractBy ((==) `on` fst) fst snd

    -- Start the action with 'asyncWorker' and return an action that waits for
    -- its result. If the forked action ended with an exception, the exception
    -- is logged and the wait action returns 'False'
    fork action = do
        wait <- asyncWorker action
        return $ do
            result <- wait
            case result of
                Left e -> do
                    logError $ "Periodic delivery error! " <> T.pack (displayException e)
                    return False
                Right success -> return success

    -- Update the error timestamp of a remote actor according to the results
    -- of a round of deliveries to it:
    --
    -- * All succeeded: clear the timestamp
    -- * Some succeeded, some failed: set the timestamp to @now@
    -- * All failed: set the timestamp to @now@ only if it isn't already set,
    --   so that an existing earlier timestamp is preserved
    recordActorOutcome now raid results =
        runSiteDB $
            if and results
                then update raid [RemoteActorErrorSince =. Nothing]
                else if or results
                    then update raid [RemoteActorErrorSince =. Just now]
                    else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]

    -- Deliver the linked deliveries of a single host: one forked task per
    -- recipient, and within it one forked task per delivery. A successfully
    -- delivered item is deleted from the DB; a failed one is logged and left
    -- in place. Always returns 'True'; failures are reported through the log
    deliverLinked deliver now ((_, h), recips) = do
        logDebug $ "Periodic deliver starting linked for host " <> renderAuthority h
        waitsR <- for recips $ \ ((raid, (ident, inbox)), delivs) -> fork $ do
            logDebug $
                "Periodic deliver starting linked for actor " <>
                renderObjURI (ObjURI h ident)
            waitsD <- for delivs $ \ (dlid, fwd, doc) -> fork $ do
                let fwd' = if fwd then Just ident else Nothing
                e <- deliver doc fwd' h inbox
                case e of
                    Left err -> do
                        logError $ T.concat
                            [ "Periodic DL delivery #", T.pack $ show dlid
                            , " error for <", renderObjURI $ ObjURI h ident, ">: "
                            , T.pack $ displayException err
                            ]
                        return False
                    Right _resp -> do
                        runSiteDB $ delete dlid
                        return True
            results <- sequence waitsD
            recordActorOutcome now raid results
            return True
        results <- sequence waitsR
        unless (and results) $
            logError $ "Periodic DL delivery error for host " <> renderAuthority h
        return True

    -- Deliver the unlinked deliveries of a single host. For each recipient we
    -- first try to fetch the remote actor:
    --
    -- * Fetch gave @Right (Right Nothing)@: the recipient's deliveries are
    --   deleted (presumably the recipient turned out not to be an actor -
    --   confirm against 'fetchRemoteActor')
    -- * Fetch gave an actor: each delivery is attempted. On success the
    --   unlinked delivery is deleted; on failure it is replaced by a linked
    --   delivery, since the recipient is now a known remote actor
    -- * Anything else: the unfetched actor's error timestamp is set to @now@
    --   unless it is already set
    --
    -- Always returns 'True'; failures are reported through the log
    deliverUnlinked deliver now ((iid, h), recips) = do
        logDebug $ "Periodic deliver starting unlinked for host " <> renderAuthority h
        waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do
            logDebug $
                "Periodic deliver starting unlinked for actor " <>
                renderObjURI (ObjURI h luRecip)
            e <- fetchRemoteActor iid h luRecip
            case e of
                Right (Right mera) ->
                    case mera of
                        Nothing -> runSiteDB $ deleteWhere [UnlinkedDeliveryId <-. map fst4 (NE.toList delivs)]
                        Just (Entity raid ra) -> do
                            waitsD <- for delivs $ \ (udlid, fwd, obid, doc) -> fork $ do
                                let fwd' = if fwd then Just luRecip else Nothing
                                e' <- deliver doc fwd' h $ remoteActorInbox ra
                                case e' of
                                    Left err -> do
                                        -- Log the failure like 'deliverLinked'
                                        -- does, instead of dropping it
                                        logError $ T.concat
                                            [ "Periodic UDL delivery #", T.pack $ show udlid
                                            , " error for <", renderObjURI $ ObjURI h luRecip, ">: "
                                            , T.pack $ displayException err
                                            ]
                                        runSiteDB $ do
                                            delete udlid
                                            insert_ $ Delivery raid obid fwd False
                                        return False
                                    Right _resp -> do
                                        runSiteDB $ delete udlid
                                        return True
                            results <- sequence waitsD
                            recordActorOutcome now raid results
                _ -> runSiteDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
            return True
        results <- sequence waitsR
        unless (and results) $
            logError $ "Periodic UDL delivery error for host " <> renderAuthority h
        return True

    -- Deliver the forwarding deliveries of a single host: one forked task per
    -- recipient inbox, and within it one forked task per forwarded activity.
    -- On success both the Forwarder* row and the Forwarding row are deleted;
    -- on failure they're left in place. Always returns 'True'
    deliverForwarding now ((_, h), recips) = do
        logDebug $ "Periodic deliver starting forwarding for host " <> renderAuthority h
        waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do
            logDebug $
                "Periodic deliver starting forwarding for inbox " <>
                renderObjURI (ObjURI h inbox)
            waitsD <- for delivs $ \ (fwid, body, fwderE, sig) -> fork $ do
                let (fwderK, senderK) = splitForwarder fwderE
                sender <- renderLocalActor <$> hashLocalActor senderK
                e <- forwardActivity (ObjURI h inbox) sig sender body
                case e of
                    -- NOTE(review): unlike 'deliverLinked', the error is
                    -- dropped here without logging; consider logging it once
                    -- the error type of 'forwardActivity' is confirmed to
                    -- support 'displayException'
                    Left _err -> return False
                    Right _resp -> do
                        runSiteDB $ do
                            case fwderK of
                                FwderPerson k -> delete k
                                FwderGroup k  -> delete k
                                FwderRepo k   -> delete k
                                FwderDeck k   -> delete k
                                FwderLoom k   -> delete k
                            delete fwid
                        return True
            results <- sequence waitsD
            recordActorOutcome now raid results
            return True
        results <- sequence waitsR
        unless (and results) $
            logError $ "Periodic FW delivery error for host " <> renderAuthority h
        return True
        where
        -- Split a forwarder entity into the key of its Forwarder* row (to
        -- delete after a successful delivery) and the local actor it names
        -- as the sender
        splitForwarder (FwderPerson (Entity f (ForwarderPerson _ p))) =
            (FwderPerson f, LocalActorPerson p)
        splitForwarder (FwderGroup (Entity f (ForwarderGroup _ g))) =
            (FwderGroup f, LocalActorGroup g)
        splitForwarder (FwderRepo (Entity f (ForwarderRepo _ r))) =
            (FwderRepo f, LocalActorRepo r)
        splitForwarder (FwderDeck (Entity f (ForwarderDeck _ d))) =
            (FwderDeck f, LocalActorDeck d)
        splitForwarder (FwderLoom (Entity f (ForwarderLoom _ l))) =
            (FwderLoom f, LocalActorLoom l)
|