From 9dddc7d8465989aef2bd3f24ceb69a5b90bf239d Mon Sep 17 00:00:00 2001 From: fr33domlover Date: Tue, 6 Sep 2022 10:52:14 +0000 Subject: [PATCH] S2S: Prepare inbox handlers for Repo, Deck and Loom --- src/Vervis/API.hs | 2 +- src/Vervis/Access.hs | 23 +- src/Vervis/Application.hs | 2 +- src/Vervis/Delivery.hs | 459 ++++++++++++++++++++++++++ src/Vervis/Federation.hs | 550 ------------------------------- src/Vervis/Handler/Cloth.hs | 2 +- src/Vervis/Handler/Deck.hs | 44 ++- src/Vervis/Handler/Discussion.hs | 3 +- src/Vervis/Handler/Group.hs | 2 +- src/Vervis/Handler/Inbox.hs | 1 - src/Vervis/Handler/Loom.hs | 22 +- src/Vervis/Handler/Person.hs | 2 +- src/Vervis/Handler/Repo.hs | 50 ++- src/Vervis/Handler/Ticket.hs | 3 +- src/Vervis/{ => Web}/Actor.hs | 32 +- vervis.cabal | 6 +- 16 files changed, 618 insertions(+), 585 deletions(-) rename src/Vervis/{ => Web}/Actor.hs (94%) diff --git a/src/Vervis/API.hs b/src/Vervis/API.hs index c2b54ce..a35fddc 100644 --- a/src/Vervis/API.hs +++ b/src/Vervis/API.hs @@ -1868,7 +1868,7 @@ inviteC (Entity senderPersonID senderPerson) senderActor muCap summary audience case capID of Left (actor, _, item) -> return (actor, item) Right _ -> throwE "Capability is a remote URI, i.e. not authored by the local topic" - verifyCapability capability senderPersonID $ bmap entityKey r + verifyCapability capability (Left senderPersonID) (bmap entityKey r) Right _ -> pure () -- Insert new Collab to DB diff --git a/src/Vervis/Access.hs b/src/Vervis/Access.hs index f7da060..7df7099 100644 --- a/src/Vervis/Access.hs +++ b/src/Vervis/Access.hs @@ -84,6 +84,7 @@ import Control.Monad.Trans.Except import Control.Monad.Trans.Maybe import Control.Monad.Trans.Reader import Data.Barbie +import Data.Bifunctor import Data.Foldable import Data.Maybe import Data.Text (Text) @@ -97,6 +98,7 @@ import Yesod.Hashids import Yesod.MonadSite import Control.Monad.Trans.Except.Local +import Data.Either.Local import Database.Persist.Local import Vervis.ActivityPub @@ -304,10 +306,10 @@ grantResourceLocalActor (GrantResourceLoom l) = LocalActorLoom l verifyCapability :: (LocalActorBy Key, OutboxItemId) - -> PersonId + -> Either PersonId RemoteActorId -> GrantResourceBy Key -> ExceptT Text (ReaderT SqlBackend Handler) () -verifyCapability (capActor, capItem) personID resource = do +verifyCapability (capActor, capItem) actor resource = do -- Find the activity itself by URI in the DB nameExceptT "Capability activity not found" $ @@ -320,16 +322,17 @@ verifyCapability (capActor, capItem) personID resource = do fromMaybeE maybeEnable "No CollabEnable for this activity" -- Find the recipient of that Collab - recipID <- do - mcrl <- lift $ getValBy $ UniqueCollabRecipLocal collabID - crl <- fromMaybeE mcrl "No local recip for capability" - mcrr <- lift $ getBy $ UniqueCollabRecipRemote collabID - for_ mcrr $ \ _ -> error "Both local & remote recip for capability!" - return $ collabRecipLocalPerson crl + recipID <- + lift $ bimap collabRecipLocalPerson collabRecipRemoteActor <$> + requireEitherAlt + (getValBy $ UniqueCollabRecipLocal collabID) + (getValBy $ UniqueCollabRecipRemote collabID) + "No collab recip" + "Both local and remote recips for collab" -- Verify the recipient is the expected one - unless (recipID == personID) $ - throwE "Collab recipient is some other Person" + unless (recipID == actor) $ + throwE "Collab recipient is someone else" -- Find the local topic, on which this Collab gives access topic <- lift $ do diff --git a/src/Vervis/Application.hs b/src/Vervis/Application.hs index 3ba6f08..1372d57 100644 --- a/src/Vervis/Application.hs +++ b/src/Vervis/Application.hs @@ -94,7 +94,7 @@ import Web.Hashids.Local import Vervis.ActorKey (generateActorKey, actorKeyRotator) import Vervis.Darcs -import Vervis.Federation +import Vervis.Delivery import Vervis.Foundation import Vervis.Git import Vervis.Hook diff --git a/src/Vervis/Delivery.hs b/src/Vervis/Delivery.hs index 8e512ed..0e05ce0 100644 --- a/src/Vervis/Delivery.hs +++ b/src/Vervis/Delivery.hs @@ -13,13 +13,19 @@ - . -} +-- These are for Barbie-related generated instances for ForwarderBy +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} + module Vervis.Delivery ( deliverHttp , deliverHttpBL , deliverRemoteDB_D + , deliverRemoteDB_L , deliverRemoteDB_P , deliverRemoteDB_R , deliverRemoteHTTP_D + , deliverRemoteHTTP_L , deliverRemoteHTTP_P , deliverRemoteHTTP_R , deliverRemoteDB' @@ -29,6 +35,8 @@ module Vervis.Delivery , deliverLocal' , deliverLocal , insertRemoteActivityToLocalInboxes + , fixRunningDeliveries + , retryOutboxDelivery ) where @@ -42,6 +50,7 @@ import Control.Monad.Trans.Class import Control.Monad.Trans.Except import Control.Monad.Trans.Maybe import Control.Monad.Trans.Reader +import Data.Barbie import Data.Bifunctor import Data.Bitraversable import Data.ByteString (ByteString) @@ -57,6 +66,7 @@ import Data.Time.Clock import Data.Traversable import Database.Persist import Database.Persist.Sql +import GHC.Generics import Network.HTTP.Client import Network.TLS -- hiding (SHA256) import Text.Blaze.Html (preEscapedToHtml) @@ -90,6 +100,7 @@ import qualified Web.ActivityPub as AP import Control.Monad.Trans.Except.Local import Data.Either.Local import Data.List.NonEmpty.Local +import Data.Maybe.Local import Data.Patch.Local hiding (Patch) import Data.Tuple.Local import Database.Persist.Local @@ -162,6 +173,17 @@ deliverRemoteDB_D [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderDeckId))] deliverRemoteDB_D = deliverRemoteDB_ ForwarderDeck +deliverRemoteDB_L + :: MonadIO m + => BL.ByteString + -> RemoteActivityId + -> LoomId + -> ByteString + -> [((InstanceId, Host), NonEmpty RemoteRecipient)] + -> ReaderT SqlBackend m + [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderLoomId))] +deliverRemoteDB_L = deliverRemoteDB_ ForwarderLoom + deliverRemoteDB_P :: MonadIO m => BL.ByteString @@ -244,6 +266,17 @@ deliverRemoteHTTP_D deliverRemoteHTTP_D now dkhid = deliverRemoteHTTP' now $ LocalActorDeck dkhid +deliverRemoteHTTP_L + :: (MonadSite m, SiteEnv m ~ App) + => UTCTime + -> KeyHashid Loom + -> BL.ByteString + -> ByteString + -> [((InstanceId, Host), NonEmpty (RemoteActorId, LocalURI, LocalURI, ForwardingId, ForwarderLoomId))] + -> m () +deliverRemoteHTTP_L now lkhid = + deliverRemoteHTTP' now $ LocalActorLoom lkhid + deliverRemoteHTTP_P :: (MonadSite m, SiteEnv m ~ App) => UTCTime @@ -806,3 +839,429 @@ insertRemoteActivityToLocalInboxes requireOwner ractid = insertActivityToLocalInboxes makeItem requireOwner Nothing Nothing where makeItem ibid ibiid = InboxItemRemote ibid ractid ibiid + +fixRunningDeliveries :: (MonadIO m, MonadLogger m, IsSqlBackend backend) => ReaderT backend m () +fixRunningDeliveries = do + c <- updateWhereCount [UnlinkedDeliveryRunning ==. True] [UnlinkedDeliveryRunning =. False] + unless (c == 0) $ logWarn $ T.concat + [ "fixRunningDeliveries fixed " + , T.pack (show c) + , " linked deliveries" + ] + c' <- updateWhereCount [DeliveryRunning ==. True] [DeliveryRunning =. False] + unless (c' == 0) $ logWarn $ T.concat + [ "fixRunningDeliveries fixed " + , T.pack (show c') + , " unlinked deliveries" + ] + c'' <- updateWhereCount [ForwardingRunning ==. True] [ForwardingRunning =. False] + unless (c'' == 0) $ logWarn $ T.concat + [ "fixRunningDeliveries fixed " + , T.pack (show c'') + , " forwarding deliveries" + ] + +data ForwarderBy f + = FwderPerson (f ForwarderPerson) + | FwderGroup (f ForwarderGroup) + | FwderRepo (f ForwarderRepo) + | FwderDeck (f ForwarderDeck) + | FwderLoom (f ForwarderLoom) + deriving (Generic, FunctorB, ConstraintsB) + +partitionFwders + :: [ForwarderBy f] + -> ( [f ForwarderPerson] + , [f ForwarderGroup] + , [f ForwarderRepo] + , [f ForwarderDeck] + , [f ForwarderLoom] + ) +partitionFwders = foldl' f ([], [], [], [], []) + where + f (ps, gs, rs, ds, ls) = \ fwder -> + case fwder of + FwderPerson p -> (p : ps, gs, rs, ds, ls) + FwderGroup g -> (ps, g : gs, rs, ds, ls) + FwderRepo r -> (ps, gs, r : rs, ds, ls) + FwderDeck d -> (ps, gs, rs, d : ds, ls) + FwderLoom l -> (ps, gs, rs, ds, l : ls) + +retryOutboxDelivery :: Worker () +retryOutboxDelivery = do + logInfo "Periodic delivery starting" + now <- liftIO $ getCurrentTime + (unlinkedHttp, linkedHttp, forwardingHttp) <- runSiteDB $ do + + -- Get all unlinked deliveries which aren't running already in outbox + -- post handlers + unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` ro `E.InnerJoin` i `E.LeftOuterJoin` ra `E.LeftOuterJoin` rc) -> do + E.on $ E.just (ro E.^. RemoteObjectId) E.==. rc E.?. RemoteCollectionIdent + E.on $ E.just (ro E.^. RemoteObjectId) E.==. ra E.?. RemoteActorIdent + E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId + E.on $ ura E.^. UnfetchedRemoteActorIdent E.==. ro E.^. RemoteObjectId + E.on $ udl E.^. UnlinkedDeliveryRecipient E.==. ura E.^. UnfetchedRemoteActorId + E.on $ udl E.^. UnlinkedDeliveryActivity E.==. ob E.^. OutboxItemId + E.where_ $ udl E.^. UnlinkedDeliveryRunning E.==. E.val False + E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ura E.^. UnfetchedRemoteActorId] + return + ( i E.^. InstanceId + , i E.^. InstanceHost + , ura E.^. UnfetchedRemoteActorId + , ro E.^. RemoteObjectIdent + , ura E.^. UnfetchedRemoteActorSince + , udl E.^. UnlinkedDeliveryId + , udl E.^. UnlinkedDeliveryActivity + , udl E.^. UnlinkedDeliveryForwarding + , ob E.^. OutboxItemActivity + , ra E.?. RemoteActorId + , rc E.?. RemoteCollectionId + ) + + -- Strip the E.Value wrappers and organize the records for the + -- filtering and grouping we'll need to do + let unlinked = map adaptUnlinked unlinked' + + -- Split into found (recipient has been reached) and lonely (recipient + -- hasn't been reached + (found, lonely) = partitionMaybes unlinked + + -- Turn the found ones into linked deliveries + deleteWhere [UnlinkedDeliveryId <-. map (unlinkedID . snd) found] + insertMany_ $ mapMaybe toLinked found + + -- We're left with the lonely ones. We'll check which actors have been + -- unreachable for too long, and we'll delete deliveries for them. The + -- rest of the actors we'll try to reach by HTTP. + dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings + let (lonelyOld, lonelyNew) = + partitionEithers $ map (decideBySinceUDL dropAfter now) lonely + deleteWhere [UnlinkedDeliveryId <-. lonelyOld] + + -- Now let's grab the linked deliveries, and similarly delete old ones + -- and return the rest for HTTP delivery. + linked <- E.select $ E.from $ \ (dl `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i `E.InnerJoin` ob) -> do + E.on $ dl E.^. DeliveryActivity E.==. ob E.^. OutboxItemId + E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId + E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId + E.on $ dl E.^. DeliveryRecipient E.==. ra E.^. RemoteActorId + E.where_ $ dl E.^. DeliveryRunning E.==. E.val False + E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId] + return + ( i E.^. InstanceId + , i E.^. InstanceHost + , ra E.^. RemoteActorId + , ro E.^. RemoteObjectIdent + , ra E.^. RemoteActorInbox + , ra E.^. RemoteActorErrorSince + , dl E.^. DeliveryId + , dl E.^. DeliveryForwarding + , ob E.^. OutboxItemActivity + ) + let (linkedOld, linkedNew) = + partitionEithers $ + map (decideBySinceDL dropAfter now . adaptLinked) linked + deleteWhere [DeliveryId <-. linkedOld] + + -- Same for forwarding deliveries, which are always linked + forwarding <- E.select $ E.from $ + \ (fw `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i + `E.LeftOuterJoin` fwp + `E.LeftOuterJoin` fwg + `E.LeftOuterJoin` fwr + `E.LeftOuterJoin` fwd + `E.LeftOuterJoin` fwl + ) -> do + E.on $ E.just (fw E.^. ForwardingId) E.==. fwl E.?. ForwarderLoomTask + E.on $ E.just (fw E.^. ForwardingId) E.==. fwd E.?. ForwarderDeckTask + E.on $ E.just (fw E.^. ForwardingId) E.==. fwr E.?. ForwarderRepoTask + E.on $ E.just (fw E.^. ForwardingId) E.==. fwg E.?. ForwarderGroupTask + E.on $ E.just (fw E.^. ForwardingId) E.==. fwp E.?. ForwarderPersonTask + E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId + E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId + E.on $ fw E.^. ForwardingRecipient E.==. ra E.^. RemoteActorId + E.where_ $ fw E.^. ForwardingRunning E.==. E.val False + E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId] + return (i, ra, fw, fwp, fwg, fwr, fwd, fwl) + let (forwardingOld, forwardingNew) = + partitionEithers $ + map (decideBySinceFW dropAfter now . adaptForwarding) + forwarding + (fwidsOld, fwdersOld) = unzip forwardingOld + (fwpidsOld, fwgidsOld, fwridsOld, fwdidsOld, fwlidsOld) = + partitionFwders fwdersOld + deleteWhere [ForwarderPersonId <-. fwpidsOld] + deleteWhere [ForwarderGroupId <-. fwgidsOld] + deleteWhere [ForwarderRepoId <-. fwridsOld] + deleteWhere [ForwarderDeckId <-. fwdidsOld] + deleteWhere [ForwarderLoomId <-. fwlidsOld] + deleteWhere [ForwardingId <-. fwidsOld] + + return + ( groupUnlinked lonelyNew + , groupLinked linkedNew + , groupForwarding forwardingNew + ) + + let deliver = deliverHttpBL + logInfo "Periodic delivery prepared DB, starting async HTTP POSTs" + + logDebug $ + "Periodic delivery forking linked " <> + T.pack (show $ map (renderAuthority . snd . fst) linkedHttp) + waitsDL <- traverse (fork . deliverLinked deliver now) linkedHttp + + logDebug $ + "Periodic delivery forking forwarding " <> + T.pack (show $ map (renderAuthority . snd . fst) forwardingHttp) + waitsFW <- traverse (fork . deliverForwarding now) forwardingHttp + + logDebug $ + "Periodic delivery forking unlinked " <> + T.pack (show $ map (renderAuthority . snd . fst) unlinkedHttp) + waitsUDL <- traverse (fork . deliverUnlinked deliver now) unlinkedHttp + + logDebug $ + T.concat + [ "Periodic delivery waiting for ", T.pack $ show $ length waitsDL + , " linked" + ] + resultsDL <- sequence waitsDL + unless (and resultsDL) $ logError "Periodic delivery DL error" + + logDebug $ + T.concat + [ "Periodic delivery waiting for ", T.pack $ show $ length waitsFW + , " forwarding" + ] + resultsFW <- sequence waitsFW + unless (and resultsFW) $ logError "Periodic delivery FW error" + + logDebug $ + T.concat + [ "Periodic delivery waiting for " + , T.pack $ show $ length waitsUDL, " unlinked" + ] + resultsUDL <- sequence waitsUDL + unless (and resultsUDL) $ logError "Periodic delivery UDL error" + + logInfo "Periodic delivery done" + where + adaptUnlinked + (E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value fwd, E.Value act, E.Value mraid, E.Value mrcid) = + ( Left <$> mraid <|> Right <$> mrcid + , ( ( (iid, h) + , ((uraid, luRecip), (udlid, fwd, obid, BL.fromStrict $ persistJSONBytes act)) + ) + , since + ) + ) + + unlinkedID ((_, (_, (udlid, _, _, _))), _) = udlid + + toLinked (Left raid, ((_, (_, (_, fwd, obid, _))), _)) = Just $ Delivery raid obid fwd False + toLinked (Right _ , _ ) = Nothing + + relevant dropAfter now since = addUTCTime dropAfter since > now + + decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _, _))), msince) = + case msince of + Nothing -> Right udl + Just since -> + if relevant dropAfter now since + then Right udl + else Left udlid + + adaptLinked + (E.Value iid, E.Value h, E.Value raid, E.Value ident, E.Value inbox, E.Value since, E.Value dlid, E.Value fwd, E.Value act) = + ( ( (iid, h) + , ((raid, (ident, inbox)), (dlid, fwd, BL.fromStrict $ persistJSONBytes act)) + ) + , since + ) + + decideBySinceDL dropAfter now (dl@(_, (_, (dlid, _, _))), msince) = + case msince of + Nothing -> Right dl + Just since -> + if relevant dropAfter now since + then Right dl + else Left dlid + + adaptForwarding + ( Entity iid (Instance h) + , Entity raid (RemoteActor _ _ inbox _ since) + , Entity fwid (Forwarding _ _ body sig _) + , mfwp, mfwg, mfwr, mfwd, mfwl + ) = + ( ( (iid, h) + , ( (raid, inbox) + , ( fwid + , BL.fromStrict body + , case (mfwp, mfwg, mfwr, mfwd, mfwl) of + (Nothing, Nothing, Nothing, Nothing, Nothing) -> + error "Found fwid without a Forwarder* record" + (Just fwp, Nothing, Nothing, Nothing, Nothing) -> + FwderPerson fwp + (Nothing, Just fwg, Nothing, Nothing, Nothing) -> + FwderGroup fwg + (Nothing, Nothing, Just fwr, Nothing, Nothing) -> + FwderRepo fwr + (Nothing, Nothing, Nothing, Just fwd, Nothing) -> + FwderDeck fwd + (Nothing, Nothing, Nothing, Nothing, Just fwl) -> + FwderLoom fwl + _ -> error "Found fwid with multiple forwarders" + , sig + ) + ) + ) + , since + ) + + decideBySinceFW dropAfter now (fw@(_, (_, (fwid, _, fwder, _))), msince) = + case msince of + Nothing -> Right fw + Just since -> + if relevant dropAfter now since + then Right fw + else Left (fwid, bmap entityKey fwder) + + groupUnlinked + = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) + . groupWithExtractBy ((==) `on` fst) fst snd + + groupLinked + = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) + . groupWithExtractBy ((==) `on` fst) fst snd + + groupForwarding + = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) + . groupWithExtractBy ((==) `on` fst) fst snd + + fork action = do + wait <- asyncWorker action + return $ do + result <- wait + case result of + Left e -> do + logError $ "Periodic delivery error! " <> T.pack (displayException e) + return False + Right success -> return success + + deliverLinked deliver now ((_, h), recips) = do + logDebug $ "Periodic deliver starting linked for host " <> renderAuthority h + waitsR <- for recips $ \ ((raid, (ident, inbox)), delivs) -> fork $ do + logDebug $ + "Periodic deliver starting linked for actor " <> + renderObjURI (ObjURI h ident) + waitsD <- for delivs $ \ (dlid, fwd, doc) -> fork $ do + let fwd' = if fwd then Just ident else Nothing + e <- deliver doc fwd' h inbox + case e of + Left err -> do + logError $ T.concat + [ "Periodic DL delivery #", T.pack $ show dlid + , " error for <", renderObjURI $ ObjURI h ident, ">: " + , T.pack $ displayException err + ] + return False + Right _resp -> do + runSiteDB $ delete dlid + return True + results <- sequence waitsD + runSiteDB $ + if and results + then update raid [RemoteActorErrorSince =. Nothing] + else if or results + then update raid [RemoteActorErrorSince =. Just now] + else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] + return True + results <- sequence waitsR + unless (and results) $ + logError $ "Periodic DL delivery error for host " <> renderAuthority h + return True + + deliverUnlinked deliver now ((iid, h), recips) = do + logDebug $ "Periodic deliver starting unlinked for host " <> renderAuthority h + waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do + logDebug $ + "Periodic deliver starting unlinked for actor " <> + renderObjURI (ObjURI h luRecip) + e <- fetchRemoteActor iid h luRecip + case e of + Right (Right mera) -> + case mera of + Nothing -> runSiteDB $ deleteWhere [UnlinkedDeliveryId <-. map fst4 (NE.toList delivs)] + Just (Entity raid ra) -> do + waitsD <- for delivs $ \ (udlid, fwd, obid, doc) -> fork $ do + let fwd' = if fwd then Just luRecip else Nothing + e' <- deliver doc fwd' h $ remoteActorInbox ra + case e' of + Left _err -> do + runSiteDB $ do + delete udlid + insert_ $ Delivery raid obid fwd False + return False + Right _resp -> do + runSiteDB $ delete udlid + return True + results <- sequence waitsD + runSiteDB $ + if and results + then update raid [RemoteActorErrorSince =. Nothing] + else if or results + then update raid [RemoteActorErrorSince =. Just now] + else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] + _ -> runSiteDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] + return True + results <- sequence waitsR + unless (and results) $ + logError $ "Periodic UDL delivery error for host " <> renderAuthority h + return True + + deliverForwarding now ((_, h), recips) = do + logDebug $ "Periodic deliver starting forwarding for host " <> renderAuthority h + waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do + logDebug $ + "Periodic deliver starting forwarding for inbox " <> + renderObjURI (ObjURI h inbox) + waitsD <- for delivs $ \ (fwid, body, fwderE, sig) -> fork $ do + let (fwderK, senderK) = splitForwarder fwderE + sender <- renderLocalActor <$> hashLocalActor senderK + e <- forwardActivity (ObjURI h inbox) sig sender body + case e of + Left _err -> return False + Right _resp -> do + runSiteDB $ do + case fwderK of + FwderPerson k -> delete k + FwderGroup k -> delete k + FwderRepo k -> delete k + FwderDeck k -> delete k + FwderLoom k -> delete k + delete fwid + return True + results <- sequence waitsD + runSiteDB $ + if and results + then update raid [RemoteActorErrorSince =. Nothing] + else if or results + then update raid [RemoteActorErrorSince =. Just now] + else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] + return True + results <- sequence waitsR + unless (and results) $ + logError $ "Periodic FW delivery error for host " <> renderAuthority h + return True + where + splitForwarder (FwderPerson (Entity f (ForwarderPerson _ p))) = + (FwderPerson f, LocalActorPerson p) + splitForwarder (FwderGroup (Entity f (ForwarderGroup _ g))) = + (FwderGroup f, LocalActorGroup g) + splitForwarder (FwderRepo (Entity f (ForwarderRepo _ r))) = + (FwderRepo f, LocalActorRepo r) + splitForwarder (FwderDeck (Entity f (ForwarderDeck _ d))) = + (FwderDeck f, LocalActorDeck d) + splitForwarder (FwderLoom (Entity f (ForwarderLoom _ l))) = + (FwderLoom f, LocalActorLoom l) diff --git a/src/Vervis/Federation.hs b/src/Vervis/Federation.hs index 49c36b5..2fb5f88 100644 --- a/src/Vervis/Federation.hs +++ b/src/Vervis/Federation.hs @@ -13,22 +13,8 @@ - . -} --- These are for Barbie-related generated instances for ForwarderBy -{-# LANGUAGE DeriveAnyClass #-} -{-# LANGUAGE DeriveGeneric #-} ---{-# LANGUAGE StandaloneDeriving #-} ---{-# LANGUAGE UndecidableInstances #-} - module Vervis.Federation ( - {- - handlePersonInbox - , handleDeckInbox - , handleLoomInbox - , handleRepoInbox - -} - fixRunningDeliveries - , retryOutboxDelivery ) where @@ -169,540 +155,4 @@ handleProjectInbox shrRecip prjRecip now auth body = do errorLocalForwarded (ActivityAuthLocalRepo rid) = "Project inbox got local forwarded activity by rid#" <> T.pack (show $ fromSqlKey rid) - -handleDeckInbox - :: KeyHashid Project - -> UTCTime - -> ActivityAuthentication - -> ActivityBody - -> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text)) -handleDeckInbox dkkhid now auth body = do - remoteAuthor <- - case auth of - ActivityAuthLocal local -> throwE $ errorLocalForwarded local - ActivityAuthRemote ra -> return ra - luActivity <- - fromMaybeE (activityId $ actbActivity body) "Activity without 'id'" - localRecips <- do - mrecips <- parseAudience $ activityAudience $ actbActivity body - paudLocalRecips <$> fromMaybeE mrecips "Activity with no recipients" - msig <- checkForwarding $ LocalActorProject shrRecip prjRecip - let mfwd = (localRecips,) <$> msig - case activitySpecific $ actbActivity body of - CreateActivity (Create obj mtarget) -> - case obj of - CreateNote _ note -> - (,Nothing) <$> projectCreateNoteF now shrRecip prjRecip remoteAuthor body mfwd luActivity note - CreateTicket _ ticket -> - (,Nothing) <$> projectCreateTicketF now shrRecip prjRecip remoteAuthor body mfwd luActivity ticket mtarget - _ -> error "Unsupported create object type for projects" - FollowActivity follow -> - (,Nothing) <$> projectFollowF shrRecip prjRecip now remoteAuthor body mfwd luActivity follow - OfferActivity (Offer obj target) -> - case obj of - OfferTicket ticket -> - (,Nothing) <$> projectOfferTicketF now shrRecip prjRecip remoteAuthor body mfwd luActivity ticket target - OfferDep dep -> - projectOfferDepF now shrRecip prjRecip remoteAuthor body mfwd luActivity dep target - _ -> return ("Unsupported offer object type for projects", Nothing) - ResolveActivity resolve -> - (,Nothing) <$> projectResolveF now shrRecip prjRecip remoteAuthor body mfwd luActivity resolve - UndoActivity undo -> - (,Nothing) <$> projectUndoF shrRecip prjRecip now remoteAuthor body mfwd luActivity undo - _ -> return ("Unsupported activity type for projects", Nothing) - where - errorLocalForwarded (ActivityAuthLocalPerson pid) = - "Project inbox got local forwarded activity by pid#" <> - T.pack (show $ fromSqlKey pid) - errorLocalForwarded (ActivityAuthLocalProject jid) = - "Project inbox got local forwarded activity by jid#" <> - T.pack (show $ fromSqlKey jid) - errorLocalForwarded (ActivityAuthLocalRepo rid) = - "Project inbox got local forwarded activity by rid#" <> - T.pack (show $ fromSqlKey rid) - -handleRepoInbox - :: ShrIdent - -> RpIdent - -> UTCTime - -> ActivityAuthentication - -> ActivityBody - -> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text)) -handleRepoInbox shrRecip rpRecip now auth body = do - remoteAuthor <- - case auth of - ActivityAuthLocal local -> throwE $ errorLocalForwarded local - ActivityAuthRemote ra -> return ra - luActivity <- - fromMaybeE (activityId $ actbActivity body) "Activity without 'id'" - localRecips <- do - mrecips <- parseAudience $ activityAudience $ actbActivity body - paudLocalRecips <$> fromMaybeE mrecips "Activity with no recipients" - msig <- checkForwarding $ LocalActorRepo shrRecip rpRecip - let mfwd = (localRecips,) <$> msig - case activitySpecific $ actbActivity body of - ApplyActivity (AP.Apply uObject uTarget) -> - repoApplyF now shrRecip rpRecip remoteAuthor body mfwd luActivity uObject uTarget - AddActivity (AP.Add obj target) -> - case obj of - Right (AddBundle patches) -> - repoAddBundleF now shrRecip rpRecip remoteAuthor body mfwd luActivity patches target - _ -> return ("Unsupported add object type for repos", Nothing) - CreateActivity (Create obj mtarget) -> - case obj of - CreateNote _ note -> - (,Nothing) <$> repoCreateNoteF now shrRecip rpRecip remoteAuthor body mfwd luActivity note - CreateTicket _ ticket -> - (,Nothing) <$> repoCreateTicketF now shrRecip rpRecip remoteAuthor body mfwd luActivity ticket mtarget - _ -> error "Unsupported create object type for repos" - FollowActivity follow -> - (,Nothing) <$> repoFollowF shrRecip rpRecip now remoteAuthor body mfwd luActivity follow - OfferActivity (Offer obj target) -> - case obj of - OfferTicket ticket -> - (,Nothing) <$> repoOfferTicketF now shrRecip rpRecip remoteAuthor body mfwd luActivity ticket target - OfferDep dep -> - repoOfferDepF now shrRecip rpRecip remoteAuthor body mfwd luActivity dep target - _ -> return ("Unsupported offer object type for repos", Nothing) - ResolveActivity resolve -> - (,Nothing) <$> repoResolveF now shrRecip rpRecip remoteAuthor body mfwd luActivity resolve - UndoActivity undo-> - (,Nothing) <$> repoUndoF shrRecip rpRecip now remoteAuthor body mfwd luActivity undo - _ -> return ("Unsupported activity type for repos", Nothing) - where - errorLocalForwarded (ActivityAuthLocalPerson pid) = - "Repo inbox got local forwarded activity by pid#" <> - T.pack (show $ fromSqlKey pid) - errorLocalForwarded (ActivityAuthLocalProject jid) = - "Repo inbox got local forwarded activity by jid#" <> - T.pack (show $ fromSqlKey jid) - errorLocalForwarded (ActivityAuthLocalRepo rid) = - "Repo inbox got local forwarded activity by rid#" <> - T.pack (show $ fromSqlKey rid) -} - -fixRunningDeliveries :: (MonadIO m, MonadLogger m, IsSqlBackend backend) => ReaderT backend m () -fixRunningDeliveries = do - c <- updateWhereCount [UnlinkedDeliveryRunning ==. True] [UnlinkedDeliveryRunning =. False] - unless (c == 0) $ logWarn $ T.concat - [ "fixRunningDeliveries fixed " - , T.pack (show c) - , " linked deliveries" - ] - c' <- updateWhereCount [DeliveryRunning ==. True] [DeliveryRunning =. False] - unless (c' == 0) $ logWarn $ T.concat - [ "fixRunningDeliveries fixed " - , T.pack (show c') - , " unlinked deliveries" - ] - c'' <- updateWhereCount [ForwardingRunning ==. True] [ForwardingRunning =. False] - unless (c'' == 0) $ logWarn $ T.concat - [ "fixRunningDeliveries fixed " - , T.pack (show c'') - , " forwarding deliveries" - ] - -data ForwarderBy f - = FwderPerson (f ForwarderPerson) - | FwderGroup (f ForwarderGroup) - | FwderRepo (f ForwarderRepo) - | FwderDeck (f ForwarderDeck) - | FwderLoom (f ForwarderLoom) - deriving (Generic, FunctorB, ConstraintsB) - -partitionFwders - :: [ForwarderBy f] - -> ( [f ForwarderPerson] - , [f ForwarderGroup] - , [f ForwarderRepo] - , [f ForwarderDeck] - , [f ForwarderLoom] - ) -partitionFwders = foldl' f ([], [], [], [], []) - where - f (ps, gs, rs, ds, ls) = \ fwder -> - case fwder of - FwderPerson p -> (p : ps, gs, rs, ds, ls) - FwderGroup g -> (ps, g : gs, rs, ds, ls) - FwderRepo r -> (ps, gs, r : rs, ds, ls) - FwderDeck d -> (ps, gs, rs, d : ds, ls) - FwderLoom l -> (ps, gs, rs, ds, l : ls) - -retryOutboxDelivery :: Worker () -retryOutboxDelivery = do - logInfo "Periodic delivery starting" - now <- liftIO $ getCurrentTime - (unlinkedHttp, linkedHttp, forwardingHttp) <- runSiteDB $ do - - -- Get all unlinked deliveries which aren't running already in outbox - -- post handlers - unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` ro `E.InnerJoin` i `E.LeftOuterJoin` ra `E.LeftOuterJoin` rc) -> do - E.on $ E.just (ro E.^. RemoteObjectId) E.==. rc E.?. RemoteCollectionIdent - E.on $ E.just (ro E.^. RemoteObjectId) E.==. ra E.?. RemoteActorIdent - E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId - E.on $ ura E.^. UnfetchedRemoteActorIdent E.==. ro E.^. RemoteObjectId - E.on $ udl E.^. UnlinkedDeliveryRecipient E.==. ura E.^. UnfetchedRemoteActorId - E.on $ udl E.^. UnlinkedDeliveryActivity E.==. ob E.^. OutboxItemId - E.where_ $ udl E.^. UnlinkedDeliveryRunning E.==. E.val False - E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ura E.^. UnfetchedRemoteActorId] - return - ( i E.^. InstanceId - , i E.^. InstanceHost - , ura E.^. UnfetchedRemoteActorId - , ro E.^. RemoteObjectIdent - , ura E.^. UnfetchedRemoteActorSince - , udl E.^. UnlinkedDeliveryId - , udl E.^. UnlinkedDeliveryActivity - , udl E.^. UnlinkedDeliveryForwarding - , ob E.^. OutboxItemActivity - , ra E.?. RemoteActorId - , rc E.?. RemoteCollectionId - ) - - -- Strip the E.Value wrappers and organize the records for the - -- filtering and grouping we'll need to do - let unlinked = map adaptUnlinked unlinked' - - -- Split into found (recipient has been reached) and lonely (recipient - -- hasn't been reached - (found, lonely) = partitionMaybes unlinked - - -- Turn the found ones into linked deliveries - deleteWhere [UnlinkedDeliveryId <-. map (unlinkedID . snd) found] - insertMany_ $ mapMaybe toLinked found - - -- We're left with the lonely ones. We'll check which actors have been - -- unreachable for too long, and we'll delete deliveries for them. The - -- rest of the actors we'll try to reach by HTTP. - dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings - let (lonelyOld, lonelyNew) = - partitionEithers $ map (decideBySinceUDL dropAfter now) lonely - deleteWhere [UnlinkedDeliveryId <-. lonelyOld] - - -- Now let's grab the linked deliveries, and similarly delete old ones - -- and return the rest for HTTP delivery. - linked <- E.select $ E.from $ \ (dl `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i `E.InnerJoin` ob) -> do - E.on $ dl E.^. DeliveryActivity E.==. ob E.^. OutboxItemId - E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId - E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId - E.on $ dl E.^. DeliveryRecipient E.==. ra E.^. RemoteActorId - E.where_ $ dl E.^. DeliveryRunning E.==. E.val False - E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId] - return - ( i E.^. InstanceId - , i E.^. InstanceHost - , ra E.^. RemoteActorId - , ro E.^. RemoteObjectIdent - , ra E.^. RemoteActorInbox - , ra E.^. RemoteActorErrorSince - , dl E.^. DeliveryId - , dl E.^. DeliveryForwarding - , ob E.^. OutboxItemActivity - ) - let (linkedOld, linkedNew) = - partitionEithers $ - map (decideBySinceDL dropAfter now . adaptLinked) linked - deleteWhere [DeliveryId <-. linkedOld] - - -- Same for forwarding deliveries, which are always linked - forwarding <- E.select $ E.from $ - \ (fw `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i - `E.LeftOuterJoin` fwp - `E.LeftOuterJoin` fwg - `E.LeftOuterJoin` fwr - `E.LeftOuterJoin` fwd - `E.LeftOuterJoin` fwl - ) -> do - E.on $ E.just (fw E.^. ForwardingId) E.==. fwl E.?. ForwarderLoomTask - E.on $ E.just (fw E.^. ForwardingId) E.==. fwd E.?. ForwarderDeckTask - E.on $ E.just (fw E.^. ForwardingId) E.==. fwr E.?. ForwarderRepoTask - E.on $ E.just (fw E.^. ForwardingId) E.==. fwg E.?. ForwarderGroupTask - E.on $ E.just (fw E.^. ForwardingId) E.==. fwp E.?. ForwarderPersonTask - E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId - E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId - E.on $ fw E.^. ForwardingRecipient E.==. ra E.^. RemoteActorId - E.where_ $ fw E.^. ForwardingRunning E.==. E.val False - E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId] - return (i, ra, fw, fwp, fwg, fwr, fwd, fwl) - let (forwardingOld, forwardingNew) = - partitionEithers $ - map (decideBySinceFW dropAfter now . adaptForwarding) - forwarding - (fwidsOld, fwdersOld) = unzip forwardingOld - (fwpidsOld, fwgidsOld, fwridsOld, fwdidsOld, fwlidsOld) = - partitionFwders fwdersOld - deleteWhere [ForwarderPersonId <-. fwpidsOld] - deleteWhere [ForwarderGroupId <-. fwgidsOld] - deleteWhere [ForwarderRepoId <-. fwridsOld] - deleteWhere [ForwarderDeckId <-. fwdidsOld] - deleteWhere [ForwarderLoomId <-. fwlidsOld] - deleteWhere [ForwardingId <-. fwidsOld] - - return - ( groupUnlinked lonelyNew - , groupLinked linkedNew - , groupForwarding forwardingNew - ) - - let deliver = deliverHttpBL - logInfo "Periodic delivery prepared DB, starting async HTTP POSTs" - - logDebug $ - "Periodic delivery forking linked " <> - T.pack (show $ map (renderAuthority . snd . fst) linkedHttp) - waitsDL <- traverse (fork . deliverLinked deliver now) linkedHttp - - logDebug $ - "Periodic delivery forking forwarding " <> - T.pack (show $ map (renderAuthority . snd . fst) forwardingHttp) - waitsFW <- traverse (fork . deliverForwarding now) forwardingHttp - - logDebug $ - "Periodic delivery forking unlinked " <> - T.pack (show $ map (renderAuthority . snd . fst) unlinkedHttp) - waitsUDL <- traverse (fork . deliverUnlinked deliver now) unlinkedHttp - - logDebug $ - T.concat - [ "Periodic delivery waiting for ", T.pack $ show $ length waitsDL - , " linked" - ] - resultsDL <- sequence waitsDL - unless (and resultsDL) $ logError "Periodic delivery DL error" - - logDebug $ - T.concat - [ "Periodic delivery waiting for ", T.pack $ show $ length waitsFW - , " forwarding" - ] - resultsFW <- sequence waitsFW - unless (and resultsFW) $ logError "Periodic delivery FW error" - - logDebug $ - T.concat - [ "Periodic delivery waiting for " - , T.pack $ show $ length waitsUDL, " unlinked" - ] - resultsUDL <- sequence waitsUDL - unless (and resultsUDL) $ logError "Periodic delivery UDL error" - - logInfo "Periodic delivery done" - where - adaptUnlinked - (E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value fwd, E.Value act, E.Value mraid, E.Value mrcid) = - ( Left <$> mraid <|> Right <$> mrcid - , ( ( (iid, h) - , ((uraid, luRecip), (udlid, fwd, obid, BL.fromStrict $ persistJSONBytes act)) - ) - , since - ) - ) - - unlinkedID ((_, (_, (udlid, _, _, _))), _) = udlid - - toLinked (Left raid, ((_, (_, (_, fwd, obid, _))), _)) = Just $ Delivery raid obid fwd False - toLinked (Right _ , _ ) = Nothing - - relevant dropAfter now since = addUTCTime dropAfter since > now - - decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _, _))), msince) = - case msince of - Nothing -> Right udl - Just since -> - if relevant dropAfter now since - then Right udl - else Left udlid - - adaptLinked - (E.Value iid, E.Value h, E.Value raid, E.Value ident, E.Value inbox, E.Value since, E.Value dlid, E.Value fwd, E.Value act) = - ( ( (iid, h) - , ((raid, (ident, inbox)), (dlid, fwd, BL.fromStrict $ persistJSONBytes act)) - ) - , since - ) - - decideBySinceDL dropAfter now (dl@(_, (_, (dlid, _, _))), msince) = - case msince of - Nothing -> Right dl - Just since -> - if relevant dropAfter now since - then Right dl - else Left dlid - - adaptForwarding - ( Entity iid (Instance h) - , Entity raid (RemoteActor _ _ inbox _ since) - , Entity fwid (Forwarding _ _ body sig _) - , mfwp, mfwg, mfwr, mfwd, mfwl - ) = - ( ( (iid, h) - , ( (raid, inbox) - , ( fwid - , BL.fromStrict body - , case (mfwp, mfwg, mfwr, mfwd, mfwl) of - (Nothing, Nothing, Nothing, Nothing, Nothing) -> - error "Found fwid without a Forwarder* record" - (Just fwp, Nothing, Nothing, Nothing, Nothing) -> - FwderPerson fwp - (Nothing, Just fwg, Nothing, Nothing, Nothing) -> - FwderGroup fwg - (Nothing, Nothing, Just fwr, Nothing, Nothing) -> - FwderRepo fwr - (Nothing, Nothing, Nothing, Just fwd, Nothing) -> - FwderDeck fwd - (Nothing, Nothing, Nothing, Nothing, Just fwl) -> - FwderLoom fwl - _ -> error "Found fwid with multiple forwarders" - , sig - ) - ) - ) - , since - ) - - decideBySinceFW dropAfter now (fw@(_, (_, (fwid, _, fwder, _))), msince) = - case msince of - Nothing -> Right fw - Just since -> - if relevant dropAfter now since - then Right fw - else Left (fwid, bmap entityKey fwder) - - groupUnlinked - = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) - . groupWithExtractBy ((==) `on` fst) fst snd - - groupLinked - = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) - . groupWithExtractBy ((==) `on` fst) fst snd - - groupForwarding - = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) - . groupWithExtractBy ((==) `on` fst) fst snd - - fork action = do - wait <- asyncWorker action - return $ do - result <- wait - case result of - Left e -> do - logError $ "Periodic delivery error! " <> T.pack (displayException e) - return False - Right success -> return success - - deliverLinked deliver now ((_, h), recips) = do - logDebug $ "Periodic deliver starting linked for host " <> renderAuthority h - waitsR <- for recips $ \ ((raid, (ident, inbox)), delivs) -> fork $ do - logDebug $ - "Periodic deliver starting linked for actor " <> - renderObjURI (ObjURI h ident) - waitsD <- for delivs $ \ (dlid, fwd, doc) -> fork $ do - let fwd' = if fwd then Just ident else Nothing - e <- deliver doc fwd' h inbox - case e of - Left err -> do - logError $ T.concat - [ "Periodic DL delivery #", T.pack $ show dlid - , " error for <", renderObjURI $ ObjURI h ident, ">: " - , T.pack $ displayException err - ] - return False - Right _resp -> do - runSiteDB $ delete dlid - return True - results <- sequence waitsD - runSiteDB $ - if and results - then update raid [RemoteActorErrorSince =. Nothing] - else if or results - then update raid [RemoteActorErrorSince =. Just now] - else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] - return True - results <- sequence waitsR - unless (and results) $ - logError $ "Periodic DL delivery error for host " <> renderAuthority h - return True - - deliverUnlinked deliver now ((iid, h), recips) = do - logDebug $ "Periodic deliver starting unlinked for host " <> renderAuthority h - waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do - logDebug $ - "Periodic deliver starting unlinked for actor " <> - renderObjURI (ObjURI h luRecip) - e <- fetchRemoteActor iid h luRecip - case e of - Right (Right mera) -> - case mera of - Nothing -> runSiteDB $ deleteWhere [UnlinkedDeliveryId <-. map fst4 (NE.toList delivs)] - Just (Entity raid ra) -> do - waitsD <- for delivs $ \ (udlid, fwd, obid, doc) -> fork $ do - let fwd' = if fwd then Just luRecip else Nothing - e' <- deliver doc fwd' h $ remoteActorInbox ra - case e' of - Left _err -> do - runSiteDB $ do - delete udlid - insert_ $ Delivery raid obid fwd False - return False - Right _resp -> do - runSiteDB $ delete udlid - return True - results <- sequence waitsD - runSiteDB $ - if and results - then update raid [RemoteActorErrorSince =. Nothing] - else if or results - then update raid [RemoteActorErrorSince =. Just now] - else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] - _ -> runSiteDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] - return True - results <- sequence waitsR - unless (and results) $ - logError $ "Periodic UDL delivery error for host " <> renderAuthority h - return True - - deliverForwarding now ((_, h), recips) = do - logDebug $ "Periodic deliver starting forwarding for host " <> renderAuthority h - waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do - logDebug $ - "Periodic deliver starting forwarding for inbox " <> - renderObjURI (ObjURI h inbox) - waitsD <- for delivs $ \ (fwid, body, fwderE, sig) -> fork $ do - let (fwderK, senderK) = splitForwarder fwderE - sender <- renderLocalActor <$> hashLocalActor senderK - e <- forwardActivity (ObjURI h inbox) sig sender body - case e of - Left _err -> return False - Right _resp -> do - runSiteDB $ do - case fwderK of - FwderPerson k -> delete k - FwderGroup k -> delete k - FwderRepo k -> delete k - FwderDeck k -> delete k - FwderLoom k -> delete k - delete fwid - return True - results <- sequence waitsD - runSiteDB $ - if and results - then update raid [RemoteActorErrorSince =. Nothing] - else if or results - then update raid [RemoteActorErrorSince =. Just now] - else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] - return True - results <- sequence waitsR - unless (and results) $ - logError $ "Periodic FW delivery error for host " <> renderAuthority h - return True - where - splitForwarder (FwderPerson (Entity f (ForwarderPerson _ p))) = - (FwderPerson f, LocalActorPerson p) - splitForwarder (FwderGroup (Entity f (ForwarderGroup _ g))) = - (FwderGroup f, LocalActorGroup g) - splitForwarder (FwderRepo (Entity f (ForwarderRepo _ r))) = - (FwderRepo f, LocalActorRepo r) - splitForwarder (FwderDeck (Entity f (ForwarderDeck _ d))) = - (FwderDeck f, LocalActorDeck d) - splitForwarder (FwderLoom (Entity f (ForwarderLoom _ l))) = - (FwderLoom f, LocalActorLoom l) diff --git a/src/Vervis/Handler/Cloth.hs b/src/Vervis/Handler/Cloth.hs index 15e5a16..6e357f5 100644 --- a/src/Vervis/Handler/Cloth.hs +++ b/src/Vervis/Handler/Cloth.hs @@ -86,7 +86,6 @@ import Database.Persist.Local import Yesod.Persist.Local import Vervis.ActivityPub -import Vervis.Actor import Vervis.API import Vervis.Cloth import Vervis.Data.Actor @@ -100,6 +99,7 @@ import Vervis.Paginate import Vervis.Persist.Actor import Vervis.Recipient import Vervis.Ticket +import Vervis.Web.Actor getClothR :: KeyHashid Loom -> KeyHashid TicketLoom -> Handler TypedContent getClothR loomHash clothHash = do diff --git a/src/Vervis/Handler/Deck.hs b/src/Vervis/Handler/Deck.hs index ac4721e..cabb8d6 100644 --- a/src/Vervis/Handler/Deck.hs +++ b/src/Vervis/Handler/Deck.hs @@ -56,6 +56,7 @@ where import Control.Monad import Control.Monad.Trans.Except import Data.Aeson +import Data.ByteString (ByteString) import Data.Foldable import Data.Maybe (fromMaybe) import Data.Text (Text) @@ -90,14 +91,16 @@ import Data.Paginate.Local import Database.Persist.Local import Yesod.Persist.Local -import Vervis.Actor import Vervis.API -import Vervis.Federation +import Vervis.Federation.Auth +import Vervis.FedURI import Vervis.Form.Project import Vervis.Foundation import Vervis.Model import Vervis.Paginate +import Vervis.Recipient import Vervis.Settings +import Vervis.Web.Actor import Vervis.Widget.Person import qualified Vervis.Client as C @@ -147,8 +150,41 @@ getDeckR deckHash = do getDeckInboxR :: KeyHashid Deck -> Handler TypedContent getDeckInboxR = getInbox DeckInboxR deckActor -postDeckInboxR :: KeyHashid Deck -> Handler TypedContent -postDeckInboxR _ = error "Temporarily disabled" +postDeckInboxR :: KeyHashid Deck -> Handler () +postDeckInboxR recipDeckHash = + postInbox $ handleRobotInbox (LocalActorDeck recipDeckHash) handle + where + handle + :: RemoteAuthor + -> Maybe (RecipientRoutes, ByteString) + -> LocalURI + -> SpecificActivity URIMode + -> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text)) + handle _author _mfwd _luActivity specific = + case specific of + {- + CreateActivity (Create obj mtarget) -> + case obj of + CreateNote _ note -> + (,Nothing) <$> projectCreateNoteF now shrRecip prjRecip remoteAuthor body mfwd luActivity note + CreateTicket _ ticket -> + (,Nothing) <$> projectCreateTicketF now shrRecip prjRecip remoteAuthor body mfwd luActivity ticket mtarget + _ -> error "Unsupported create object type for projects" + FollowActivity follow -> + (,Nothing) <$> projectFollowF shrRecip prjRecip now remoteAuthor body mfwd luActivity follow + OfferActivity (Offer obj target) -> + case obj of + OfferTicket ticket -> + (,Nothing) <$> projectOfferTicketF now shrRecip prjRecip remoteAuthor body mfwd luActivity ticket target + OfferDep dep -> + projectOfferDepF now shrRecip prjRecip remoteAuthor body mfwd luActivity dep target + _ -> return ("Unsupported offer object type for projects", Nothing) + ResolveActivity resolve -> + (,Nothing) <$> projectResolveF now shrRecip prjRecip remoteAuthor body mfwd luActivity resolve + UndoActivity undo -> + (,Nothing) <$> projectUndoF shrRecip prjRecip now remoteAuthor body mfwd luActivity undo + -} + _ -> return ("Unsupported activity type for decks", Nothing) getDeckOutboxR :: KeyHashid Deck -> Handler TypedContent getDeckOutboxR = getOutbox DeckOutboxR deckActor diff --git a/src/Vervis/Handler/Discussion.hs b/src/Vervis/Handler/Discussion.hs index 7f53c39..71d176b 100644 --- a/src/Vervis/Handler/Discussion.hs +++ b/src/Vervis/Handler/Discussion.hs @@ -1,6 +1,6 @@ {- This file is part of Vervis. - - - Written in 2016, 2019, 2020 by fr33domlover . + - Written in 2016, 2019, 2020, 2022 by fr33domlover . - - ♡ Copying is an act of love. Please copy, reuse and share. - @@ -57,7 +57,6 @@ import Yesod.Persist.Local import Vervis.API import Vervis.Discussion -import Vervis.Federation import Vervis.FedURI import Vervis.Form.Discussion import Vervis.Foundation diff --git a/src/Vervis/Handler/Group.hs b/src/Vervis/Handler/Group.hs index abfd07d..71dd001 100644 --- a/src/Vervis/Handler/Group.hs +++ b/src/Vervis/Handler/Group.hs @@ -65,12 +65,12 @@ import Yesod.Hashids import qualified Web.ActivityPub as AP -import Vervis.Actor import Vervis.Foundation import Vervis.Model import Vervis.Model.Group import Vervis.Settings (widgetFile) import Vervis.Time (showDate) +import Vervis.Web.Actor getGroupR :: KeyHashid Group -> Handler TypedContent getGroupR groupHash = do diff --git a/src/Vervis/Handler/Inbox.hs b/src/Vervis/Handler/Inbox.hs index eab3496..19ea3bd 100644 --- a/src/Vervis/Handler/Inbox.hs +++ b/src/Vervis/Handler/Inbox.hs @@ -96,7 +96,6 @@ import Vervis.ActivityPub import Vervis.ActorKey import Vervis.API import Vervis.FedURI -import Vervis.Federation import Vervis.Federation.Auth import Vervis.Foundation import Vervis.Model hiding (Ticket) diff --git a/src/Vervis/Handler/Loom.hs b/src/Vervis/Handler/Loom.hs index d1d4b50..06ddb1f 100644 --- a/src/Vervis/Handler/Loom.hs +++ b/src/Vervis/Handler/Loom.hs @@ -27,6 +27,7 @@ where import Control.Monad import Control.Monad.Trans.Except import Data.Aeson +import Data.ByteString (ByteString) import Data.Foldable import Data.Maybe (fromMaybe) import Data.Text (Text) @@ -60,13 +61,15 @@ import Data.Paginate.Local import Database.Persist.Local import Yesod.Persist.Local -import Vervis.Actor import Vervis.API -import Vervis.Federation +import Vervis.Federation.Auth +import Vervis.FedURI import Vervis.Foundation import Vervis.Model import Vervis.Paginate +import Vervis.Recipient import Vervis.Settings +import Vervis.Web.Actor getLoomR :: KeyHashid Loom -> Handler TypedContent getLoomR loomHash = do @@ -105,8 +108,19 @@ getLoomR loomHash = do getLoomInboxR :: KeyHashid Loom -> Handler TypedContent getLoomInboxR = getInbox LoomInboxR loomActor -postLoomInboxR :: KeyHashid Loom -> Handler TypedContent -postLoomInboxR _ = error "Temporarily disabled" +postLoomInboxR :: KeyHashid Loom -> Handler () +postLoomInboxR recipLoomHash = + postInbox $ handleRobotInbox (LocalActorLoom recipLoomHash) handle + where + handle + :: RemoteAuthor + -> Maybe (RecipientRoutes, ByteString) + -> LocalURI + -> AP.SpecificActivity URIMode + -> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text)) + handle _author _mfwd _luActivity specific = + case specific of + _ -> return ("Unsupported activity type for looms", Nothing) getLoomOutboxR :: KeyHashid Loom -> Handler TypedContent getLoomOutboxR = getOutbox LoomOutboxR loomActor diff --git a/src/Vervis/Handler/Person.hs b/src/Vervis/Handler/Person.hs index d251226..3cf497a 100644 --- a/src/Vervis/Handler/Person.hs +++ b/src/Vervis/Handler/Person.hs @@ -65,7 +65,6 @@ import Data.Either.Local import Database.Persist.Local import Vervis.ActivityPub -import Vervis.Actor import Vervis.ActorKey import Vervis.API import Vervis.Data.Actor @@ -78,6 +77,7 @@ import Vervis.Recipient import Vervis.Secure import Vervis.Settings import Vervis.Ticket +import Vervis.Web.Actor import Vervis.Widget import Vervis.Widget.Person diff --git a/src/Vervis/Handler/Repo.hs b/src/Vervis/Handler/Repo.hs index 4c25245..962cc4e 100644 --- a/src/Vervis/Handler/Repo.hs +++ b/src/Vervis/Handler/Repo.hs @@ -72,6 +72,7 @@ import Control.Monad.Logger (logWarn) import Control.Monad.Trans.Except import Data.Bifunctor import Data.Binary.Put +import Data.ByteString (ByteString) import Data.Foldable import Data.Git.Graph import Data.Git.Harder @@ -147,17 +148,20 @@ import Yesod.Persist.Local import qualified Data.Git.Local as G (createRepo) import qualified Darcs.Local.Repository as D (createRepo) -import Vervis.Actor import Vervis.API +import Vervis.Federation.Auth +import Vervis.FedURI import Vervis.Foundation import Vervis.Path import Vervis.Model import Vervis.Model.Ident import Vervis.Paginate import Vervis.Readme +import Vervis.Recipient import Vervis.Settings import Vervis.SourceTree import Vervis.Style +import Vervis.Web.Actor import qualified Vervis.Formatting as F import qualified Vervis.Hook as H @@ -206,8 +210,48 @@ getRepoR repoHash = do getRepoInboxR :: KeyHashid Repo -> Handler TypedContent getRepoInboxR = getInbox RepoInboxR repoActor -postRepoInboxR :: KeyHashid Repo -> Handler TypedContent -postRepoInboxR _ = error "Temporarily disabled" +postRepoInboxR :: KeyHashid Repo -> Handler () +postRepoInboxR recipRepoHash = + postInbox $ handleRobotInbox (LocalActorRepo recipRepoHash) handle + where + handle + :: RemoteAuthor + -> Maybe (RecipientRoutes, ByteString) + -> LocalURI + -> AP.SpecificActivity URIMode + -> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text)) + handle _author _mfwd _luActivity specific = + case specific of + {- + ApplyActivity (AP.Apply uObject uTarget) -> + repoApplyF now shrRecip rpRecip remoteAuthor body mfwd luActivity uObject uTarget + AddActivity (AP.Add obj target) -> + case obj of + Right (AddBundle patches) -> + repoAddBundleF now shrRecip rpRecip remoteAuthor body mfwd luActivity patches target + _ -> return ("Unsupported add object type for repos", Nothing) + CreateActivity (Create obj mtarget) -> + case obj of + CreateNote _ note -> + (,Nothing) <$> repoCreateNoteF now shrRecip rpRecip remoteAuthor body mfwd luActivity note + CreateTicket _ ticket -> + (,Nothing) <$> repoCreateTicketF now shrRecip rpRecip remoteAuthor body mfwd luActivity ticket mtarget + _ -> error "Unsupported create object type for repos" + FollowActivity follow -> + (,Nothing) <$> repoFollowF shrRecip rpRecip now remoteAuthor body mfwd luActivity follow + OfferActivity (Offer obj target) -> + case obj of + OfferTicket ticket -> + (,Nothing) <$> repoOfferTicketF now shrRecip rpRecip remoteAuthor body mfwd luActivity ticket target + OfferDep dep -> + repoOfferDepF now shrRecip rpRecip remoteAuthor body mfwd luActivity dep target + _ -> return ("Unsupported offer object type for repos", Nothing) + ResolveActivity resolve -> + (,Nothing) <$> repoResolveF now shrRecip rpRecip remoteAuthor body mfwd luActivity resolve + UndoActivity undo-> + (,Nothing) <$> repoUndoF shrRecip rpRecip now remoteAuthor body mfwd luActivity undo + -} + _ -> return ("Unsupported activity type for repos", Nothing) getRepoOutboxR :: KeyHashid Repo -> Handler TypedContent getRepoOutboxR = getOutbox RepoOutboxR repoActor diff --git a/src/Vervis/Handler/Ticket.hs b/src/Vervis/Handler/Ticket.hs index bb58212..7161fff 100644 --- a/src/Vervis/Handler/Ticket.hs +++ b/src/Vervis/Handler/Ticket.hs @@ -131,11 +131,9 @@ import Database.Persist.Local import Yesod.Persist.Local import Vervis.ActivityPub -import Vervis.Actor import Vervis.API import Vervis.Data.Actor import Vervis.Discussion -import Vervis.Federation import Vervis.FedURI import Vervis.Foundation import Vervis.Handler.Discussion @@ -152,6 +150,7 @@ import Vervis.Style import Vervis.Ticket import Vervis.TicketFilter (filterTickets) import Vervis.Time (showDate) +import Vervis.Web.Actor getTicketR :: KeyHashid Deck -> KeyHashid TicketDeck -> Handler TypedContent getTicketR deckHash ticketHash = do diff --git a/src/Vervis/Actor.hs b/src/Vervis/Web/Actor.hs similarity index 94% rename from src/Vervis/Actor.hs rename to src/Vervis/Web/Actor.hs index 28413c7..119ff62 100644 --- a/src/Vervis/Actor.hs +++ b/src/Vervis/Web/Actor.hs @@ -13,7 +13,7 @@ - . -} -module Vervis.Actor +module Vervis.Web.Actor ( getInbox , postInbox , getOutbox @@ -21,6 +21,7 @@ module Vervis.Actor , getFollowersCollection , getActorFollowersCollection , getFollowingCollection + , handleRobotInbox ) where @@ -37,6 +38,7 @@ import Data.Aeson import Data.Aeson.Encode.Pretty import Data.Bifunctor import Data.Bitraversable +import Data.ByteString (ByteString) import Data.Foldable (for_) import Data.List import Data.Maybe @@ -77,6 +79,7 @@ import Yesod.Hashids import Yesod.MonadSite import Yesod.RenderSource +import Control.Monad.Trans.Except.Local import Data.Aeson.Local import Data.Either.Local import Data.EventTime.Local @@ -89,7 +92,6 @@ import Vervis.ActivityPub import Vervis.ActorKey import Vervis.API import Vervis.FedURI -import Vervis.Federation import Vervis.Federation.Auth import Vervis.Foundation import Vervis.Model hiding (Ticket) @@ -453,3 +455,29 @@ getFollowingCollection here actor hash = do getRemotes aid = map (followRemoteTarget . entityVal) <$> selectList [FollowRemoteActor ==. aid] [] + +handleRobotInbox + :: LocalActorBy KeyHashid + -> ( RemoteAuthor + -> Maybe (RecipientRoutes, ByteString) + -> LocalURI + -> SpecificActivity URIMode + -> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text)) + ) + -> UTCTime + -> ActivityAuthentication + -> ActivityBody + -> ExceptT Text Handler (Text, Maybe (ExceptT Text Worker Text)) +handleRobotInbox recipByHash handleSpecific now auth body = do + remoteAuthor <- + case auth of + ActivityAuthLocal _ -> throwE "Got a forwarded local activity, I don't need those" + ActivityAuthRemote ra -> return ra + luActivity <- + fromMaybeE (activityId $ actbActivity body) "Activity without 'id'" + localRecips <- do + mrecips <- parseAudience $ activityAudience $ actbActivity body + paudLocalRecips <$> fromMaybeE mrecips "Activity with no recipients" + msig <- checkForwarding recipByHash + let mfwd = (localRecips,) <$> msig + handleSpecific remoteAuthor mfwd luActivity (activitySpecific $ actbActivity body) diff --git a/vervis.cabal b/vervis.cabal index e7b4e72..9757ff7 100644 --- a/vervis.cabal +++ b/vervis.cabal @@ -124,7 +124,6 @@ library Vervis.Access Vervis.ActivityPub - Vervis.Actor Vervis.ActorKey Vervis.API Vervis.Avatar @@ -143,7 +142,7 @@ library Vervis.Delivery Vervis.Discussion - Vervis.Federation + --Vervis.Federation Vervis.Federation.Auth --Vervis.Federation.Discussion --Vervis.Federation.Offer @@ -224,6 +223,9 @@ library Vervis.Ticket Vervis.TicketFilter Vervis.Time + + Vervis.Web.Actor + Vervis.Widget Vervis.Widget.Discussion Vervis.Widget.Person