1
0
Fork 0
mirror of https://code.sup39.dev/repos/Wqawg synced 2024-12-27 18:04:53 +09:00

S2S: Prepare inbox handlers for Repo, Deck and Loom

This commit is contained in:
fr33domlover 2022-09-06 10:52:14 +00:00
parent cdcf3a3326
commit 9dddc7d846
16 changed files with 618 additions and 585 deletions

View file

@ -1868,7 +1868,7 @@ inviteC (Entity senderPersonID senderPerson) senderActor muCap summary audience
case capID of case capID of
Left (actor, _, item) -> return (actor, item) Left (actor, _, item) -> return (actor, item)
Right _ -> throwE "Capability is a remote URI, i.e. not authored by the local topic" Right _ -> throwE "Capability is a remote URI, i.e. not authored by the local topic"
verifyCapability capability senderPersonID $ bmap entityKey r verifyCapability capability (Left senderPersonID) (bmap entityKey r)
Right _ -> pure () Right _ -> pure ()
-- Insert new Collab to DB -- Insert new Collab to DB

View file

@ -84,6 +84,7 @@ import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Reader import Control.Monad.Trans.Reader
import Data.Barbie import Data.Barbie
import Data.Bifunctor
import Data.Foldable import Data.Foldable
import Data.Maybe import Data.Maybe
import Data.Text (Text) import Data.Text (Text)
@ -97,6 +98,7 @@ import Yesod.Hashids
import Yesod.MonadSite import Yesod.MonadSite
import Control.Monad.Trans.Except.Local import Control.Monad.Trans.Except.Local
import Data.Either.Local
import Database.Persist.Local import Database.Persist.Local
import Vervis.ActivityPub import Vervis.ActivityPub
@ -304,10 +306,10 @@ grantResourceLocalActor (GrantResourceLoom l) = LocalActorLoom l
verifyCapability verifyCapability
:: (LocalActorBy Key, OutboxItemId) :: (LocalActorBy Key, OutboxItemId)
-> PersonId -> Either PersonId RemoteActorId
-> GrantResourceBy Key -> GrantResourceBy Key
-> ExceptT Text (ReaderT SqlBackend Handler) () -> ExceptT Text (ReaderT SqlBackend Handler) ()
verifyCapability (capActor, capItem) personID resource = do verifyCapability (capActor, capItem) actor resource = do
-- Find the activity itself by URI in the DB -- Find the activity itself by URI in the DB
nameExceptT "Capability activity not found" $ nameExceptT "Capability activity not found" $
@ -320,16 +322,17 @@ verifyCapability (capActor, capItem) personID resource = do
fromMaybeE maybeEnable "No CollabEnable for this activity" fromMaybeE maybeEnable "No CollabEnable for this activity"
-- Find the recipient of that Collab -- Find the recipient of that Collab
recipID <- do recipID <-
mcrl <- lift $ getValBy $ UniqueCollabRecipLocal collabID lift $ bimap collabRecipLocalPerson collabRecipRemoteActor <$>
crl <- fromMaybeE mcrl "No local recip for capability" requireEitherAlt
mcrr <- lift $ getBy $ UniqueCollabRecipRemote collabID (getValBy $ UniqueCollabRecipLocal collabID)
for_ mcrr $ \ _ -> error "Both local & remote recip for capability!" (getValBy $ UniqueCollabRecipRemote collabID)
return $ collabRecipLocalPerson crl "No collab recip"
"Both local and remote recips for collab"
-- Verify the recipient is the expected one -- Verify the recipient is the expected one
unless (recipID == personID) $ unless (recipID == actor) $
throwE "Collab recipient is some other Person" throwE "Collab recipient is someone else"
-- Find the local topic, on which this Collab gives access -- Find the local topic, on which this Collab gives access
topic <- lift $ do topic <- lift $ do

View file

@ -94,7 +94,7 @@ import Web.Hashids.Local
import Vervis.ActorKey (generateActorKey, actorKeyRotator) import Vervis.ActorKey (generateActorKey, actorKeyRotator)
import Vervis.Darcs import Vervis.Darcs
import Vervis.Federation import Vervis.Delivery
import Vervis.Foundation import Vervis.Foundation
import Vervis.Git import Vervis.Git
import Vervis.Hook import Vervis.Hook

View file

@ -13,13 +13,19 @@
- <http://creativecommons.org/publicdomain/zero/1.0/>. - <http://creativecommons.org/publicdomain/zero/1.0/>.
-} -}
-- These are for Barbie-related generated instances for ForwarderBy
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
module Vervis.Delivery module Vervis.Delivery
( deliverHttp ( deliverHttp
, deliverHttpBL , deliverHttpBL
, deliverRemoteDB_D , deliverRemoteDB_D
, deliverRemoteDB_L
, deliverRemoteDB_P , deliverRemoteDB_P
, deliverRemoteDB_R , deliverRemoteDB_R
, deliverRemoteHTTP_D , deliverRemoteHTTP_D
, deliverRemoteHTTP_L
, deliverRemoteHTTP_P , deliverRemoteHTTP_P
, deliverRemoteHTTP_R , deliverRemoteHTTP_R
, deliverRemoteDB' , deliverRemoteDB'
@ -29,6 +35,8 @@ module Vervis.Delivery
, deliverLocal' , deliverLocal'
, deliverLocal , deliverLocal
, insertRemoteActivityToLocalInboxes , insertRemoteActivityToLocalInboxes
, fixRunningDeliveries
, retryOutboxDelivery
) )
where where
@ -42,6 +50,7 @@ import Control.Monad.Trans.Class
import Control.Monad.Trans.Except import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Reader import Control.Monad.Trans.Reader
import Data.Barbie
import Data.Bifunctor import Data.Bifunctor
import Data.Bitraversable import Data.Bitraversable
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
@ -57,6 +66,7 @@ import Data.Time.Clock
import Data.Traversable import Data.Traversable
import Database.Persist import Database.Persist
import Database.Persist.Sql import Database.Persist.Sql
import GHC.Generics
import Network.HTTP.Client import Network.HTTP.Client
import Network.TLS -- hiding (SHA256) import Network.TLS -- hiding (SHA256)
import Text.Blaze.Html (preEscapedToHtml) import Text.Blaze.Html (preEscapedToHtml)
@ -90,6 +100,7 @@ import qualified Web.ActivityPub as AP
import Control.Monad.Trans.Except.Local import Control.Monad.Trans.Except.Local
import Data.Either.Local import Data.Either.Local
import Data.List.NonEmpty.Local import Data.List.NonEmpty.Local
import Data.Maybe.Local
import Data.Patch.Local hiding (Patch) import Data.Patch.Local hiding (Patch)
import Data.Tuple.Local import Data.Tuple.Local
import Database.Persist.Local import Database.Persist.Local
@ -162,6 +173,17 @@ deliverRemoteDB_D
[((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderDeckId))] [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderDeckId))]
deliverRemoteDB_D = deliverRemoteDB_ ForwarderDeck deliverRemoteDB_D = deliverRemoteDB_ ForwarderDeck
deliverRemoteDB_L
:: MonadIO m
=> BL.ByteString
-> RemoteActivityId
-> LoomId
-> ByteString
-> [((InstanceId, Host), NonEmpty RemoteRecipient)]
-> ReaderT SqlBackend m
[((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderLoomId))]
deliverRemoteDB_L = deliverRemoteDB_ ForwarderLoom
deliverRemoteDB_P deliverRemoteDB_P
:: MonadIO m :: MonadIO m
=> BL.ByteString => BL.ByteString
@ -244,6 +266,17 @@ deliverRemoteHTTP_D
deliverRemoteHTTP_D now dkhid = deliverRemoteHTTP_D now dkhid =
deliverRemoteHTTP' now $ LocalActorDeck dkhid deliverRemoteHTTP' now $ LocalActorDeck dkhid
deliverRemoteHTTP_L
:: (MonadSite m, SiteEnv m ~ App)
=> UTCTime
-> KeyHashid Loom
-> BL.ByteString
-> ByteString
-> [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderLoomId))]
-> m ()
deliverRemoteHTTP_L now lkhid =
deliverRemoteHTTP' now $ LocalActorLoom lkhid
deliverRemoteHTTP_P deliverRemoteHTTP_P
:: (MonadSite m, SiteEnv m ~ App) :: (MonadSite m, SiteEnv m ~ App)
=> UTCTime => UTCTime
@ -806,3 +839,429 @@ insertRemoteActivityToLocalInboxes requireOwner ractid =
insertActivityToLocalInboxes makeItem requireOwner Nothing Nothing insertActivityToLocalInboxes makeItem requireOwner Nothing Nothing
where where
makeItem ibid ibiid = InboxItemRemote ibid ractid ibiid makeItem ibid ibiid = InboxItemRemote ibid ractid ibiid
fixRunningDeliveries :: (MonadIO m, MonadLogger m, IsSqlBackend backend) => ReaderT backend m ()
fixRunningDeliveries = do
c <- updateWhereCount [UnlinkedDeliveryRunning ==. True] [UnlinkedDeliveryRunning =. False]
unless (c == 0) $ logWarn $ T.concat
[ "fixRunningDeliveries fixed "
, T.pack (show c)
, " linked deliveries"
]
c' <- updateWhereCount [DeliveryRunning ==. True] [DeliveryRunning =. False]
unless (c' == 0) $ logWarn $ T.concat
[ "fixRunningDeliveries fixed "
, T.pack (show c')
, " unlinked deliveries"
]
c'' <- updateWhereCount [ForwardingRunning ==. True] [ForwardingRunning =. False]
unless (c'' == 0) $ logWarn $ T.concat
[ "fixRunningDeliveries fixed "
, T.pack (show c'')
, " forwarding deliveries"
]
data ForwarderBy f
= FwderPerson (f ForwarderPerson)
| FwderGroup (f ForwarderGroup)
| FwderRepo (f ForwarderRepo)
| FwderDeck (f ForwarderDeck)
| FwderLoom (f ForwarderLoom)
deriving (Generic, FunctorB, ConstraintsB)
partitionFwders
:: [ForwarderBy f]
-> ( [f ForwarderPerson]
, [f ForwarderGroup]
, [f ForwarderRepo]
, [f ForwarderDeck]
, [f ForwarderLoom]
)
partitionFwders = foldl' f ([], [], [], [], [])
where
f (ps, gs, rs, ds, ls) = \ fwder ->
case fwder of
FwderPerson p -> (p : ps, gs, rs, ds, ls)
FwderGroup g -> (ps, g : gs, rs, ds, ls)
FwderRepo r -> (ps, gs, r : rs, ds, ls)
FwderDeck d -> (ps, gs, rs, d : ds, ls)
FwderLoom l -> (ps, gs, rs, ds, l : ls)
retryOutboxDelivery :: Worker ()
retryOutboxDelivery = do
logInfo "Periodic delivery starting"
now <- liftIO $ getCurrentTime
(unlinkedHttp, linkedHttp, forwardingHttp) <- runSiteDB $ do
-- Get all unlinked deliveries which aren't running already in outbox
-- post handlers
unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` ro `E.InnerJoin` i `E.LeftOuterJoin` ra `E.LeftOuterJoin` rc) -> do
E.on $ E.just (ro E.^. RemoteObjectId) E.==. rc E.?. RemoteCollectionIdent
E.on $ E.just (ro E.^. RemoteObjectId) E.==. ra E.?. RemoteActorIdent
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
E.on $ ura E.^. UnfetchedRemoteActorIdent E.==. ro E.^. RemoteObjectId
E.on $ udl E.^. UnlinkedDeliveryRecipient E.==. ura E.^. UnfetchedRemoteActorId
E.on $ udl E.^. UnlinkedDeliveryActivity E.==. ob E.^. OutboxItemId
E.where_ $ udl E.^. UnlinkedDeliveryRunning E.==. E.val False
E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ura E.^. UnfetchedRemoteActorId]
return
( i E.^. InstanceId
, i E.^. InstanceHost
, ura E.^. UnfetchedRemoteActorId
, ro E.^. RemoteObjectIdent
, ura E.^. UnfetchedRemoteActorSince
, udl E.^. UnlinkedDeliveryId
, udl E.^. UnlinkedDeliveryActivity
, udl E.^. UnlinkedDeliveryForwarding
, ob E.^. OutboxItemActivity
, ra E.?. RemoteActorId
, rc E.?. RemoteCollectionId
)
-- Strip the E.Value wrappers and organize the records for the
-- filtering and grouping we'll need to do
let unlinked = map adaptUnlinked unlinked'
-- Split into found (recipient has been reached) and lonely (recipient
-- hasn't been reached
(found, lonely) = partitionMaybes unlinked
-- Turn the found ones into linked deliveries
deleteWhere [UnlinkedDeliveryId <-. map (unlinkedID . snd) found]
insertMany_ $ mapMaybe toLinked found
-- We're left with the lonely ones. We'll check which actors have been
-- unreachable for too long, and we'll delete deliveries for them. The
-- rest of the actors we'll try to reach by HTTP.
dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings
let (lonelyOld, lonelyNew) =
partitionEithers $ map (decideBySinceUDL dropAfter now) lonely
deleteWhere [UnlinkedDeliveryId <-. lonelyOld]
-- Now let's grab the linked deliveries, and similarly delete old ones
-- and return the rest for HTTP delivery.
linked <- E.select $ E.from $ \ (dl `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i `E.InnerJoin` ob) -> do
E.on $ dl E.^. DeliveryActivity E.==. ob E.^. OutboxItemId
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId
E.on $ dl E.^. DeliveryRecipient E.==. ra E.^. RemoteActorId
E.where_ $ dl E.^. DeliveryRunning E.==. E.val False
E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId]
return
( i E.^. InstanceId
, i E.^. InstanceHost
, ra E.^. RemoteActorId
, ro E.^. RemoteObjectIdent
, ra E.^. RemoteActorInbox
, ra E.^. RemoteActorErrorSince
, dl E.^. DeliveryId
, dl E.^. DeliveryForwarding
, ob E.^. OutboxItemActivity
)
let (linkedOld, linkedNew) =
partitionEithers $
map (decideBySinceDL dropAfter now . adaptLinked) linked
deleteWhere [DeliveryId <-. linkedOld]
-- Same for forwarding deliveries, which are always linked
forwarding <- E.select $ E.from $
\ (fw `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i
`E.LeftOuterJoin` fwp
`E.LeftOuterJoin` fwg
`E.LeftOuterJoin` fwr
`E.LeftOuterJoin` fwd
`E.LeftOuterJoin` fwl
) -> do
E.on $ E.just (fw E.^. ForwardingId) E.==. fwl E.?. ForwarderLoomTask
E.on $ E.just (fw E.^. ForwardingId) E.==. fwd E.?. ForwarderDeckTask
E.on $ E.just (fw E.^. ForwardingId) E.==. fwr E.?. ForwarderRepoTask
E.on $ E.just (fw E.^. ForwardingId) E.==. fwg E.?. ForwarderGroupTask
E.on $ E.just (fw E.^. ForwardingId) E.==. fwp E.?. ForwarderPersonTask
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId
E.on $ fw E.^. ForwardingRecipient E.==. ra E.^. RemoteActorId
E.where_ $ fw E.^. ForwardingRunning E.==. E.val False
E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId]
return (i, ra, fw, fwp, fwg, fwr, fwd, fwl)
let (forwardingOld, forwardingNew) =
partitionEithers $
map (decideBySinceFW dropAfter now . adaptForwarding)
forwarding
(fwidsOld, fwdersOld) = unzip forwardingOld
(fwpidsOld, fwgidsOld, fwridsOld, fwdidsOld, fwlidsOld) =
partitionFwders fwdersOld
deleteWhere [ForwarderPersonId <-. fwpidsOld]
deleteWhere [ForwarderGroupId <-. fwgidsOld]
deleteWhere [ForwarderRepoId <-. fwridsOld]
deleteWhere [ForwarderDeckId <-. fwdidsOld]
deleteWhere [ForwarderLoomId <-. fwlidsOld]
deleteWhere [ForwardingId <-. fwidsOld]
return
( groupUnlinked lonelyNew
, groupLinked linkedNew
, groupForwarding forwardingNew
)
let deliver = deliverHttpBL
logInfo "Periodic delivery prepared DB, starting async HTTP POSTs"
logDebug $
"Periodic delivery forking linked " <>
T.pack (show $ map (renderAuthority . snd . fst) linkedHttp)
waitsDL <- traverse (fork . deliverLinked deliver now) linkedHttp
logDebug $
"Periodic delivery forking forwarding " <>
T.pack (show $ map (renderAuthority . snd . fst) forwardingHttp)
waitsFW <- traverse (fork . deliverForwarding now) forwardingHttp
logDebug $
"Periodic delivery forking unlinked " <>
T.pack (show $ map (renderAuthority . snd . fst) unlinkedHttp)
waitsUDL <- traverse (fork . deliverUnlinked deliver now) unlinkedHttp
logDebug $
T.concat
[ "Periodic delivery waiting for ", T.pack $ show $ length waitsDL
, " linked"
]
resultsDL <- sequence waitsDL
unless (and resultsDL) $ logError "Periodic delivery DL error"
logDebug $
T.concat
[ "Periodic delivery waiting for ", T.pack $ show $ length waitsFW
, " forwarding"
]
resultsFW <- sequence waitsFW
unless (and resultsFW) $ logError "Periodic delivery FW error"
logDebug $
T.concat
[ "Periodic delivery waiting for "
, T.pack $ show $ length waitsUDL, " unlinked"
]
resultsUDL <- sequence waitsUDL
unless (and resultsUDL) $ logError "Periodic delivery UDL error"
logInfo "Periodic delivery done"
where
adaptUnlinked
(E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value fwd, E.Value act, E.Value mraid, E.Value mrcid) =
( Left <$> mraid <|> Right <$> mrcid
, ( ( (iid, h)
, ((uraid, luRecip), (udlid, fwd, obid, BL.fromStrict $ persistJSONBytes act))
)
, since
)
)
unlinkedID ((_, (_, (udlid, _, _, _))), _) = udlid
toLinked (Left raid, ((_, (_, (_, fwd, obid, _))), _)) = Just $ Delivery raid obid fwd False
toLinked (Right _ , _ ) = Nothing
relevant dropAfter now since = addUTCTime dropAfter since > now
decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _, _))), msince) =
case msince of
Nothing -> Right udl
Just since ->
if relevant dropAfter now since
then Right udl
else Left udlid
adaptLinked
(E.Value iid, E.Value h, E.Value raid, E.Value ident, E.Value inbox, E.Value since, E.Value dlid, E.Value fwd, E.Value act) =
( ( (iid, h)
, ((raid, (ident, inbox)), (dlid, fwd, BL.fromStrict $ persistJSONBytes act))
)
, since
)
decideBySinceDL dropAfter now (dl@(_, (_, (dlid, _, _))), msince) =
case msince of
Nothing -> Right dl
Just since ->
if relevant dropAfter now since
then Right dl
else Left dlid
adaptForwarding
( Entity iid (Instance h)
, Entity raid (RemoteActor _ _ inbox _ since)
, Entity fwid (Forwarding _ _ body sig _)
, mfwp, mfwg, mfwr, mfwd, mfwl
) =
( ( (iid, h)
, ( (raid, inbox)
, ( fwid
, BL.fromStrict body
, case (mfwp, mfwg, mfwr, mfwd, mfwl) of
(Nothing, Nothing, Nothing, Nothing, Nothing) ->
error "Found fwid without a Forwarder* record"
(Just fwp, Nothing, Nothing, Nothing, Nothing) ->
FwderPerson fwp
(Nothing, Just fwg, Nothing, Nothing, Nothing) ->
FwderGroup fwg
(Nothing, Nothing, Just fwr, Nothing, Nothing) ->
FwderRepo fwr
(Nothing, Nothing, Nothing, Just fwd, Nothing) ->
FwderDeck fwd
(Nothing, Nothing, Nothing, Nothing, Just fwl) ->
FwderLoom fwl
_ -> error "Found fwid with multiple forwarders"
, sig
)
)
)
, since
)
decideBySinceFW dropAfter now (fw@(_, (_, (fwid, _, fwder, _))), msince) =
case msince of
Nothing -> Right fw
Just since ->
if relevant dropAfter now since
then Right fw
else Left (fwid, bmap entityKey fwder)
groupUnlinked
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
groupLinked
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
groupForwarding
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
fork action = do
wait <- asyncWorker action
return $ do
result <- wait
case result of
Left e -> do
logError $ "Periodic delivery error! " <> T.pack (displayException e)
return False
Right success -> return success
deliverLinked deliver now ((_, h), recips) = do
logDebug $ "Periodic deliver starting linked for host " <> renderAuthority h
waitsR <- for recips $ \ ((raid, (ident, inbox)), delivs) -> fork $ do
logDebug $
"Periodic deliver starting linked for actor " <>
renderObjURI (ObjURI h ident)
waitsD <- for delivs $ \ (dlid, fwd, doc) -> fork $ do
let fwd' = if fwd then Just ident else Nothing
e <- deliver doc fwd' h inbox
case e of
Left err -> do
logError $ T.concat
[ "Periodic DL delivery #", T.pack $ show dlid
, " error for <", renderObjURI $ ObjURI h ident, ">: "
, T.pack $ displayException err
]
return False
Right _resp -> do
runSiteDB $ delete dlid
return True
results <- sequence waitsD
runSiteDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic DL delivery error for host " <> renderAuthority h
return True
deliverUnlinked deliver now ((iid, h), recips) = do
logDebug $ "Periodic deliver starting unlinked for host " <> renderAuthority h
waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do
logDebug $
"Periodic deliver starting unlinked for actor " <>
renderObjURI (ObjURI h luRecip)
e <- fetchRemoteActor iid h luRecip
case e of
Right (Right mera) ->
case mera of
Nothing -> runSiteDB $ deleteWhere [UnlinkedDeliveryId <-. map fst4 (NE.toList delivs)]
Just (Entity raid ra) -> do
waitsD <- for delivs $ \ (udlid, fwd, obid, doc) -> fork $ do
let fwd' = if fwd then Just luRecip else Nothing
e' <- deliver doc fwd' h $ remoteActorInbox ra
case e' of
Left _err -> do
runSiteDB $ do
delete udlid
insert_ $ Delivery raid obid fwd False
return False
Right _resp -> do
runSiteDB $ delete udlid
return True
results <- sequence waitsD
runSiteDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
_ -> runSiteDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic UDL delivery error for host " <> renderAuthority h
return True
deliverForwarding now ((_, h), recips) = do
logDebug $ "Periodic deliver starting forwarding for host " <> renderAuthority h
waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do
logDebug $
"Periodic deliver starting forwarding for inbox " <>
renderObjURI (ObjURI h inbox)
waitsD <- for delivs $ \ (fwid, body, fwderE, sig) -> fork $ do
let (fwderK, senderK) = splitForwarder fwderE
sender <- renderLocalActor <$> hashLocalActor senderK
e <- forwardActivity (ObjURI h inbox) sig sender body
case e of
Left _err -> return False
Right _resp -> do
runSiteDB $ do
case fwderK of
FwderPerson k -> delete k
FwderGroup k -> delete k
FwderRepo k -> delete k
FwderDeck k -> delete k
FwderLoom k -> delete k
delete fwid
return True
results <- sequence waitsD
runSiteDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic FW delivery error for host " <> renderAuthority h
return True
where
splitForwarder (FwderPerson (Entity f (ForwarderPerson _ p))) =
(FwderPerson f, LocalActorPerson p)
splitForwarder (FwderGroup (Entity f (ForwarderGroup _ g))) =
(FwderGroup f, LocalActorGroup g)
splitForwarder (FwderRepo (Entity f (ForwarderRepo _ r))) =
(FwderRepo f, LocalActorRepo r)
splitForwarder (FwderDeck (Entity f (ForwarderDeck _ d))) =
(FwderDeck f, LocalActorDeck d)
splitForwarder (FwderLoom (Entity f (ForwarderLoom _ l))) =
(FwderLoom f, LocalActorLoom l)

View file

@ -13,22 +13,8 @@
- <http://creativecommons.org/publicdomain/zero/1.0/>. - <http://creativecommons.org/publicdomain/zero/1.0/>.
-} -}
-- These are for Barbie-related generated instances for ForwarderBy
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
--{-# LANGUAGE StandaloneDeriving #-}
--{-# LANGUAGE UndecidableInstances #-}
module Vervis.Federation module Vervis.Federation
( (
{-
handlePersonInbox
, handleDeckInbox
, handleLoomInbox
, handleRepoInbox
-}
fixRunningDeliveries
, retryOutboxDelivery
) )
where where
@ -169,540 +155,4 @@ handleProjectInbox shrRecip prjRecip now auth body = do
errorLocalForwarded (ActivityAuthLocalRepo rid) = errorLocalForwarded (ActivityAuthLocalRepo rid) =
"Project inbox got local forwarded activity by rid#" <> "Project inbox got local forwarded activity by rid#" <>
T.pack (show $ fromSqlKey rid) T.pack (show $ fromSqlKey rid)
handleDeckInbox
:: KeyHashid Project
-> UTCTime
-> ActivityAuthentication
-> ActivityBody
-> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text))
handleDeckInbox dkkhid now auth body = do
remoteAuthor <-
case auth of
ActivityAuthLocal local -> throwE $ errorLocalForwarded local
ActivityAuthRemote ra -> return ra
luActivity <-
fromMaybeE (activityId $ actbActivity body) "Activity without 'id'"
localRecips <- do
mrecips <- parseAudience $ activityAudience $ actbActivity body
paudLocalRecips <$> fromMaybeE mrecips "Activity with no recipients"
msig <- checkForwarding $ LocalActorProject shrRecip prjRecip
let mfwd = (localRecips,) <$> msig
case activitySpecific $ actbActivity body of
CreateActivity (Create obj mtarget) ->
case obj of
CreateNote _ note ->
(,Nothing) <$> projectCreateNoteF now shrRecip prjRecip remoteAuthor body mfwd luActivity note
CreateTicket _ ticket ->
(,Nothing) <$> projectCreateTicketF now shrRecip prjRecip remoteAuthor body mfwd luActivity ticket mtarget
_ -> error "Unsupported create object type for projects"
FollowActivity follow ->
(,Nothing) <$> projectFollowF shrRecip prjRecip now remoteAuthor body mfwd luActivity follow
OfferActivity (Offer obj target) ->
case obj of
OfferTicket ticket ->
(,Nothing) <$> projectOfferTicketF now shrRecip prjRecip remoteAuthor body mfwd luActivity ticket target
OfferDep dep ->
projectOfferDepF now shrRecip prjRecip remoteAuthor body mfwd luActivity dep target
_ -> return ("Unsupported offer object type for projects", Nothing)
ResolveActivity resolve ->
(,Nothing) <$> projectResolveF now shrRecip prjRecip remoteAuthor body mfwd luActivity resolve
UndoActivity undo ->
(,Nothing) <$> projectUndoF shrRecip prjRecip now remoteAuthor body mfwd luActivity undo
_ -> return ("Unsupported activity type for projects", Nothing)
where
errorLocalForwarded (ActivityAuthLocalPerson pid) =
"Project inbox got local forwarded activity by pid#" <>
T.pack (show $ fromSqlKey pid)
errorLocalForwarded (ActivityAuthLocalProject jid) =
"Project inbox got local forwarded activity by jid#" <>
T.pack (show $ fromSqlKey jid)
errorLocalForwarded (ActivityAuthLocalRepo rid) =
"Project inbox got local forwarded activity by rid#" <>
T.pack (show $ fromSqlKey rid)
handleRepoInbox
:: ShrIdent
-> RpIdent
-> UTCTime
-> ActivityAuthentication
-> ActivityBody
-> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text))
handleRepoInbox shrRecip rpRecip now auth body = do
remoteAuthor <-
case auth of
ActivityAuthLocal local -> throwE $ errorLocalForwarded local
ActivityAuthRemote ra -> return ra
luActivity <-
fromMaybeE (activityId $ actbActivity body) "Activity without 'id'"
localRecips <- do
mrecips <- parseAudience $ activityAudience $ actbActivity body
paudLocalRecips <$> fromMaybeE mrecips "Activity with no recipients"
msig <- checkForwarding $ LocalActorRepo shrRecip rpRecip
let mfwd = (localRecips,) <$> msig
case activitySpecific $ actbActivity body of
ApplyActivity (AP.Apply uObject uTarget) ->
repoApplyF now shrRecip rpRecip remoteAuthor body mfwd luActivity uObject uTarget
AddActivity (AP.Add obj target) ->
case obj of
Right (AddBundle patches) ->
repoAddBundleF now shrRecip rpRecip remoteAuthor body mfwd luActivity patches target
_ -> return ("Unsupported add object type for repos", Nothing)
CreateActivity (Create obj mtarget) ->
case obj of
CreateNote _ note ->
(,Nothing) <$> repoCreateNoteF now shrRecip rpRecip remoteAuthor body mfwd luActivity note
CreateTicket _ ticket ->
(,Nothing) <$> repoCreateTicketF now shrRecip rpRecip remoteAuthor body mfwd luActivity ticket mtarget
_ -> error "Unsupported create object type for repos"
FollowActivity follow ->
(,Nothing) <$> repoFollowF shrRecip rpRecip now remoteAuthor body mfwd luActivity follow
OfferActivity (Offer obj target) ->
case obj of
OfferTicket ticket ->
(,Nothing) <$> repoOfferTicketF now shrRecip rpRecip remoteAuthor body mfwd luActivity ticket target
OfferDep dep ->
repoOfferDepF now shrRecip rpRecip remoteAuthor body mfwd luActivity dep target
_ -> return ("Unsupported offer object type for repos", Nothing)
ResolveActivity resolve ->
(,Nothing) <$> repoResolveF now shrRecip rpRecip remoteAuthor body mfwd luActivity resolve
UndoActivity undo->
(,Nothing) <$> repoUndoF shrRecip rpRecip now remoteAuthor body mfwd luActivity undo
_ -> return ("Unsupported activity type for repos", Nothing)
where
errorLocalForwarded (ActivityAuthLocalPerson pid) =
"Repo inbox got local forwarded activity by pid#" <>
T.pack (show $ fromSqlKey pid)
errorLocalForwarded (ActivityAuthLocalProject jid) =
"Repo inbox got local forwarded activity by jid#" <>
T.pack (show $ fromSqlKey jid)
errorLocalForwarded (ActivityAuthLocalRepo rid) =
"Repo inbox got local forwarded activity by rid#" <>
T.pack (show $ fromSqlKey rid)
-} -}
fixRunningDeliveries :: (MonadIO m, MonadLogger m, IsSqlBackend backend) => ReaderT backend m ()
fixRunningDeliveries = do
c <- updateWhereCount [UnlinkedDeliveryRunning ==. True] [UnlinkedDeliveryRunning =. False]
unless (c == 0) $ logWarn $ T.concat
[ "fixRunningDeliveries fixed "
, T.pack (show c)
, " linked deliveries"
]
c' <- updateWhereCount [DeliveryRunning ==. True] [DeliveryRunning =. False]
unless (c' == 0) $ logWarn $ T.concat
[ "fixRunningDeliveries fixed "
, T.pack (show c')
, " unlinked deliveries"
]
c'' <- updateWhereCount [ForwardingRunning ==. True] [ForwardingRunning =. False]
unless (c'' == 0) $ logWarn $ T.concat
[ "fixRunningDeliveries fixed "
, T.pack (show c'')
, " forwarding deliveries"
]
data ForwarderBy f
= FwderPerson (f ForwarderPerson)
| FwderGroup (f ForwarderGroup)
| FwderRepo (f ForwarderRepo)
| FwderDeck (f ForwarderDeck)
| FwderLoom (f ForwarderLoom)
deriving (Generic, FunctorB, ConstraintsB)
partitionFwders
:: [ForwarderBy f]
-> ( [f ForwarderPerson]
, [f ForwarderGroup]
, [f ForwarderRepo]
, [f ForwarderDeck]
, [f ForwarderLoom]
)
partitionFwders = foldl' f ([], [], [], [], [])
where
f (ps, gs, rs, ds, ls) = \ fwder ->
case fwder of
FwderPerson p -> (p : ps, gs, rs, ds, ls)
FwderGroup g -> (ps, g : gs, rs, ds, ls)
FwderRepo r -> (ps, gs, r : rs, ds, ls)
FwderDeck d -> (ps, gs, rs, d : ds, ls)
FwderLoom l -> (ps, gs, rs, ds, l : ls)
retryOutboxDelivery :: Worker ()
retryOutboxDelivery = do
logInfo "Periodic delivery starting"
now <- liftIO $ getCurrentTime
(unlinkedHttp, linkedHttp, forwardingHttp) <- runSiteDB $ do
-- Get all unlinked deliveries which aren't running already in outbox
-- post handlers
unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` ro `E.InnerJoin` i `E.LeftOuterJoin` ra `E.LeftOuterJoin` rc) -> do
E.on $ E.just (ro E.^. RemoteObjectId) E.==. rc E.?. RemoteCollectionIdent
E.on $ E.just (ro E.^. RemoteObjectId) E.==. ra E.?. RemoteActorIdent
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
E.on $ ura E.^. UnfetchedRemoteActorIdent E.==. ro E.^. RemoteObjectId
E.on $ udl E.^. UnlinkedDeliveryRecipient E.==. ura E.^. UnfetchedRemoteActorId
E.on $ udl E.^. UnlinkedDeliveryActivity E.==. ob E.^. OutboxItemId
E.where_ $ udl E.^. UnlinkedDeliveryRunning E.==. E.val False
E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ura E.^. UnfetchedRemoteActorId]
return
( i E.^. InstanceId
, i E.^. InstanceHost
, ura E.^. UnfetchedRemoteActorId
, ro E.^. RemoteObjectIdent
, ura E.^. UnfetchedRemoteActorSince
, udl E.^. UnlinkedDeliveryId
, udl E.^. UnlinkedDeliveryActivity
, udl E.^. UnlinkedDeliveryForwarding
, ob E.^. OutboxItemActivity
, ra E.?. RemoteActorId
, rc E.?. RemoteCollectionId
)
-- Strip the E.Value wrappers and organize the records for the
-- filtering and grouping we'll need to do
let unlinked = map adaptUnlinked unlinked'
-- Split into found (recipient has been reached) and lonely (recipient
-- hasn't been reached
(found, lonely) = partitionMaybes unlinked
-- Turn the found ones into linked deliveries
deleteWhere [UnlinkedDeliveryId <-. map (unlinkedID . snd) found]
insertMany_ $ mapMaybe toLinked found
-- We're left with the lonely ones. We'll check which actors have been
-- unreachable for too long, and we'll delete deliveries for them. The
-- rest of the actors we'll try to reach by HTTP.
dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings
let (lonelyOld, lonelyNew) =
partitionEithers $ map (decideBySinceUDL dropAfter now) lonely
deleteWhere [UnlinkedDeliveryId <-. lonelyOld]
-- Now let's grab the linked deliveries, and similarly delete old ones
-- and return the rest for HTTP delivery.
linked <- E.select $ E.from $ \ (dl `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i `E.InnerJoin` ob) -> do
E.on $ dl E.^. DeliveryActivity E.==. ob E.^. OutboxItemId
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId
E.on $ dl E.^. DeliveryRecipient E.==. ra E.^. RemoteActorId
E.where_ $ dl E.^. DeliveryRunning E.==. E.val False
E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId]
return
( i E.^. InstanceId
, i E.^. InstanceHost
, ra E.^. RemoteActorId
, ro E.^. RemoteObjectIdent
, ra E.^. RemoteActorInbox
, ra E.^. RemoteActorErrorSince
, dl E.^. DeliveryId
, dl E.^. DeliveryForwarding
, ob E.^. OutboxItemActivity
)
let (linkedOld, linkedNew) =
partitionEithers $
map (decideBySinceDL dropAfter now . adaptLinked) linked
deleteWhere [DeliveryId <-. linkedOld]
-- Same for forwarding deliveries, which are always linked
forwarding <- E.select $ E.from $
\ (fw `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i
`E.LeftOuterJoin` fwp
`E.LeftOuterJoin` fwg
`E.LeftOuterJoin` fwr
`E.LeftOuterJoin` fwd
`E.LeftOuterJoin` fwl
) -> do
E.on $ E.just (fw E.^. ForwardingId) E.==. fwl E.?. ForwarderLoomTask
E.on $ E.just (fw E.^. ForwardingId) E.==. fwd E.?. ForwarderDeckTask
E.on $ E.just (fw E.^. ForwardingId) E.==. fwr E.?. ForwarderRepoTask
E.on $ E.just (fw E.^. ForwardingId) E.==. fwg E.?. ForwarderGroupTask
E.on $ E.just (fw E.^. ForwardingId) E.==. fwp E.?. ForwarderPersonTask
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId
E.on $ fw E.^. ForwardingRecipient E.==. ra E.^. RemoteActorId
E.where_ $ fw E.^. ForwardingRunning E.==. E.val False
E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId]
return (i, ra, fw, fwp, fwg, fwr, fwd, fwl)
let (forwardingOld, forwardingNew) =
partitionEithers $
map (decideBySinceFW dropAfter now . adaptForwarding)
forwarding
(fwidsOld, fwdersOld) = unzip forwardingOld
(fwpidsOld, fwgidsOld, fwridsOld, fwdidsOld, fwlidsOld) =
partitionFwders fwdersOld
deleteWhere [ForwarderPersonId <-. fwpidsOld]
deleteWhere [ForwarderGroupId <-. fwgidsOld]
deleteWhere [ForwarderRepoId <-. fwridsOld]
deleteWhere [ForwarderDeckId <-. fwdidsOld]
deleteWhere [ForwarderLoomId <-. fwlidsOld]
deleteWhere [ForwardingId <-. fwidsOld]
return
( groupUnlinked lonelyNew
, groupLinked linkedNew
, groupForwarding forwardingNew
)
let deliver = deliverHttpBL
logInfo "Periodic delivery prepared DB, starting async HTTP POSTs"
logDebug $
"Periodic delivery forking linked " <>
T.pack (show $ map (renderAuthority . snd . fst) linkedHttp)
waitsDL <- traverse (fork . deliverLinked deliver now) linkedHttp
logDebug $
"Periodic delivery forking forwarding " <>
T.pack (show $ map (renderAuthority . snd . fst) forwardingHttp)
waitsFW <- traverse (fork . deliverForwarding now) forwardingHttp
logDebug $
"Periodic delivery forking unlinked " <>
T.pack (show $ map (renderAuthority . snd . fst) unlinkedHttp)
waitsUDL <- traverse (fork . deliverUnlinked deliver now) unlinkedHttp
logDebug $
T.concat
[ "Periodic delivery waiting for ", T.pack $ show $ length waitsDL
, " linked"
]
resultsDL <- sequence waitsDL
unless (and resultsDL) $ logError "Periodic delivery DL error"
logDebug $
T.concat
[ "Periodic delivery waiting for ", T.pack $ show $ length waitsFW
, " forwarding"
]
resultsFW <- sequence waitsFW
unless (and resultsFW) $ logError "Periodic delivery FW error"
logDebug $
T.concat
[ "Periodic delivery waiting for "
, T.pack $ show $ length waitsUDL, " unlinked"
]
resultsUDL <- sequence waitsUDL
unless (and resultsUDL) $ logError "Periodic delivery UDL error"
logInfo "Periodic delivery done"
where
adaptUnlinked
(E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value fwd, E.Value act, E.Value mraid, E.Value mrcid) =
( Left <$> mraid <|> Right <$> mrcid
, ( ( (iid, h)
, ((uraid, luRecip), (udlid, fwd, obid, BL.fromStrict $ persistJSONBytes act))
)
, since
)
)
unlinkedID ((_, (_, (udlid, _, _, _))), _) = udlid
toLinked (Left raid, ((_, (_, (_, fwd, obid, _))), _)) = Just $ Delivery raid obid fwd False
toLinked (Right _ , _ ) = Nothing
relevant dropAfter now since = addUTCTime dropAfter since > now
decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _, _))), msince) =
case msince of
Nothing -> Right udl
Just since ->
if relevant dropAfter now since
then Right udl
else Left udlid
adaptLinked
(E.Value iid, E.Value h, E.Value raid, E.Value ident, E.Value inbox, E.Value since, E.Value dlid, E.Value fwd, E.Value act) =
( ( (iid, h)
, ((raid, (ident, inbox)), (dlid, fwd, BL.fromStrict $ persistJSONBytes act))
)
, since
)
decideBySinceDL dropAfter now (dl@(_, (_, (dlid, _, _))), msince) =
case msince of
Nothing -> Right dl
Just since ->
if relevant dropAfter now since
then Right dl
else Left dlid
adaptForwarding
( Entity iid (Instance h)
, Entity raid (RemoteActor _ _ inbox _ since)
, Entity fwid (Forwarding _ _ body sig _)
, mfwp, mfwg, mfwr, mfwd, mfwl
) =
( ( (iid, h)
, ( (raid, inbox)
, ( fwid
, BL.fromStrict body
, case (mfwp, mfwg, mfwr, mfwd, mfwl) of
(Nothing, Nothing, Nothing, Nothing, Nothing) ->
error "Found fwid without a Forwarder* record"
(Just fwp, Nothing, Nothing, Nothing, Nothing) ->
FwderPerson fwp
(Nothing, Just fwg, Nothing, Nothing, Nothing) ->
FwderGroup fwg
(Nothing, Nothing, Just fwr, Nothing, Nothing) ->
FwderRepo fwr
(Nothing, Nothing, Nothing, Just fwd, Nothing) ->
FwderDeck fwd
(Nothing, Nothing, Nothing, Nothing, Just fwl) ->
FwderLoom fwl
_ -> error "Found fwid with multiple forwarders"
, sig
)
)
)
, since
)
decideBySinceFW dropAfter now (fw@(_, (_, (fwid, _, fwder, _))), msince) =
case msince of
Nothing -> Right fw
Just since ->
if relevant dropAfter now since
then Right fw
else Left (fwid, bmap entityKey fwder)
groupUnlinked
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
groupLinked
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
groupForwarding
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
fork action = do
wait <- asyncWorker action
return $ do
result <- wait
case result of
Left e -> do
logError $ "Periodic delivery error! " <> T.pack (displayException e)
return False
Right success -> return success
deliverLinked deliver now ((_, h), recips) = do
logDebug $ "Periodic deliver starting linked for host " <> renderAuthority h
waitsR <- for recips $ \ ((raid, (ident, inbox)), delivs) -> fork $ do
logDebug $
"Periodic deliver starting linked for actor " <>
renderObjURI (ObjURI h ident)
waitsD <- for delivs $ \ (dlid, fwd, doc) -> fork $ do
let fwd' = if fwd then Just ident else Nothing
e <- deliver doc fwd' h inbox
case e of
Left err -> do
logError $ T.concat
[ "Periodic DL delivery #", T.pack $ show dlid
, " error for <", renderObjURI $ ObjURI h ident, ">: "
, T.pack $ displayException err
]
return False
Right _resp -> do
runSiteDB $ delete dlid
return True
results <- sequence waitsD
runSiteDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic DL delivery error for host " <> renderAuthority h
return True
deliverUnlinked deliver now ((iid, h), recips) = do
logDebug $ "Periodic deliver starting unlinked for host " <> renderAuthority h
waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do
logDebug $
"Periodic deliver starting unlinked for actor " <>
renderObjURI (ObjURI h luRecip)
e <- fetchRemoteActor iid h luRecip
case e of
Right (Right mera) ->
case mera of
Nothing -> runSiteDB $ deleteWhere [UnlinkedDeliveryId <-. map fst4 (NE.toList delivs)]
Just (Entity raid ra) -> do
waitsD <- for delivs $ \ (udlid, fwd, obid, doc) -> fork $ do
let fwd' = if fwd then Just luRecip else Nothing
e' <- deliver doc fwd' h $ remoteActorInbox ra
case e' of
Left _err -> do
runSiteDB $ do
delete udlid
insert_ $ Delivery raid obid fwd False
return False
Right _resp -> do
runSiteDB $ delete udlid
return True
results <- sequence waitsD
runSiteDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
_ -> runSiteDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic UDL delivery error for host " <> renderAuthority h
return True
deliverForwarding now ((_, h), recips) = do
logDebug $ "Periodic deliver starting forwarding for host " <> renderAuthority h
waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do
logDebug $
"Periodic deliver starting forwarding for inbox " <>
renderObjURI (ObjURI h inbox)
waitsD <- for delivs $ \ (fwid, body, fwderE, sig) -> fork $ do
let (fwderK, senderK) = splitForwarder fwderE
sender <- renderLocalActor <$> hashLocalActor senderK
e <- forwardActivity (ObjURI h inbox) sig sender body
case e of
Left _err -> return False
Right _resp -> do
runSiteDB $ do
case fwderK of
FwderPerson k -> delete k
FwderGroup k -> delete k
FwderRepo k -> delete k
FwderDeck k -> delete k
FwderLoom k -> delete k
delete fwid
return True
results <- sequence waitsD
runSiteDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic FW delivery error for host " <> renderAuthority h
return True
where
splitForwarder (FwderPerson (Entity f (ForwarderPerson _ p))) =
(FwderPerson f, LocalActorPerson p)
splitForwarder (FwderGroup (Entity f (ForwarderGroup _ g))) =
(FwderGroup f, LocalActorGroup g)
splitForwarder (FwderRepo (Entity f (ForwarderRepo _ r))) =
(FwderRepo f, LocalActorRepo r)
splitForwarder (FwderDeck (Entity f (ForwarderDeck _ d))) =
(FwderDeck f, LocalActorDeck d)
splitForwarder (FwderLoom (Entity f (ForwarderLoom _ l))) =
(FwderLoom f, LocalActorLoom l)

View file

@ -86,7 +86,6 @@ import Database.Persist.Local
import Yesod.Persist.Local import Yesod.Persist.Local
import Vervis.ActivityPub import Vervis.ActivityPub
import Vervis.Actor
import Vervis.API import Vervis.API
import Vervis.Cloth import Vervis.Cloth
import Vervis.Data.Actor import Vervis.Data.Actor
@ -100,6 +99,7 @@ import Vervis.Paginate
import Vervis.Persist.Actor import Vervis.Persist.Actor
import Vervis.Recipient import Vervis.Recipient
import Vervis.Ticket import Vervis.Ticket
import Vervis.Web.Actor
getClothR :: KeyHashid Loom -> KeyHashid TicketLoom -> Handler TypedContent getClothR :: KeyHashid Loom -> KeyHashid TicketLoom -> Handler TypedContent
getClothR loomHash clothHash = do getClothR loomHash clothHash = do

View file

@ -56,6 +56,7 @@ where
import Control.Monad import Control.Monad
import Control.Monad.Trans.Except import Control.Monad.Trans.Except
import Data.Aeson import Data.Aeson
import Data.ByteString (ByteString)
import Data.Foldable import Data.Foldable
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe)
import Data.Text (Text) import Data.Text (Text)
@ -90,14 +91,16 @@ import Data.Paginate.Local
import Database.Persist.Local import Database.Persist.Local
import Yesod.Persist.Local import Yesod.Persist.Local
import Vervis.Actor
import Vervis.API import Vervis.API
import Vervis.Federation import Vervis.Federation.Auth
import Vervis.FedURI
import Vervis.Form.Project import Vervis.Form.Project
import Vervis.Foundation import Vervis.Foundation
import Vervis.Model import Vervis.Model
import Vervis.Paginate import Vervis.Paginate
import Vervis.Recipient
import Vervis.Settings import Vervis.Settings
import Vervis.Web.Actor
import Vervis.Widget.Person import Vervis.Widget.Person
import qualified Vervis.Client as C import qualified Vervis.Client as C
@ -147,8 +150,41 @@ getDeckR deckHash = do
getDeckInboxR :: KeyHashid Deck -> Handler TypedContent getDeckInboxR :: KeyHashid Deck -> Handler TypedContent
getDeckInboxR = getInbox DeckInboxR deckActor getDeckInboxR = getInbox DeckInboxR deckActor
postDeckInboxR :: KeyHashid Deck -> Handler TypedContent postDeckInboxR :: KeyHashid Deck -> Handler ()
postDeckInboxR _ = error "Temporarily disabled" postDeckInboxR recipDeckHash =
postInbox $ handleRobotInbox (LocalActorDeck recipDeckHash) handle
where
handle
:: RemoteAuthor
-> Maybe (RecipientRoutes, ByteString)
-> LocalURI
-> SpecificActivity URIMode
-> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text))
handle _author _mfwd _luActivity specific =
case specific of
{-
CreateActivity (Create obj mtarget) ->
case obj of
CreateNote _ note ->
(,Nothing) <$> projectCreateNoteF now shrRecip prjRecip remoteAuthor body mfwd luActivity note
CreateTicket _ ticket ->
(,Nothing) <$> projectCreateTicketF now shrRecip prjRecip remoteAuthor body mfwd luActivity ticket mtarget
_ -> error "Unsupported create object type for projects"
FollowActivity follow ->
(,Nothing) <$> projectFollowF shrRecip prjRecip now remoteAuthor body mfwd luActivity follow
OfferActivity (Offer obj target) ->
case obj of
OfferTicket ticket ->
(,Nothing) <$> projectOfferTicketF now shrRecip prjRecip remoteAuthor body mfwd luActivity ticket target
OfferDep dep ->
projectOfferDepF now shrRecip prjRecip remoteAuthor body mfwd luActivity dep target
_ -> return ("Unsupported offer object type for projects", Nothing)
ResolveActivity resolve ->
(,Nothing) <$> projectResolveF now shrRecip prjRecip remoteAuthor body mfwd luActivity resolve
UndoActivity undo ->
(,Nothing) <$> projectUndoF shrRecip prjRecip now remoteAuthor body mfwd luActivity undo
-}
_ -> return ("Unsupported activity type for decks", Nothing)
getDeckOutboxR :: KeyHashid Deck -> Handler TypedContent getDeckOutboxR :: KeyHashid Deck -> Handler TypedContent
getDeckOutboxR = getOutbox DeckOutboxR deckActor getDeckOutboxR = getOutbox DeckOutboxR deckActor

View file

@ -1,6 +1,6 @@
{- This file is part of Vervis. {- This file is part of Vervis.
- -
- Written in 2016, 2019, 2020 by fr33domlover <fr33domlover@riseup.net>. - Written in 2016, 2019, 2020, 2022 by fr33domlover <fr33domlover@riseup.net>.
- -
- Copying is an act of love. Please copy, reuse and share. - Copying is an act of love. Please copy, reuse and share.
- -
@ -57,7 +57,6 @@ import Yesod.Persist.Local
import Vervis.API import Vervis.API
import Vervis.Discussion import Vervis.Discussion
import Vervis.Federation
import Vervis.FedURI import Vervis.FedURI
import Vervis.Form.Discussion import Vervis.Form.Discussion
import Vervis.Foundation import Vervis.Foundation

View file

@ -65,12 +65,12 @@ import Yesod.Hashids
import qualified Web.ActivityPub as AP import qualified Web.ActivityPub as AP
import Vervis.Actor
import Vervis.Foundation import Vervis.Foundation
import Vervis.Model import Vervis.Model
import Vervis.Model.Group import Vervis.Model.Group
import Vervis.Settings (widgetFile) import Vervis.Settings (widgetFile)
import Vervis.Time (showDate) import Vervis.Time (showDate)
import Vervis.Web.Actor
getGroupR :: KeyHashid Group -> Handler TypedContent getGroupR :: KeyHashid Group -> Handler TypedContent
getGroupR groupHash = do getGroupR groupHash = do

View file

@ -96,7 +96,6 @@ import Vervis.ActivityPub
import Vervis.ActorKey import Vervis.ActorKey
import Vervis.API import Vervis.API
import Vervis.FedURI import Vervis.FedURI
import Vervis.Federation
import Vervis.Federation.Auth import Vervis.Federation.Auth
import Vervis.Foundation import Vervis.Foundation
import Vervis.Model hiding (Ticket) import Vervis.Model hiding (Ticket)

View file

@ -27,6 +27,7 @@ where
import Control.Monad import Control.Monad
import Control.Monad.Trans.Except import Control.Monad.Trans.Except
import Data.Aeson import Data.Aeson
import Data.ByteString (ByteString)
import Data.Foldable import Data.Foldable
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe)
import Data.Text (Text) import Data.Text (Text)
@ -60,13 +61,15 @@ import Data.Paginate.Local
import Database.Persist.Local import Database.Persist.Local
import Yesod.Persist.Local import Yesod.Persist.Local
import Vervis.Actor
import Vervis.API import Vervis.API
import Vervis.Federation import Vervis.Federation.Auth
import Vervis.FedURI
import Vervis.Foundation import Vervis.Foundation
import Vervis.Model import Vervis.Model
import Vervis.Paginate import Vervis.Paginate
import Vervis.Recipient
import Vervis.Settings import Vervis.Settings
import Vervis.Web.Actor
getLoomR :: KeyHashid Loom -> Handler TypedContent getLoomR :: KeyHashid Loom -> Handler TypedContent
getLoomR loomHash = do getLoomR loomHash = do
@ -105,8 +108,19 @@ getLoomR loomHash = do
getLoomInboxR :: KeyHashid Loom -> Handler TypedContent getLoomInboxR :: KeyHashid Loom -> Handler TypedContent
getLoomInboxR = getInbox LoomInboxR loomActor getLoomInboxR = getInbox LoomInboxR loomActor
postLoomInboxR :: KeyHashid Loom -> Handler TypedContent postLoomInboxR :: KeyHashid Loom -> Handler ()
postLoomInboxR _ = error "Temporarily disabled" postLoomInboxR recipLoomHash =
postInbox $ handleRobotInbox (LocalActorLoom recipLoomHash) handle
where
handle
:: RemoteAuthor
-> Maybe (RecipientRoutes, ByteString)
-> LocalURI
-> AP.SpecificActivity URIMode
-> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text))
handle _author _mfwd _luActivity specific =
case specific of
_ -> return ("Unsupported activity type for looms", Nothing)
getLoomOutboxR :: KeyHashid Loom -> Handler TypedContent getLoomOutboxR :: KeyHashid Loom -> Handler TypedContent
getLoomOutboxR = getOutbox LoomOutboxR loomActor getLoomOutboxR = getOutbox LoomOutboxR loomActor

View file

@ -65,7 +65,6 @@ import Data.Either.Local
import Database.Persist.Local import Database.Persist.Local
import Vervis.ActivityPub import Vervis.ActivityPub
import Vervis.Actor
import Vervis.ActorKey import Vervis.ActorKey
import Vervis.API import Vervis.API
import Vervis.Data.Actor import Vervis.Data.Actor
@ -78,6 +77,7 @@ import Vervis.Recipient
import Vervis.Secure import Vervis.Secure
import Vervis.Settings import Vervis.Settings
import Vervis.Ticket import Vervis.Ticket
import Vervis.Web.Actor
import Vervis.Widget import Vervis.Widget
import Vervis.Widget.Person import Vervis.Widget.Person

View file

@ -72,6 +72,7 @@ import Control.Monad.Logger (logWarn)
import Control.Monad.Trans.Except import Control.Monad.Trans.Except
import Data.Bifunctor import Data.Bifunctor
import Data.Binary.Put import Data.Binary.Put
import Data.ByteString (ByteString)
import Data.Foldable import Data.Foldable
import Data.Git.Graph import Data.Git.Graph
import Data.Git.Harder import Data.Git.Harder
@ -147,17 +148,20 @@ import Yesod.Persist.Local
import qualified Data.Git.Local as G (createRepo) import qualified Data.Git.Local as G (createRepo)
import qualified Darcs.Local.Repository as D (createRepo) import qualified Darcs.Local.Repository as D (createRepo)
import Vervis.Actor
import Vervis.API import Vervis.API
import Vervis.Federation.Auth
import Vervis.FedURI
import Vervis.Foundation import Vervis.Foundation
import Vervis.Path import Vervis.Path
import Vervis.Model import Vervis.Model
import Vervis.Model.Ident import Vervis.Model.Ident
import Vervis.Paginate import Vervis.Paginate
import Vervis.Readme import Vervis.Readme
import Vervis.Recipient
import Vervis.Settings import Vervis.Settings
import Vervis.SourceTree import Vervis.SourceTree
import Vervis.Style import Vervis.Style
import Vervis.Web.Actor
import qualified Vervis.Formatting as F import qualified Vervis.Formatting as F
import qualified Vervis.Hook as H import qualified Vervis.Hook as H
@ -206,8 +210,48 @@ getRepoR repoHash = do
getRepoInboxR :: KeyHashid Repo -> Handler TypedContent getRepoInboxR :: KeyHashid Repo -> Handler TypedContent
getRepoInboxR = getInbox RepoInboxR repoActor getRepoInboxR = getInbox RepoInboxR repoActor
postRepoInboxR :: KeyHashid Repo -> Handler TypedContent postRepoInboxR :: KeyHashid Repo -> Handler ()
postRepoInboxR _ = error "Temporarily disabled" postRepoInboxR recipRepoHash =
postInbox $ handleRobotInbox (LocalActorRepo recipRepoHash) handle
where
handle
:: RemoteAuthor
-> Maybe (RecipientRoutes, ByteString)
-> LocalURI
-> AP.SpecificActivity URIMode
-> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text))
handle _author _mfwd _luActivity specific =
case specific of
{-
ApplyActivity (AP.Apply uObject uTarget) ->
repoApplyF now shrRecip rpRecip remoteAuthor body mfwd luActivity uObject uTarget
AddActivity (AP.Add obj target) ->
case obj of
Right (AddBundle patches) ->
repoAddBundleF now shrRecip rpRecip remoteAuthor body mfwd luActivity patches target
_ -> return ("Unsupported add object type for repos", Nothing)
CreateActivity (Create obj mtarget) ->
case obj of
CreateNote _ note ->
(,Nothing) <$> repoCreateNoteF now shrRecip rpRecip remoteAuthor body mfwd luActivity note
CreateTicket _ ticket ->
(,Nothing) <$> repoCreateTicketF now shrRecip rpRecip remoteAuthor body mfwd luActivity ticket mtarget
_ -> error "Unsupported create object type for repos"
FollowActivity follow ->
(,Nothing) <$> repoFollowF shrRecip rpRecip now remoteAuthor body mfwd luActivity follow
OfferActivity (Offer obj target) ->
case obj of
OfferTicket ticket ->
(,Nothing) <$> repoOfferTicketF now shrRecip rpRecip remoteAuthor body mfwd luActivity ticket target
OfferDep dep ->
repoOfferDepF now shrRecip rpRecip remoteAuthor body mfwd luActivity dep target
_ -> return ("Unsupported offer object type for repos", Nothing)
ResolveActivity resolve ->
(,Nothing) <$> repoResolveF now shrRecip rpRecip remoteAuthor body mfwd luActivity resolve
UndoActivity undo->
(,Nothing) <$> repoUndoF shrRecip rpRecip now remoteAuthor body mfwd luActivity undo
-}
_ -> return ("Unsupported activity type for repos", Nothing)
getRepoOutboxR :: KeyHashid Repo -> Handler TypedContent getRepoOutboxR :: KeyHashid Repo -> Handler TypedContent
getRepoOutboxR = getOutbox RepoOutboxR repoActor getRepoOutboxR = getOutbox RepoOutboxR repoActor

View file

@ -131,11 +131,9 @@ import Database.Persist.Local
import Yesod.Persist.Local import Yesod.Persist.Local
import Vervis.ActivityPub import Vervis.ActivityPub
import Vervis.Actor
import Vervis.API import Vervis.API
import Vervis.Data.Actor import Vervis.Data.Actor
import Vervis.Discussion import Vervis.Discussion
import Vervis.Federation
import Vervis.FedURI import Vervis.FedURI
import Vervis.Foundation import Vervis.Foundation
import Vervis.Handler.Discussion import Vervis.Handler.Discussion
@ -152,6 +150,7 @@ import Vervis.Style
import Vervis.Ticket import Vervis.Ticket
import Vervis.TicketFilter (filterTickets) import Vervis.TicketFilter (filterTickets)
import Vervis.Time (showDate) import Vervis.Time (showDate)
import Vervis.Web.Actor
getTicketR :: KeyHashid Deck -> KeyHashid TicketDeck -> Handler TypedContent getTicketR :: KeyHashid Deck -> KeyHashid TicketDeck -> Handler TypedContent
getTicketR deckHash ticketHash = do getTicketR deckHash ticketHash = do

View file

@ -13,7 +13,7 @@
- <http://creativecommons.org/publicdomain/zero/1.0/>. - <http://creativecommons.org/publicdomain/zero/1.0/>.
-} -}
module Vervis.Actor module Vervis.Web.Actor
( getInbox ( getInbox
, postInbox , postInbox
, getOutbox , getOutbox
@ -21,6 +21,7 @@ module Vervis.Actor
, getFollowersCollection , getFollowersCollection
, getActorFollowersCollection , getActorFollowersCollection
, getFollowingCollection , getFollowingCollection
, handleRobotInbox
) )
where where
@ -37,6 +38,7 @@ import Data.Aeson
import Data.Aeson.Encode.Pretty import Data.Aeson.Encode.Pretty
import Data.Bifunctor import Data.Bifunctor
import Data.Bitraversable import Data.Bitraversable
import Data.ByteString (ByteString)
import Data.Foldable (for_) import Data.Foldable (for_)
import Data.List import Data.List
import Data.Maybe import Data.Maybe
@ -77,6 +79,7 @@ import Yesod.Hashids
import Yesod.MonadSite import Yesod.MonadSite
import Yesod.RenderSource import Yesod.RenderSource
import Control.Monad.Trans.Except.Local
import Data.Aeson.Local import Data.Aeson.Local
import Data.Either.Local import Data.Either.Local
import Data.EventTime.Local import Data.EventTime.Local
@ -89,7 +92,6 @@ import Vervis.ActivityPub
import Vervis.ActorKey import Vervis.ActorKey
import Vervis.API import Vervis.API
import Vervis.FedURI import Vervis.FedURI
import Vervis.Federation
import Vervis.Federation.Auth import Vervis.Federation.Auth
import Vervis.Foundation import Vervis.Foundation
import Vervis.Model hiding (Ticket) import Vervis.Model hiding (Ticket)
@ -453,3 +455,29 @@ getFollowingCollection here actor hash = do
getRemotes aid = getRemotes aid =
map (followRemoteTarget . entityVal) <$> map (followRemoteTarget . entityVal) <$>
selectList [FollowRemoteActor ==. aid] [] selectList [FollowRemoteActor ==. aid] []
handleRobotInbox
:: LocalActorBy KeyHashid
-> ( RemoteAuthor
-> Maybe (RecipientRoutes, ByteString)
-> LocalURI
-> SpecificActivity URIMode
-> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text))
)
-> UTCTime
-> ActivityAuthentication
-> ActivityBody
-> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text))
handleRobotInbox recipByHash handleSpecific now auth body = do
remoteAuthor <-
case auth of
ActivityAuthLocal _ -> throwE "Got a forwarded local activity, I don't need those"
ActivityAuthRemote ra -> return ra
luActivity <-
fromMaybeE (activityId $ actbActivity body) "Activity without 'id'"
localRecips <- do
mrecips <- parseAudience $ activityAudience $ actbActivity body
paudLocalRecips <$> fromMaybeE mrecips "Activity with no recipients"
msig <- checkForwarding recipByHash
let mfwd = (localRecips,) <$> msig
handleSpecific remoteAuthor mfwd luActivity (activitySpecific $ actbActivity body)

View file

@ -124,7 +124,6 @@ library
Vervis.Access Vervis.Access
Vervis.ActivityPub Vervis.ActivityPub
Vervis.Actor
Vervis.ActorKey Vervis.ActorKey
Vervis.API Vervis.API
Vervis.Avatar Vervis.Avatar
@ -143,7 +142,7 @@ library
Vervis.Delivery Vervis.Delivery
Vervis.Discussion Vervis.Discussion
Vervis.Federation --Vervis.Federation
Vervis.Federation.Auth Vervis.Federation.Auth
--Vervis.Federation.Discussion --Vervis.Federation.Discussion
--Vervis.Federation.Offer --Vervis.Federation.Offer
@ -224,6 +223,9 @@ library
Vervis.Ticket Vervis.Ticket
Vervis.TicketFilter Vervis.TicketFilter
Vervis.Time Vervis.Time
Vervis.Web.Actor
Vervis.Widget Vervis.Widget
Vervis.Widget.Discussion Vervis.Widget.Discussion
Vervis.Widget.Person Vervis.Widget.Person