1
0
Fork 0
mirror of https://code.naskya.net/repos/ndqEd synced 2025-01-10 11:16:46 +09:00

Implement outbox remote delivery, in handler and periodic, not used yet

This commit is contained in:
fr33domlover 2019-04-16 14:27:50 +00:00
parent 7946fe441d
commit d5eefd1553
11 changed files with 490 additions and 394 deletions

View file

@ -54,6 +54,20 @@ InboxItemLocal
UniqueInboxItemLocal person activity UniqueInboxItemLocal person activity
UnlinkedDelivery
recipient UnfetchedRemoteActorId
activity OutboxItemId
running Bool
UniqueUnlinkedDelivery recipient activity
Delivery
recipient RemoteActorId
activity OutboxItemId
running Bool
UniqueDelivery recipient activity
VerifKey VerifKey
ident LocalURI ident LocalURI
instance InstanceId instance InstanceId
@ -69,6 +83,13 @@ VerifKeySharedUsage
UniqueVerifKeySharedUsage key user UniqueVerifKeySharedUsage key user
UnfetchedRemoteActor
instance InstanceId
ident LocalURI
since UTCTime Maybe
UniqueUnfetchedRemoteActor instance ident
RemoteActor RemoteActor
ident LocalURI ident LocalURI
instance InstanceId instance InstanceId

View file

@ -0,0 +1,20 @@
UnfetchedRemoteActor
instance InstanceId
ident Text
since UTCTime Maybe
UniqueUnfetchedRemoteActor instance ident
UnlinkedDelivery
recipient UnfetchedRemoteActorId
activity OutboxItemId
running Bool
UniqueUnlinkedDelivery recipient activity
Delivery
recipient RemoteActorId
activity OutboxItemId
running Bool
UniqueDelivery recipient activity

View file

@ -63,7 +63,7 @@ data ResultShare m k v a = ResultShare
} }
newResultShare newResultShare
:: MonadIO m => ResultShareSettings m k v a -> m (ResultShare m k v a) :: MonadIO n => ResultShareSettings m k v a -> n (ResultShare m k v a)
newResultShare (ResultShareSettings fork action) = do newResultShare (ResultShareSettings fork action) = do
tvar <- liftIO $ newTVarIO M.empty tvar <- liftIO $ newTVarIO M.empty
return $ ResultShare tvar fork action return $ ResultShare tvar fork action

View file

@ -60,12 +60,15 @@ import Control.Concurrent.Local (forkCheck)
import Database.Persist.Schema.PostgreSQL (schemaBackend) import Database.Persist.Schema.PostgreSQL (schemaBackend)
import Control.Concurrent.ResultShare
import Data.KeyFile import Data.KeyFile
import Web.Hashids.Local import Web.Hashids.Local
import Vervis.ActorKey (generateActorKey, actorKeyRotator) import Vervis.ActorKey (generateActorKey, actorKeyRotator)
import Vervis.Federation
import Vervis.KeyFile (isInitialSetup) import Vervis.KeyFile (isInitialSetup)
import Vervis.RemoteActorStore (newInstanceMutex) import Vervis.RemoteActorStore
-- Import all relevant handler modules here. -- Import all relevant handler modules here.
-- Don't forget to add new modules to your cabal file! -- Don't forget to add new modules to your cabal file!
@ -125,6 +128,8 @@ makeFoundation appSettings = do
appInstanceMutex <- newInstanceMutex appInstanceMutex <- newInstanceMutex
appActorFetchShare <- newResultShare actorFetchShareSettings
appActivities <- newTVarIO mempty appActivities <- newTVarIO mempty
-- We need a log function to create a connection pool. We need a connection -- We need a log function to create a connection pool. We need a connection
@ -169,7 +174,10 @@ makeFoundation appSettings = do
let msg = "DB migration failed: " <> err let msg = "DB migration failed: " <> err
$logError msg $logError msg
error $ T.unpack msg error $ T.unpack msg
Right (_from, _to) -> $logInfo "DB migration success" Right (_from, _to) -> do
$logInfo "DB migration success"
fixRunningDeliveries
deleteUnusedURAs
-- Return the foundation -- Return the foundation
return $ mkFoundation pool capSignKey hashidsCtx return $ mkFoundation pool capSignKey hashidsCtx

View file

@ -15,18 +15,23 @@
module Vervis.Federation module Vervis.Federation
( handleInboxActivity ( handleInboxActivity
, fixRunningDeliveries
, handleOutboxNote , handleOutboxNote
, retryOutboxDelivery
) )
where where
import Prelude import Prelude
import Control.Applicative
import Control.Concurrent.MVar
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Exception hiding (Handler, try) import Control.Exception hiding (Handler, try)
import Control.Monad import Control.Monad
import Control.Monad.Logger.CallStack import Control.Monad.Logger.CallStack
import Control.Monad.Trans.Except import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Reader
import Data.Aeson (Object) import Data.Aeson (Object)
import Data.Bifunctor import Data.Bifunctor
import Data.Either import Data.Either
@ -43,8 +48,10 @@ import Data.Traversable
import Data.Tuple import Data.Tuple
import Database.Persist hiding (deleteBy) import Database.Persist hiding (deleteBy)
import Database.Persist.Sql hiding (deleteBy) import Database.Persist.Sql hiding (deleteBy)
import Network.HTTP.Client
import Network.HTTP.Types.Header import Network.HTTP.Types.Header
import Network.HTTP.Types.URI import Network.HTTP.Types.URI
import Network.TLS
import UnliftIO.Exception (try) import UnliftIO.Exception (try)
import Yesod.Core hiding (logError, logWarn, logInfo) import Yesod.Core hiding (logError, logWarn, logInfo)
import Yesod.Persist.Core import Yesod.Persist.Core
@ -66,6 +73,7 @@ import Yesod.Hashids
import Data.Either.Local import Data.Either.Local
import Data.List.Local import Data.List.Local
import Data.List.NonEmpty.Local import Data.List.NonEmpty.Local
import Data.Maybe.Local
import Database.Persist.Local import Database.Persist.Local
import Vervis.ActorKey import Vervis.ActorKey
@ -296,329 +304,20 @@ handleInboxActivity raw hActor iidActor rsidActor (Activity _id _luActor audienc
] ]
return (uNote, luContext) return (uNote, luContext)
{- fixRunningDeliveries :: (MonadIO m, MonadLogger m, IsSqlBackend backend) => ReaderT backend m ()
-- | Handle a Note submitted by a local user to their outbox. It can be either fixRunningDeliveries = do
-- a comment on a local ticket, or a comment on some remote context. Return an c <- updateWhereCount [UnlinkedDeliveryRunning ==. True] [UnlinkedDeliveryRunning =. False]
-- error message if the Note is rejected, otherwise the new 'LocalMessageId'. unless (c == 0) $ logWarn $ T.concat
handleOutboxNote :: Text -> Note -> Handler (Either Text LocalMessageId) [ "fixRunningDeliveries fixed "
handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished content) = runExceptT $ do , T.pack (show c)
verifyHostLocal host "Attributed to non-local actor" , " linked deliveries"
verifyNothing mluNote "Note specifies an id" ]
verifyNothing mpublished "Note specifies published" c' <- updateWhereCount [DeliveryRunning ==. True] [DeliveryRunning =. False]
uContext <- fromMaybeE muContext "Note without context" unless (c' == 0) $ logWarn $ T.concat
uRecip <- parseAudience aud "Note has not-just-single-to audience" [ "fixRunningDeliveries fixed "
recipContextParent <- parseRecipContextParent uRecip uContext muParent , T.pack (show c)
, " unlinked deliveries"
(lmid, mdeliver) <- ExceptT $ runDB $ runExceptT $ do ]
(pid, shrUser) <- verifyIsLoggedInUser luAttrib "Note attributed to different actor"
case recipContextParent of
(mparent, Left (shr, prj, num)) -> do
mdid <- lift $ runMaybeT $ do
sid <- MaybeT $ getKeyBy $ UniqueSharer shr
jid <- MaybeT $ getKeyBy $ UniqueProject prj sid
t <- MaybeT $ getValBy $ UniqueTicket jid num
return $ ticketDiscuss t
did <- fromMaybeE mdid "Context: No such local ticket"
mmidParent <- for mparent $ \ parent ->
case parent of
Left (shrParent, lmidParent) -> getLocalParentMessageId did shrParent lmidParent
Right (hParent, luParent) -> do
mrm <- lift $ runMaybeT $ do
iid <- MaybeT $ getKeyBy $ UniqueInstance hParent
MaybeT $ getValBy $ UniqueRemoteMessageIdent iid luParent
rm <- fromMaybeE mrm "Remote parent unknown locally"
let mid = remoteMessageRest rm
m <- lift $ getJust mid
unless (messageRoot m == did) $
throwE "Remote parent belongs to a different discussion"
return mid
let meparent = Left <$> mmidParent
(lmid, _doc) <- lift $ insertMessage luAttrib shrUser pid uContext did muParent meparent content
return (lmid, Nothing)
(mparent, Right (hRecip, luRecip, luContext)) -> do
(did, rdid, rdnew, mluInbox) <- do
miid <- lift $ getKeyBy $ UniqueInstance hRecip
erd <-
case miid of
Just iid -> findExistingRemoteDiscussion iid hRecip luRecip luContext
Nothing -> return Nothing
case erd of
Just (d, rd, minb) -> return (d, rd, False, minb)
Nothing -> ExceptT $ withHostLock hRecip $ runExceptT $ storeRemoteDiscussion miid hRecip luRecip luContext
meparent <- for mparent $ \ parent ->
case parent of
Left (shrParent, lmidParent) -> do
when rdnew $ throwE "Local parent inexistent, RemoteDiscussion is new"
Left <$> getLocalParentMessageId did shrParent lmidParent
Right (hParent, luParent) -> do
mrm <- lift $ runMaybeT $ do
iid <- MaybeT $ getKeyBy $ UniqueInstance hParent
MaybeT $ getValBy $ UniqueRemoteMessageIdent iid luParent
case mrm of
Nothing -> return $ Right $ l2f hParent luParent
Just rm -> Left <$> do
let mid = remoteMessageRest rm
m <- lift $ getJust mid
unless (messageRoot m == did) $
throwE "Remote parent belongs to a different discussion"
return mid
(lmid, doc) <- lift $ insertMessage luAttrib shrUser pid uContext did muParent meparent content
return (lmid, Just (doc, hRecip, maybe (Right (luRecip, rdid)) Left mluInbox))
let handleDeliverError e = logError $ "Outbox POST handler: delivery failed! " <> T.pack (displayException e)
lift $ for_ mdeliver $ \ (doc, hRecip, einb) -> forkHandler handleDeliverError $ do
uInbox <-
case einb of
Left luInbox -> return $ l2f hRecip luInbox
Right (luRecip, rdid) -> do
mluInbox <- runDB $ runMaybeT $ do
iid <- MaybeT $ getKeyBy $ UniqueInstance hRecip
rs <- MaybeT $ getValBy $ UniqueRemoteActor iid luRecip
return $ remoteActorInbox rs
case mluInbox of
Just luInbox -> return $ l2f hRecip luInbox
Nothing -> do
manager <- getsYesod appHttpManager
eactor <- fetchAPID manager actorId hRecip luRecip
case eactor of
Left s -> fail $ "Fetched recipient actor: " ++ s
Right actor -> withHostLock hRecip $ runDB $ do
iid <- either entityKey id <$> insertBy (Instance hRecip)
let luInbox = actorInbox actor
rsid <- either entityKey id <$> insertBy (RemoteActor luRecip iid luInbox Nothing)
update rdid [RemoteDiscussionActor =. Just rsid, RemoteDiscussionUnlinkedActor =. Nothing]
return $ l2f hRecip luInbox
-- TODO based on the httpPostAP usage in postOutboxR
manager <- getsYesod appHttpManager
(akey1, akey2, new1) <- liftIO . readTVarIO =<< getsYesod appActorKeys
renderUrl <- getUrlRender
let (keyID, akey) =
if new1
then (renderUrl ActorKey1R, akey1)
else (renderUrl ActorKey2R, akey2)
sign b = (KeyId $ encodeUtf8 keyID, actorKeySign akey b)
actorID = renderFedURI $ l2f host luAttrib
eres <- httpPostAP manager uInbox (hRequestTarget :| [hHost, hDate, hActivityPubActor]) sign actorID doc
case eres of
Left e -> logError $ "Failed to POST to recipient's inbox: " <> T.pack (displayException e)
Right _ -> logInfo $ T.concat
[ "Successful delivery of <"
, renderFedURI $ l2f (docHost doc) (activityId $ docValue doc)
, " to <"
, renderFedURI uRecip
, ">"
]
return lmid
where
verifyNothing :: Monad m => Maybe a -> Text -> ExceptT Text m ()
verifyNothing Nothing _ = return ()
verifyNothing (Just _) t = throwE t
verifySameHost
:: Monad m => Text -> FedURI -> Text -> ExceptT Text m LocalURI
verifySameHost h fu t = do
let (h', lu) = f2l fu
if h == h'
then return lu
else throwE t
parseRecipContextParent
:: FedURI
-> FedURI
-> Maybe FedURI
-> ExceptT
Text
Handler
( Maybe (Either (ShrIdent, LocalMessageId) (Text, LocalURI))
, Either
(ShrIdent, PrjIdent, Int)
(Text, LocalURI, LocalURI)
)
parseRecipContextParent uRecip uContext muParent = do
let r@(hRecip, luRecip) = f2l uRecip
luContext <- verifySameHost hRecip uContext "Recipient and context on different hosts"
meparent <-
case muParent of
Nothing -> return Nothing
Just uParent ->
if uParent == uContext
then return Nothing
else Just <$> do
let (hParent, luParent) = f2l uParent
parentLocal <- hostIsLocal hParent
if parentLocal
then Left <$> parseComment luParent
else return $ Right (hParent, luParent)
local <- hostIsLocal hRecip
if local
then do
(shr, prj) <- parseProject luRecip
num <- parseTicket (shr, prj) luContext
return (meparent, Left (shr, prj, num))
else do
when (luRecip == luContext) $
throwE "Identical recipient and context"
{-
mrs <- lift $ runDB $ runMaybeT $ do
iid <- MaybeT $ getKeyBy $ UniqueInstance hRecip
MaybeT $ getBy $ UniqueRemoteActor iid luRecip
erecip <-
case mrs of
Just ers -> return $ Left ers
Nothing -> do
manager <- getsYesod appHttpManager
eactor <- fetchAPID manager actorId hRecip luRecip
case eactor of
Left s -> throwE $ "Fetched recipient actor: " <> T.pack s
Right actor -> return $ Right actor
-}
return (meparent, Right (hRecip, luRecip, luContext))
verifyIsLoggedInUser
:: LocalURI -> Text -> ExceptT Text AppDB (PersonId, ShrIdent)
verifyIsLoggedInUser lu t = do
Entity pid p <- requireVerifiedAuth
s <- lift $ getJust $ personIdent p
route2local <- getEncodeRouteLocal
let shr = sharerIdent s
if route2local (SharerR shr) == lu
then return (pid, shr)
else throwE t
findExistingRemoteDiscussion
:: InstanceId
-> Text
-> LocalURI
-> LocalURI
-> ExceptT Text AppDB
(Maybe (DiscussionId, RemoteDiscussionId, Maybe LocalURI))
findExistingRemoteDiscussion iid hRecip luRecip luContext = do
merd <- lift $ getBy $ UniqueRemoteDiscussionIdent iid luContext
for merd $ \ (Entity rdid rd) -> do
eactor <-
requireEitherM
(remoteDiscussionActor rd)
(remoteDiscussionUnlinkedActor rd)
"RemoteDiscussion actor and unlinkedActor both unset"
"RemoteDiscussion actor and unlinkedActor both set"
minb <- case eactor of
Left rsid -> do
rs <- lift $ getJust rsid
unless (remoteActorInstance rs == iid && remoteActorIdent rs == luRecip) $
throwE "Known remote context, but its actor doesn't match the new Note's recipient"
return $ Just $ remoteActorInbox rs
Right uActor -> do
unless (uActor == l2f hRecip luRecip) $
throwE "Known remote context, but its unlinked actor doesn't match the new Note's recipient"
return Nothing
return (remoteDiscussionDiscuss rd, rdid, minb)
insertRemoteDiscussion
:: InstanceId
-> Bool
-> Text
-> LocalURI
-> LocalURI
-> AppDB (DiscussionId, RemoteDiscussionId, Maybe LocalURI)
insertRemoteDiscussion iid inew hRecip luRecip luContext = do
mrs <-
if inew
then return Nothing
else getBy $ UniqueRemoteActor iid luRecip
did <- insert Discussion
rdid <- insert RemoteDiscussion
{ remoteDiscussionActor = entityKey <$> mrs
, remoteDiscussionInstance = iid
, remoteDiscussionIdent = luContext
, remoteDiscussionDiscuss = did
, remoteDiscussionUnlinkedActor =
case mrs of
Nothing -> Just $ l2f hRecip luRecip
Just _ -> Nothing
}
return (did, rdid, remoteActorInbox . entityVal <$> mrs)
storeRemoteDiscussion
:: Maybe InstanceId
-> Text
-> LocalURI
-> LocalURI
-> ExceptT Text AppDB
(DiscussionId, RemoteDiscussionId, Bool, Maybe LocalURI)
storeRemoteDiscussion miid hRecip luRecip luContext = do
(iid, inew) <-
case miid of
Just i -> return (i, False)
Nothing -> lift $ idAndNew <$> insertBy (Instance hRecip)
if inew
then do
(did, rdid, minb) <- lift $ insertRemoteDiscussion iid True hRecip luRecip luContext
return (did, rdid, True, minb)
else do
erd <- findExistingRemoteDiscussion iid hRecip luRecip luContext
case erd of
Just (did, rdid, minb) -> return (did, rdid, False, minb)
Nothing -> do
(did, rdid, minb) <- lift $ insertRemoteDiscussion iid False hRecip luRecip luContext
return (did, rdid, True, minb)
insertMessage
:: LocalURI
-> ShrIdent
-> PersonId
-> FedURI
-> DiscussionId
-> Maybe FedURI
-> Maybe (Either MessageId FedURI)
-> Text
-> AppDB (LocalMessageId, Doc Activity)
insertMessage luAttrib shrUser pid uContext did muParent meparent content = do
now <- liftIO getCurrentTime
mid <- insert Message
{ messageCreated = now
, messageContent = content
, messageParent =
case meparent of
Just (Left midParent) -> Just midParent
_ -> Nothing
, messageRoot = did
}
lmid <- insert LocalMessage
{ localMessageAuthor = pid
, localMessageRest = mid
, localMessageUnlinkedParent =
case meparent of
Just (Right uParent) -> Just uParent
_ -> Nothing
}
route2local <- getEncodeRouteLocal
lmhid <- encodeKeyHashid lmid
let activity luAct = Doc host Activity
{ activityId = luAct
, activityActor = luAttrib
, activityAudience = aud
, activitySpecific = CreateActivity Create
{ createObject = Note
{ noteId = Just $ route2local $ MessageR shrUser lmhid
, noteAttrib = luAttrib
, noteAudience = aud
, noteReplyTo = Just $ fromMaybe uContext muParent
, noteContext = Just uContext
, notePublished = Just now
, noteContent = content
}
}
}
obid <- insert OutboxItem
{ outboxItemPerson = pid
, outboxItemActivity = PersistJSON $ activity $ LocalURI "" ""
, outboxItemPublished = now
}
obhid <- encodeKeyHashid obid
let luAct = route2local $ OutboxItemR shrUser obhid
doc = activity luAct
update obid [OutboxItemActivity =. PersistJSON doc]
return (lmid, doc)
-}
data LocalTicketRecipient = LocalTicketParticipants | LocalTicketTeam data LocalTicketRecipient = LocalTicketParticipants | LocalTicketTeam
deriving (Eq, Ord) deriving (Eq, Ord)
@ -657,6 +356,44 @@ newtype FedError = FedError Text deriving Show
instance Exception FedError instance Exception FedError
getHttpSign = do
(akey1, akey2, new1) <- liftIO . readTVarIO =<< getsYesod appActorKeys
renderUrl <- getUrlRender
let (keyID, akey) =
if new1
then (renderUrl ActorKey1R, akey1)
else (renderUrl ActorKey2R, akey2)
return $ \ b -> (KeyId $ encodeUtf8 keyID, actorKeySign akey b)
deliverHttp sign doc h luInbox = do
manager <- getsYesod appHttpManager
let inbox = l2f h luInbox
headers = hRequestTarget :| [hHost, hDate, hActivityPubActor]
httpPostAP manager inbox headers sign docActor doc
where
docActor = renderFedURI $ l2f (docHost doc) (activityActor $ docValue doc)
isInstanceErrorHttp (InvalidUrlException _ _) = False
isInstanceErrorHttp (HttpExceptionRequest _ hec) =
case hec of
ResponseTimeout -> True
ConnectionTimeout -> True
InternalException se ->
case fromException se of
Just (HandshakeFailed _) -> True
_ -> False
_ -> False
isInstanceErrorP (APPostErrorSig _) = False
isInstanceErrorP (APPostErrorHTTP he) = isInstanceErrorHttp he
isInstanceErrorG Nothing = False
isInstanceErrorG (Just e) =
case e of
APGetErrorHTTP he -> isInstanceErrorHttp he
APGetErrorJSON _ -> False
APGetErrorContentType _ -> False
-- | Handle a Note submitted by a local user to their outbox. It can be either -- | Handle a Note submitted by a local user to their outbox. It can be either
-- a comment on a local ticket, or a comment on some remote context. Return an -- a comment on a local ticket, or a comment on some remote context. Return an
-- error message if the Note is rejected, otherwise the new 'LocalMessageId'. -- error message if the Note is rejected, otherwise the new 'LocalMessageId'.
@ -742,15 +479,13 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
return (did, meparent, Nothing) return (did, meparent, Nothing)
(lmid, obid, doc) <- lift $ insertMessage luAttrib shrUser pid uContext did muParent meparent content (lmid, obid, doc) <- lift $ insertMessage luAttrib shrUser pid uContext did muParent meparent content
moreRemotes <- deliverLocal obid localRecips mcollections moreRemotes <- deliverLocal obid localRecips mcollections
return (lmid, doc, moreRemotes) remotesHttp <- lift $ deliverRemoteDB obid remoteRecips moreRemotes
(lmid, doc, moreRemotes) <- case result of return (lmid, obid, doc, remotesHttp)
(lmid, obid, doc, remotesHttp) <- case result of
Left (FedError t) -> throwE t Left (FedError t) -> throwE t
Right r -> return r Right r -> return r
-- TODO deliver *async* to remote sharers: remoteRecips and moreRemotes let handleDeliveryError e = logError $ "Outbox POST handler: delivery failed! " <> T.pack (displayException e)
-- lift $ forkHandler handleDeliveryError $ deliverRemoteHttp obid doc remotesHttp
-- doc :: Doc Activity
-- remoteRecips :: [FedURI]
-- moreRemotes :: [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI))]
return lmid return lmid
where where
verifyNothing :: Monad m => Maybe a -> e -> ExceptT e m () verifyNothing :: Monad m => Maybe a -> e -> ExceptT e m ()
@ -992,6 +727,12 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
mergeConcat :: (Ord a, Semigroup b) => [(a, b)] -> [(a, b)] -> [(a, b)] mergeConcat :: (Ord a, Semigroup b) => [(a, b)] -> [(a, b)] -> [(a, b)]
mergeConcat xs ys = map (second sconcat) $ groupWithExtract fst snd $ LO.mergeBy (compare `on` fst) xs ys mergeConcat xs ys = map (second sconcat) $ groupWithExtract fst snd $ LO.mergeBy (compare `on` fst) xs ys
fst3 :: (a, b, c) -> a
fst3 (x, _, _) = x
thd3 :: (a, b, c) -> c
thd3 (_, _, z) = z
-- Deliver to local recipients. For local users, find in DB and deliver. -- Deliver to local recipients. For local users, find in DB and deliver.
-- For local collections, expand them, deliver to local users, and return a -- For local collections, expand them, deliver to local users, and return a
-- list of remote actors found in them. -- list of remote actors found in them.
@ -999,7 +740,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
:: OutboxItemId :: OutboxItemId
-> [ShrIdent] -> [ShrIdent]
-> Maybe (SharerId, FollowerSetId) -> Maybe (SharerId, FollowerSetId)
-> ExceptT Text AppDB [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI))] -> ExceptT Text AppDB [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))]
deliverLocal obid recips mticket = do deliverLocal obid recips mticket = do
recipPids <- traverse getPersonId $ nub recips recipPids <- traverse getPersonId $ nub recips
(morePids, remotes) <- (morePids, remotes) <-
@ -1048,7 +789,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
-- instances aren't repeated. Use a custom merge -- instances aren't repeated. Use a custom merge
-- where we can unionBy or LO.unionBy whenever both -- where we can unionBy or LO.unionBy whenever both
-- lists have the same instance. -- lists have the same instance.
, map (second $ NE.nubBy ((==) `on` fst)) $ mergeConcat teamRemotes fsRemotes , map (second $ NE.nubBy ((==) `on` fst3)) $ mergeConcat teamRemotes fsRemotes
) )
lift $ for_ (union recipPids morePids) $ \ pid -> insert_ $ InboxItemLocal pid obid lift $ for_ (union recipPids morePids) $ \ pid -> insert_ $ InboxItemLocal pid obid
return remotes return remotes
@ -1068,11 +809,11 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
case id_ of case id_ of
Left pid -> return pid Left pid -> return pid
Right _gid -> throwE "Local Note addresses a local group" Right _gid -> throwE "Local Note addresses a local group"
groupRemotes :: [(InstanceId, Text, RemoteActorId, LocalURI)] -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI))] groupRemotes :: [(InstanceId, Text, RemoteActorId, LocalURI, Maybe UTCTime)] -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))]
groupRemotes = groupWithExtractBy ((==) `on` fst) fst snd . map toPairs groupRemotes = groupWithExtractBy ((==) `on` fst) fst snd . map toTuples
where where
toPairs (iid, h, rsid, lu) = ((iid, h), (rsid, lu)) toTuples (iid, h, rsid, lu, ms) = ((iid, h), (rsid, lu, ms))
getTicketTeam :: SharerId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI))]) getTicketTeam :: SharerId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))])
getTicketTeam sid = do getTicketTeam sid = do
id_ <- getPersonOrGroupId sid id_ <- getPersonOrGroupId sid
(,[]) <$> case id_ of (,[]) <$> case id_ of
@ -1080,7 +821,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
Right gid -> Right gid ->
map (groupMemberPerson . entityVal) <$> map (groupMemberPerson . entityVal) <$>
selectList [GroupMemberGroup ==. gid] [] selectList [GroupMemberGroup ==. gid] []
getFollowers :: FollowerSetId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI))]) getFollowers :: FollowerSetId -> AppDB ([PersonId], [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))])
getFollowers fsid = do getFollowers fsid = do
local <- selectList [FollowTarget ==. fsid] [] local <- selectList [FollowTarget ==. fsid] []
remote <- E.select $ E.from $ \ (rf `E.InnerJoin` rs `E.InnerJoin` i) -> do remote <- E.select $ E.from $ \ (rf `E.InnerJoin` rs `E.InnerJoin` i) -> do
@ -1093,12 +834,13 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
, i E.^. InstanceHost , i E.^. InstanceHost
, rs E.^. RemoteActorId , rs E.^. RemoteActorId
, rs E.^. RemoteActorInbox , rs E.^. RemoteActorInbox
, rs E.^. RemoteActorErrorSince
) )
return return
( map (followPerson . entityVal) local ( map (followPerson . entityVal) local
, groupRemotes $ , groupRemotes $
map (\ (E.Value iid, E.Value h, E.Value rsid, E.Value luInbox) -> map (\ (E.Value iid, E.Value h, E.Value rsid, E.Value luInbox, E.Value msince) ->
(iid, h, rsid, luInbox) (iid, h, rsid, luInbox, msince)
) )
remote remote
) )
@ -1118,56 +860,328 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c
Left pid -> lift $ insert_ $ InboxItemLocal pid obid Left pid -> lift $ insert_ $ InboxItemLocal pid obid
Right _gid -> throwE "Local Note addresses a local group" Right _gid -> throwE "Local Note addresses a local group"
-- TODO NEXT: So far, we have 2 groups of remote actors to handle, deliverRemoteDB
-- 'allKnown' and 'stillUnknown'. We could be done with DB and proceed to :: OutboxItemId
-- launch HTTP requests, but we haven't considered something: Some actors -> [FedURI]
-- are known to be unreachable: -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, Maybe UTCTime))]
-- -> AppDB
-- (1) There are actors we've never reached, for whom there are pending ( [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, DeliveryId))]
-- deliveries , [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
-- (2) There are actors we already fetched, but for whom there are , [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
-- pending deliveries because lately their inboxes are unreachable )
-- deliverRemoteDB obid recips known = do
-- And this brings us to 2 potential things to do:
--
-- (1) Skip the request for some actors, and instead insert a delivery to
-- the DB
-- (2) Insert/update reachability records for actors we try to reach but
-- fail
-- (3) Insert/update reachability records for actors we suddenly succeed
-- to reach
--
-- So, for each RemoteActor, we're going to add a field 'errorSince'.
-- Its type will be Maybe UTCTime, and the meaning is:
--
-- - Nothing: We haven't observed the inbox being down
-- - Just t: The time t denotes a time we couldn't reach the inbox, and
-- since that time all our following attempts failed too
--
-- In this context, inbox error means any result that isn't a 2xx status.
deliverRemote :: Doc Activity -> [FedURI] -> [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI))] -> Handler ()
deliverRemote doc recips known = runDB $ do
recips' <- for (groupByHost recips) $ \ (h, lus) -> do recips' <- for (groupByHost recips) $ \ (h, lus) -> do
let lus' = NE.nub lus let lus' = NE.nub lus
(iid, inew) <- idAndNew <$> insertBy' (Instance h) (iid, inew) <- idAndNew <$> insertBy' (Instance h)
if inew if inew
then return ((iid, h), (Nothing, Just lus')) then return ((iid, h), (Nothing, Nothing, Just lus'))
else do else do
es <- for lus' $ \ lu -> do es <- for lus' $ \ lu -> do
mers <- getBy $ UniqueRemoteActor iid lu ma <- runMaybeT
$ Left <$> MaybeT (getBy $ UniqueRemoteActor iid lu)
<|> Right <$> MaybeT (getBy $ UniqueUnfetchedRemoteActor iid lu)
return $ return $
case mers of case ma of
Just (Entity rsid rs) -> Left (rsid, remoteActorInbox rs) Nothing -> Left lu
Nothing -> Right lu Just e ->
let (newKnown, unknown) = partitionEithers $ NE.toList es Right $ case e of
return ((iid, h), (nonEmpty newKnown, nonEmpty unknown)) Left (Entity raid ra) -> Left (raid, remoteActorInbox ra, remoteActorErrorSince ra)
let moreKnown = mapMaybe (\ (i, (k, _)) -> (i,) <$> k) recips' Right (Entity uraid ura) -> Right (uraid, unfetchedRemoteActorIdent ura, unfetchedRemoteActorSince ura)
stillUnknown = mapMaybe (\ (i, (_, u)) -> (i,) <$> u) recips' let (unknown, newKnown) = partitionEithers $ NE.toList es
-- ^ [ ( (iid, h) , NonEmpty luActor ) ] (fetched, unfetched) = partitionEithers newKnown
return ((iid, h), (nonEmpty fetched, nonEmpty unfetched, nonEmpty unknown))
let moreKnown = mapMaybe (\ (i, (f, _, _)) -> (i,) <$> f) recips'
unfetched = mapMaybe (\ (i, (_, uf, _)) -> (i,) <$> uf) recips'
stillUnknown = mapMaybe (\ (i, (_, _, uk)) -> (i,) <$> uk) recips'
-- TODO see the earlier TODO about merge, it applies here too -- TODO see the earlier TODO about merge, it applies here too
allKnown = map (second $ NE.nubBy ((==) `on` fst)) $ mergeConcat known moreKnown allFetched = map (second $ NE.nubBy ((==) `on` fst3)) $ mergeConcat known moreKnown
-- ^ [ ( (iid, h) , NonEmpty (rsid, inb) ) ] fetchedDeliv <- for allFetched $ \ (i, rs) ->
error "TODO CONTINUE" (i,) <$> insertMany' (\ (raid, _, msince) -> Delivery raid obid $ isNothing msince) rs
unfetchedDeliv <- for unfetched $ \ (i, rs) ->
(i,) <$> insertMany' (\ (uraid, _, msince) -> UnlinkedDelivery uraid obid $ isNothing msince) rs
unknownDeliv <- for stillUnknown $ \ (i, lus) -> do
-- TODO maybe for URA insertion we should do insertUnique?
rs <- insertMany' (\ lu -> UnfetchedRemoteActor (fst i) lu Nothing) lus
(i,) <$> insertMany' (\ (_, uraid) -> UnlinkedDelivery uraid obid True) rs
return
( takeNoError fetchedDeliv
, takeNoError unfetchedDeliv
, map
(second $ NE.map $ \ ((lu, ak), dlk) -> (ak, lu, dlk))
unknownDeliv
)
where where
groupByHost :: [FedURI] -> [(Text, NonEmpty LocalURI)] groupByHost :: [FedURI] -> [(Text, NonEmpty LocalURI)]
groupByHost = groupAllExtract furiHost (snd . f2l) groupByHost = groupAllExtract furiHost (snd . f2l)
insertMany' mk xs = zip' xs <$> insertMany (NE.toList $ mk <$> xs)
where
zip' x y =
case nonEmpty y of
Just y' | length x == length y' -> NE.zip x y'
_ -> error "insertMany' returned different length!"
takeNoError = mapMaybe $ \ (i, rs) -> (i,) <$> nonEmpty (mapMaybe noError $ NE.toList rs)
where
noError ((ak, lu, Nothing), dlk) = Just (ak, lu, dlk)
noError ((_ , _ , Just _ ), _ ) = Nothing
deliverRemoteHttp
:: OutboxItemId
-> Doc Activity
-> ( [((InstanceId, Text), NonEmpty (RemoteActorId, LocalURI, DeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
, [((InstanceId, Text), NonEmpty (UnfetchedRemoteActorId, LocalURI, UnlinkedDeliveryId))]
)
-> Handler ()
deliverRemoteHttp obid doc (fetched, unfetched, unknown) = do
sign <- getHttpSign
let deliver = deliverHttp sign doc
now <- liftIO getCurrentTime
traverse_ (fork . deliverFetched deliver now) fetched
traverse_ (fork . deliverUnfetched deliver now) unfetched
traverse_ (fork . deliverUnfetched deliver now) unknown
where
fork = forkHandler $ \ e -> logError $ "Outbox POST handler: delivery failed! " <> T.pack (displayException e)
deliverFetched deliver now ((_, h), recips@(r :| rs)) = do
let (raid, luInbox, dlid) = r
e <- deliver h luInbox
let e' = case e of
Left err ->
if isInstanceErrorP err
then Nothing
else Just False
Right _resp -> Just True
case e' of
Nothing -> runDB $ do
let recips' = NE.toList recips
updateWhere [RemoteActorId <-. map fst3 recips', RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
updateWhere [DeliveryId <-. map thd3 recips'] [DeliveryRunning =. False]
Just success -> do
runDB $
if success
then delete dlid
else do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
update dlid [DeliveryRunning =. False]
for_ rs $ \ (raid, luInbox, dlid) ->
fork $ do
e <- deliver h luInbox
runDB $
case e of
Left _err -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
update dlid [DeliveryRunning =. False]
Right _resp -> delete dlid
deliverUnfetched deliver now ((iid, h), recips@(r :| rs)) = do
let (uraid, luActor, udlid) = r
e <- fetchRemoteActor iid h luActor
let e' = case e of
Left err ->
if isInstanceErrorG err
then Nothing
else Just Nothing
Right era -> Just $ Just era
case e' of
Nothing -> runDB $ do
let recips' = NE.toList recips
updateWhere [UnfetchedRemoteActorId <-. map fst3 recips', UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
updateWhere [UnlinkedDeliveryId <-. map thd3 recips'] [UnlinkedDeliveryRunning =. False]
Just mera -> do
for_ rs $ \ (uraid, luActor, udlid) ->
fork $ do
e <- fetchRemoteActor iid h luActor
case e of
Left _ -> runDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False]
Right (Entity raid ra) -> do
e' <- deliver h $ remoteActorInbox ra
runDB $
case e' of
Left _ -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
delete udlid
insert_ $ Delivery raid obid False
Right _ -> delete udlid
case mera of
Nothing -> runDB $ do
updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
update udlid [UnlinkedDeliveryRunning =. False]
Just (Entity raid ra) -> do
e'' <- deliver h $ remoteActorInbox ra
runDB $
case e'' of
Left _ -> do
updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
delete udlid
insert_ $ Delivery raid obid False
Right _ -> delete udlid
retryOutboxDelivery :: Handler ()
retryOutboxDelivery = do
now <- liftIO getCurrentTime
(udls, dls) <- runDB $ do
-- Get all unlinked deliveries which aren't running already in outbox
-- post handlers
unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` i `E.LeftOuterJoin` ra) -> do
E.on $ E.just (ura E.^. UnfetchedRemoteActorInstance) E.==. ra E.?. RemoteActorInstance
E.&&. E.just (ura E.^. UnfetchedRemoteActorIdent) E.==. ra E.?. RemoteActorIdent
E.on $ ura E.^. UnfetchedRemoteActorInstance E.==. i E.^. InstanceId
E.on $ udl E.^. UnlinkedDeliveryRecipient E.==. ura E.^. UnfetchedRemoteActorId
E.on $ udl E.^. UnlinkedDeliveryActivity E.==. ob E.^. OutboxItemId
E.where_ $ udl E.^. UnlinkedDeliveryRunning E.==. E.val False
E.orderBy [E.asc $ ura E.^. UnfetchedRemoteActorInstance, E.asc $ ura E.^. UnfetchedRemoteActorId]
return
( i E.^. InstanceId
, i E.^. InstanceHost
, ura E.^. UnfetchedRemoteActorId
, ura E.^. UnfetchedRemoteActorIdent
, ura E.^. UnfetchedRemoteActorSince
, udl E.^. UnlinkedDeliveryId
, udl E.^. UnlinkedDeliveryActivity
, ob E.^. OutboxItemActivity
, ra E.?. RemoteActorId
)
-- Strip the E.Value wrappers and organize the records for the
-- filtering and grouping we'll need to do
let unlinked = map adaptUnlinked unlinked'
-- Split into found (recipient has been reached) and lonely (recipient
-- hasn't been reached
(found, lonely) = partitionMaybes unlinked
-- Turn the found ones into linked deliveries
deleteWhere [UnlinkedDeliveryId <-. map (unlinkedID . snd) found]
insertMany_ $ map toLinked found
-- We're left with the lonely ones. We'll check which actors have been
-- unreachable for too long, and we'll delete deliveries for them. The
-- rest of the actors we'll try to reach by HTTP.
dropAfter <- getsYesod $ appDropDeliveryAfter . appSettings
let (lonelyOld, lonelyNew) = partitionEithers $ map (decideBySinceUDL dropAfter now) lonely
deleteWhere [UnlinkedDeliveryId <-. lonelyOld]
-- Now let's grab the linked deliveries, and similarly delete old ones
-- and return the rest for HTTP delivery.
linked <- E.select $ E.from $ \ (dl `E.InnerJoin` ra `E.InnerJoin` i `E.InnerJoin` ob) -> do
E.on $ dl E.^. DeliveryActivity E.==. ob E.^. OutboxItemId
E.on $ ra E.^. RemoteActorInstance E.==. i E.^. InstanceId
E.on $ dl E.^. DeliveryRecipient E.==. ra E.^. RemoteActorId
E.where_ $ dl E.^. DeliveryRunning E.==. E.val False
E.orderBy [E.asc $ ra E.^. RemoteActorInstance, E.asc $ ra E.^. RemoteActorId]
return
( i E.^. InstanceId
, i E.^. InstanceHost
, ra E.^. RemoteActorId
, ra E.^. RemoteActorInbox
, ra E.^. RemoteActorErrorSince
, dl E.^. DeliveryId
, ob E.^. OutboxItemActivity
)
let (linkedOld, linkedNew) = partitionEithers $ map (decideBySinceDL dropAfter now . adaptLinked) linked
deleteWhere [DeliveryId <-. linkedOld]
return (groupUnlinked lonelyNew, groupLinked linkedNew)
sign <- getHttpSign
let deliver = deliverHttp sign
waitsDL <- traverse (fork . deliverLinked deliver now) dls
waitsUDL <- traverse (fork . deliverUnlinked deliver now) udls
resultsDL <- sequence waitsDL
unless (and resultsDL) $ logError "Periodic delivery DL error"
resultsUDL <- sequence waitsUDL
unless (and resultsUDL) $ logError "Periodic delivery UDL error"
where
adaptUnlinked
(E.Value iid, E.Value h, E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value act, E.Value mraid) =
( mraid
, ( ( (iid, h)
, ((uraid, luRecip), (udlid, obid, persistJSONValue act))
)
, since
)
)
unlinkedID ((_, (_, (udlid, _, _))), _) = udlid
toLinked (raid, ((_, (_, (_, obid, _))), _)) = Delivery raid obid False
relevant dropAfter now since = addUTCTime dropAfter since > now
decideBySinceUDL dropAfter now (udl@(_, (_, (udlid, _, _))), msince) =
case msince of
Nothing -> Right udl
Just since ->
if relevant dropAfter now since
then Right udl
else Left udlid
groupUnlinked
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
adaptLinked
(E.Value iid, E.Value h, E.Value raid, E.Value inbox, E.Value since, E.Value dlid, E.Value act) =
( ( (iid, h)
, ((raid, inbox), (dlid, persistJSONValue act))
)
, since
)
decideBySinceDL dropAfter now (dl@(_, (_, (dlid, _))), msince) =
case msince of
Nothing -> Right dl
Just since ->
if relevant dropAfter now since
then Right dl
else Left dlid
groupLinked
= map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd)
. groupWithExtractBy ((==) `on` fst) fst snd
fork action = do
mvar <- liftIO newEmptyMVar
let handle e = do
liftIO $ putMVar mvar False
logError $ "Periodic delivery error! " <> T.pack (displayException e)
forkHandler handle $ do
success <- action
liftIO $ putMVar mvar success
return $ liftIO $ readMVar mvar
deliverLinked deliver now ((_, h), recips) = do
waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do
waitsD <- for delivs $ \ (dlid, doc) -> fork $ do
e <- deliver doc h inbox
case e of
Left _err -> return False
Right _resp -> do
runDB $ delete dlid
return True
results <- sequence waitsD
runDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic DL delivery error for host " <> h
return True
deliverUnlinked deliver now ((iid, h), recips) = do
waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do
e <- fetchRemoteActor iid h luRecip
case e of
Left _ -> runDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now]
Right (Entity raid ra) -> do
waitsD <- for delivs $ \ (udlid, obid, doc) -> fork $ do
e' <- deliver doc h $ remoteActorInbox ra
case e' of
Left _err -> do
runDB $ do
delete udlid
insert_ $ Delivery raid obid False
return False
Right _resp -> do
runDB $ delete udlid
return True
results <- sequence waitsD
runDB $
if and results
then update raid [RemoteActorErrorSince =. Nothing]
else if or results
then update raid [RemoteActorErrorSince =. Just now]
else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now]
return True
results <- sequence waitsR
unless (and results) $
logError $ "Periodic UDL delivery error for host " <> h
return True

View file

@ -64,6 +64,7 @@ import Yesod.Mail.Send
import qualified Network.HTTP.Signature as S (Algorithm (..)) import qualified Network.HTTP.Signature as S (Algorithm (..))
import Control.Concurrent.ResultShare
import Crypto.PublicVerifKey import Crypto.PublicVerifKey
import Network.FedURI import Network.FedURI
import Web.ActivityAccess import Web.ActivityAccess
@ -104,6 +105,7 @@ data App = App
, appInstanceMutex :: InstanceMutex , appInstanceMutex :: InstanceMutex
, appCapSignKey :: AccessTokenSecretKey , appCapSignKey :: AccessTokenSecretKey
, appHashidsContext :: HashidsContext , appHashidsContext :: HashidsContext
, appActorFetchShare :: ResultShare (HandlerFor App) FedURI (Either (Maybe APGetError) (Entity RemoteActor)) InstanceId
, appActivities :: TVar (Vector (UTCTime, ActivityReport)) , appActivities :: TVar (Vector (UTCTime, ActivityReport))
} }
@ -625,6 +627,7 @@ instance YesodRemoteActorStore App where
siteInstanceRoomMode = appMaxInstanceKeys . appSettings siteInstanceRoomMode = appMaxInstanceKeys . appSettings
siteActorRoomMode = appMaxActorKeys . appSettings siteActorRoomMode = appMaxActorKeys . appSettings
siteRejectOnMaxKeys = appRejectOnMaxKeys . appSettings siteRejectOnMaxKeys = appRejectOnMaxKeys . appSettings
siteActorFetchShare = appActorFetchShare
data ActorDetail = ActorDetail data ActorDetail = ActorDetail
{ actorDetailId :: FedURI { actorDetailId :: FedURI

View file

@ -243,6 +243,8 @@ changes =
"RemoteActor" "RemoteActor"
(Nothing :: Maybe UTCTime) (Nothing :: Maybe UTCTime)
"errorSince" "errorSince"
-- 59
, addEntities model_2019_04_12
] ]
migrateDB :: MonadIO m => ReaderT SqlBackend m (Either Text (Int, Int)) migrateDB :: MonadIO m => ReaderT SqlBackend m (Either Text (Int, Int))

View file

@ -36,6 +36,7 @@ module Vervis.Migration.Model
, FollowerSet2019Generic (..) , FollowerSet2019Generic (..)
, Ticket2019 , Ticket2019
, model_2019_04_11 , model_2019_04_11
, model_2019_04_12
) )
where where
@ -94,3 +95,6 @@ makeEntitiesMigration "2019"
model_2019_04_11 :: [Entity SqlBackend] model_2019_04_11 :: [Entity SqlBackend]
model_2019_04_11 = $(schema "2019_04_11") model_2019_04_11 = $(schema "2019_04_11")
model_2019_04_12 :: [Entity SqlBackend]
model_2019_04_12 = $(schema "2019_04_12")

View file

@ -23,6 +23,9 @@ module Vervis.RemoteActorStore
, keyListedByActorShared , keyListedByActorShared
, VerifKeyDetail (..) , VerifKeyDetail (..)
, addVerifKey , addVerifKey
, actorFetchShareSettings
, fetchRemoteActor
, deleteUnusedURAs
) )
where where
@ -46,11 +49,12 @@ import Database.Persist
import Database.Persist.Sql import Database.Persist.Sql
import Network.HTTP.Client import Network.HTTP.Client
import UnliftIO.MVar (withMVar) import UnliftIO.MVar (withMVar)
import Yesod.Core hiding (logError) import Yesod.Core hiding (logWarn, logError)
import Yesod.Persist.Core import Yesod.Persist.Core
import qualified Data.HashMap.Strict as M import qualified Data.HashMap.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
import qualified Database.Esqueleto as E
import Crypto.PublicVerifKey import Crypto.PublicVerifKey
import Database.Persist.Local import Database.Persist.Local
@ -78,7 +82,7 @@ class Yesod site => YesodRemoteActorStore site where
siteActorRoomMode :: site -> Maybe Int siteActorRoomMode :: site -> Maybe Int
siteRejectOnMaxKeys :: site -> Bool siteRejectOnMaxKeys :: site -> Bool
siteActorFetchShare :: site -> ResultShare (HandlerFor site) FedURI (Either String (Entity RemoteActor)) InstanceId siteActorFetchShare :: site -> ResultShare (HandlerFor site) FedURI (Either (Maybe APGetError) (Entity RemoteActor)) InstanceId
-- TODO this is copied from stm-2.5, remove when we upgrade LTS -- TODO this is copied from stm-2.5, remove when we upgrade LTS
stateTVar :: TVar s -> (s -> (a, s)) -> STM a stateTVar :: TVar s -> (s -> (a, s)) -> STM a
@ -452,7 +456,7 @@ actorFetchShareSettings
, BaseBackend (YesodPersistBackend site) ~ SqlBackend , BaseBackend (YesodPersistBackend site) ~ SqlBackend
, HasHttpManager site , HasHttpManager site
) )
=> ResultShareSettings (HandlerFor site) FedURI (Either String (Entity RemoteActor)) InstanceId => ResultShareSettings (HandlerFor site) FedURI (Either (Maybe APGetError) (Entity RemoteActor)) InstanceId
actorFetchShareSettings = ResultShareSettings actorFetchShareSettings = ResultShareSettings
{ resultShareFork = forkHandler $ \ e -> logError $ "ActorFetchShare action failed! " <> T.pack (displayException e) { resultShareFork = forkHandler $ \ e -> logError $ "ActorFetchShare action failed! " <> T.pack (displayException e)
, resultShareAction = \ u iid -> do , resultShareAction = \ u iid -> do
@ -462,7 +466,7 @@ actorFetchShareSettings = ResultShareSettings
Just ers -> return $ Right ers Just ers -> return $ Right ers
Nothing -> do Nothing -> do
manager <- getsYesod getHttpManager manager <- getsYesod getHttpManager
eactor <- fetchAPID manager actorId h lu eactor <- fetchAPID' manager actorId h lu
for eactor $ \ actor -> runDB $ for eactor $ \ actor -> runDB $
insertEntity $ RemoteActor lu iid (actorInbox actor) Nothing insertEntity $ RemoteActor lu iid (actorInbox actor) Nothing
} }
@ -473,7 +477,7 @@ fetchRemoteActor
, BaseBackend (YesodPersistBackend site) ~ SqlBackend , BaseBackend (YesodPersistBackend site) ~ SqlBackend
, YesodRemoteActorStore site , YesodRemoteActorStore site
) )
=> InstanceId -> Text -> LocalURI -> HandlerFor site (Either String (Entity RemoteActor)) => InstanceId -> Text -> LocalURI -> HandlerFor site (Either (Maybe APGetError) (Entity RemoteActor))
fetchRemoteActor iid host luActor = do fetchRemoteActor iid host luActor = do
mers <- runDB $ getBy $ UniqueRemoteActor iid luActor mers <- runDB $ getBy $ UniqueRemoteActor iid luActor
case mers of case mers of
@ -481,3 +485,12 @@ fetchRemoteActor iid host luActor = do
Nothing -> do Nothing -> do
afs <- getsYesod siteActorFetchShare afs <- getsYesod siteActorFetchShare
runShared afs (l2f host luActor) iid runShared afs (l2f host luActor) iid
deleteUnusedURAs = do
uraids <- E.select $ E.from $ \ ura -> do
E.where_ $ E.notExists $ E.from $ \ udl ->
E.where_ $ ura E.^. UnfetchedRemoteActorId E.==. udl E.^. UnlinkedDeliveryRecipient
return $ ura E.^. UnfetchedRemoteActorId
unless (null uraids) $ do
deleteWhere [UnfetchedRemoteActorId <-. map E.unValue uraids]
logWarn $ T.pack (show $ length uraids) <> " unused URAs deleted"

View file

@ -53,6 +53,7 @@ module Web.ActivityPub
, httpPostAP , httpPostAP
, Fetched (..) , Fetched (..)
, fetchAPID , fetchAPID
, fetchAPID'
, keyListedByActor , keyListedByActor
, fetchUnknownKey , fetchUnknownKey
, fetchKnownPersonalKey , fetchKnownPersonalKey
@ -680,8 +681,11 @@ data Fetched = Fetched
-- we received. -- we received.
} }
fetchAP' :: (MonadIO m, FromJSON a) => Manager -> FedURI -> ExceptT APGetError m a
fetchAP' m u = ExceptT $ second responseBody <$> httpGetAP m u
fetchAP :: (MonadIO m, FromJSON a) => Manager -> FedURI -> ExceptT String m a fetchAP :: (MonadIO m, FromJSON a) => Manager -> FedURI -> ExceptT String m a
fetchAP m u = ExceptT $ bimap displayException responseBody <$> httpGetAP m u fetchAP m u = withExceptT displayException $ fetchAP' m u
{- {-
fetchAPH :: (MonadIO m, ActivityPub a) => Manager -> Text -> LocalURI -> ExceptT String m a fetchAPH :: (MonadIO m, ActivityPub a) => Manager -> Text -> LocalURI -> ExceptT String m a
@ -692,12 +696,18 @@ fetchAPH m h lu = do
else throwE "Object @id URI's host doesn't match the URI we fetched" else throwE "Object @id URI's host doesn't match the URI we fetched"
-} -}
fetchAPID :: (MonadIO m, ActivityPub a) => Manager -> (a -> LocalURI) -> Text -> LocalURI -> m (Either String a) fetchAPID' :: (MonadIO m, ActivityPub a) => Manager -> (a -> LocalURI) -> Text -> LocalURI -> m (Either (Maybe APGetError) a)
fetchAPID m getId h lu = runExceptT $ do fetchAPID' m getId h lu = runExceptT $ do
Doc h' v <- fetchAP m $ l2f h lu Doc h' v <- withExceptT Just $ fetchAP' m $ l2f h lu
if h == h' && getId v == lu if h == h' && getId v == lu
then return v then return v
else throwE "Object @id doesn't match the URI we fetched" else throwE Nothing
fetchAPID :: (MonadIO m, ActivityPub a) => Manager -> (a -> LocalURI) -> Text -> LocalURI -> m (Either String a)
fetchAPID m getId h lu = first showError <$> fetchAPID' m getId h lu
where
showError Nothing = "Object @id doesn't match the URI we fetched"
showError (Just e) = displayException e
fetchAPIDOrH fetchAPIDOrH
:: (MonadIO m, ActivityPub a, ActivityPub b) :: (MonadIO m, ActivityPub a, ActivityPub b)

View file

@ -337,6 +337,7 @@ library
, time-interval , time-interval
, time-interval-aeson , time-interval-aeson
, time-units , time-units
, tls
, transformers , transformers
-- probably should be replaced with lenses once I learn -- probably should be replaced with lenses once I learn
, tuple , tuple