From f37b9b3f52b2b9ed0209c4cf3f2943cf5f8f3962 Mon Sep 17 00:00:00 2001 From: fr33domlover Date: Thu, 18 Apr 2019 10:38:01 +0000 Subject: [PATCH] Run the delivery worker priodically, settings control how often to run --- config/settings-default.yaml | 5 ++ src/Control/Concurrent/Local.hs | 13 +++ src/Control/Concurrent/ResultShare.hs | 36 +++----- src/Vervis/ActorKey.hs | 25 ++---- src/Vervis/Application.hs | 14 ++- src/Vervis/Federation.hs | 69 ++++++++------ src/Vervis/Foundation.hs | 23 +++-- src/Vervis/Handler/Inbox.hs | 21 +++-- src/Vervis/RemoteActorStore.hs | 59 +++++++----- src/Vervis/Settings.hs | 3 + src/Yesod/MonadSite.hs | 125 ++++++++++++++++++++++++++ vervis.cabal | 1 + 12 files changed, 285 insertions(+), 109 deletions(-) create mode 100644 src/Yesod/MonadSite.hs diff --git a/config/settings-default.yaml b/config/settings-default.yaml index d2f38a4..1d8ef43 100644 --- a/config/settings-default.yaml +++ b/config/settings-default.yaml @@ -155,3 +155,8 @@ reject-on-max-keys: true drop-delivery-after: amount: 25 unit: weeks + +# How often to retry failed deliveries +retry-delivery-every: + amount: 1 + unit: hours diff --git a/src/Control/Concurrent/Local.hs b/src/Control/Concurrent/Local.hs index 2229148..97b89ec 100644 --- a/src/Control/Concurrent/Local.hs +++ b/src/Control/Concurrent/Local.hs @@ -15,13 +15,17 @@ module Control.Concurrent.Local ( forkCheck + , periodically ) where import Prelude import Control.Concurrent +import Control.Monad +import Control.Monad.IO.Class import Data.Functor (void) +import Data.Time.Interval -- | Like 'forkIO', but if the new thread terminates with an exception, -- re-throw it in the current thread. @@ -29,3 +33,12 @@ forkCheck :: IO () -> IO () forkCheck run = do tid <- myThreadId void $ forkFinally run $ either (throwTo tid) (const $ return ()) + +periodically :: MonadIO m => TimeInterval -> m () -> m () +periodically interval action = + let micros = microseconds interval + in if 0 < micros && micros <= toInteger (maxBound :: Int) + then + let micros' = fromInteger micros + in forever $ liftIO (threadDelay micros') >> action + else error $ "periodically: interval out of range: " ++ show micros diff --git a/src/Control/Concurrent/ResultShare.hs b/src/Control/Concurrent/ResultShare.hs index 540c77f..55f8eea 100644 --- a/src/Control/Concurrent/ResultShare.hs +++ b/src/Control/Concurrent/ResultShare.hs @@ -32,8 +32,7 @@ -- * It could be nice to provide defaults for plain IO and for UnliftIO -- * The action is constant, could make it more flexible module Control.Concurrent.ResultShare - ( ResultShareSettings (..) - , ResultShare () + ( ResultShare () , newResultShare , runShared ) @@ -41,7 +40,7 @@ where import Prelude -import Control.Concurrent.MVar +import Control.Concurrent import Control.Concurrent.STM.TVar import Control.Monad import Control.Monad.IO.Class @@ -51,22 +50,16 @@ import Data.HashMap.Strict (HashMap) import qualified Data.HashMap.Strict as M -data ResultShareSettings m k v a = ResultShareSettings - { resultShareFork :: m () -> m () - , resultShareAction :: k -> a -> m v - } - -data ResultShare m k v a = ResultShare +data ResultShare k v a = ResultShare { _rsMap :: TVar (HashMap k (MVar v)) - , _rsFork :: m () -> m () - , _rsAction :: k -> a -> m v + , _rsAction :: k -> a -> IO v } newResultShare - :: MonadIO n => ResultShareSettings m k v a -> n (ResultShare m k v a) -newResultShare (ResultShareSettings fork action) = do + :: MonadIO m => (k -> a -> IO v) -> m (ResultShare k v a) +newResultShare action = do tvar <- liftIO $ newTVarIO M.empty - return $ ResultShare tvar fork action + return $ ResultShare tvar action -- TODO this is copied from stm-2.5, remove when we upgrade LTS stateTVar :: TVar s -> (s -> (a, s)) -> STM a @@ -77,9 +70,9 @@ stateTVar var f = do return a runShared - :: (MonadIO m, Eq k, Hashable k) => ResultShare m k v a -> k -> a -> m v -runShared (ResultShare tvar fork action) key param = do - (mvar, new) <- liftIO $ do + :: (MonadIO m, Eq k, Hashable k) => ResultShare k v a -> k -> a -> m v +runShared (ResultShare tvar action) key param = liftIO $ do + (mvar, new) <- do existing <- M.lookup key <$> readTVarIO tvar case existing of Just v -> return (v, False) @@ -89,9 +82,8 @@ runShared (ResultShare tvar fork action) key param = do case M.lookup key m of Just v' -> ((v', False), m) Nothing -> ((v , True) , M.insert key v m) - when new $ fork $ do + when new $ void $ forkIO $ do result <- action key param - liftIO $ do - atomically $ modifyTVar' tvar $ M.delete key - putMVar mvar result - liftIO $ readMVar mvar + atomically $ modifyTVar' tvar $ M.delete key + putMVar mvar result + readMVar mvar diff --git a/src/Vervis/ActorKey.hs b/src/Vervis/ActorKey.hs index 82bab7d..784022b 100644 --- a/src/Vervis/ActorKey.hs +++ b/src/Vervis/ActorKey.hs @@ -44,6 +44,8 @@ import qualified Data.ByteString as B (writeFile, readFile) import Crypto.PublicVerifKey import Data.KeyFile +import Control.Concurrent.Local + -- | Ed25519 signing key, we generate it on the server and use for signing. We -- also make its public key available to whoever wishes to verify our -- signatures. @@ -157,22 +159,13 @@ generateActorKey = mk <$> generateSecretKey -- storing them in a 'TVar'. It manages a pait of keys, and each time it toggles -- which key gets rotated. actorKeyRotator :: TimeInterval -> TVar (ActorKey, ActorKey, Bool) -> IO () -actorKeyRotator interval keys = - let micros = microseconds interval - in if 0 < micros && micros <= toInteger (maxBound :: Int) - then - let micros' = fromInteger micros - in forever $ do - threadDelay micros' - fresh <- generateActorKey - atomically $ - modifyTVar' keys $ \ (k1, k2, new1) -> - if new1 - then (k1 , fresh, False) - else (fresh, k2 , True) - else - error $ - "actorKeyRotator: interval out of range: " ++ show micros +actorKeyRotator interval keys = periodically interval $ do + fresh <- generateActorKey + atomically $ + modifyTVar' keys $ \ (k1, k2, new1) -> + if new1 + then (k1 , fresh, False) + else (fresh, k2 , True) actorKeyPublicBin :: ActorKey -> PublicVerifKey actorKeyPublicBin = fromEd25519 . actorKeyPublic diff --git a/src/Vervis/Application.hs b/src/Vervis/Application.hs index e716faa..7b8c440 100644 --- a/src/Vervis/Application.hs +++ b/src/Vervis/Application.hs @@ -56,13 +56,13 @@ import Yesod.Mail.Send (runMailer) import qualified Data.Text as T (unpack) import qualified Data.HashMap.Strict as M (empty) -import Control.Concurrent.Local (forkCheck) - import Database.Persist.Schema.PostgreSQL (schemaBackend) import Control.Concurrent.ResultShare import Data.KeyFile +import Yesod.MonadSite +import Control.Concurrent.Local import Web.Hashids.Local import Vervis.ActorKey (generateActorKey, actorKeyRotator) @@ -128,7 +128,7 @@ makeFoundation appSettings = do appInstanceMutex <- newInstanceMutex - appActorFetchShare <- newResultShare actorFetchShareSettings + appActorFetchShare <- newResultShare actorFetchShareAction appActivities <- newTVarIO mempty @@ -239,6 +239,11 @@ actorKeyPeriodicRotator :: App -> IO () actorKeyPeriodicRotator app = actorKeyRotator (appActorKeyRotation $ appSettings app) (appActorKeys app) +deliveryRunner :: App -> IO () +deliveryRunner app = + let interval = appDeliveryRetryFreq $ appSettings app + in runWorker (periodically interval retryOutboxDelivery) app + sshServer :: App -> IO () sshServer foundation = runSsh @@ -280,6 +285,9 @@ appMain = do -- Run actor signature key periodic generation thread forkCheck $ actorKeyPeriodicRotator foundation + -- Run periodic activity delivery retry runner + forkCheck $ deliveryRunner foundation + -- Run SSH server forkCheck $ sshServer foundation diff --git a/src/Vervis/Federation.hs b/src/Vervis/Federation.hs index 944444e..ed61372 100644 --- a/src/Vervis/Federation.hs +++ b/src/Vervis/Federation.hs @@ -34,6 +34,7 @@ import Control.Monad.Trans.Maybe import Control.Monad.Trans.Reader import Data.Aeson (Object) import Data.Bifunctor +import Data.ByteString (ByteString) import Data.Either import Data.Foldable import Data.Function @@ -49,6 +50,7 @@ import Data.Tuple import Database.Persist hiding (deleteBy) import Database.Persist.Sql hiding (deleteBy) import Network.HTTP.Client +import Network.HTTP.Signature import Network.HTTP.Types.Header import Network.HTTP.Types.URI import Network.TLS @@ -69,6 +71,7 @@ import Web.ActivityPub import Yesod.Auth.Unverified import Yesod.FedURI import Yesod.Hashids +import Yesod.MonadSite import Data.Either.Local import Data.List.Local @@ -356,17 +359,26 @@ newtype FedError = FedError Text deriving Show instance Exception FedError +getHttpSign + :: (MonadSite m, SiteEnv m ~ App) => m (ByteString -> (KeyId, Signature)) getHttpSign = do - (akey1, akey2, new1) <- liftIO . readTVarIO =<< getsYesod appActorKeys - renderUrl <- getUrlRender + (akey1, akey2, new1) <- liftIO . readTVarIO =<< asksSite appActorKeys + renderUrl <- askUrlRender let (keyID, akey) = if new1 then (renderUrl ActorKey1R, akey1) else (renderUrl ActorKey2R, akey2) return $ \ b -> (KeyId $ encodeUtf8 keyID, actorKeySign akey b) +deliverHttp + :: (MonadSite m, SiteEnv m ~ App) + => (ByteString -> (KeyId, Signature)) + -> Doc Activity + -> Text + -> LocalURI + -> m (Either APPostError (Response ())) deliverHttp sign doc h luInbox = do - manager <- getsYesod appHttpManager + manager <- asksSite appHttpManager let inbox = l2f h luInbox headers = hRequestTarget :| [hHost, hDate, hActivityPubActor] httpPostAP manager inbox headers sign docActor doc @@ -965,11 +977,12 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c let (uraid, luActor, udlid) = r e <- fetchRemoteActor iid h luActor let e' = case e of - Left err -> + Left err -> Just Nothing + Right (Left err) -> if isInstanceErrorG err then Nothing else Just Nothing - Right era -> Just $ Just era + Right (Right era) -> Just $ Just era case e' of Nothing -> runDB $ do let recips' = NE.toList recips @@ -980,10 +993,7 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c fork $ do e <- fetchRemoteActor iid h luActor case e of - Left _ -> runDB $ do - updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] - update udlid [UnlinkedDeliveryRunning =. False] - Right (Entity raid ra) -> do + Right (Right (Entity raid ra)) -> do e' <- deliver h $ remoteActorInbox ra runDB $ case e' of @@ -992,6 +1002,9 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c delete udlid insert_ $ Delivery raid obid False Right _ -> delete udlid + _ -> runDB $ do + updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] + update udlid [UnlinkedDeliveryRunning =. False] case mera of Nothing -> runDB $ do updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] @@ -1006,10 +1019,10 @@ handleOutboxNote host (Note mluNote luAttrib aud muParent muContext mpublished c insert_ $ Delivery raid obid False Right _ -> delete udlid -retryOutboxDelivery :: Handler () +retryOutboxDelivery :: Worker () retryOutboxDelivery = do - now <- liftIO getCurrentTime - (udls, dls) <- runDB $ do + now <- liftIO $ getCurrentTime + (udls, dls) <- runSiteDB $ do -- Get all unlinked deliveries which aren't running already in outbox -- post handlers unlinked' <- E.select $ E.from $ \ (udl `E.InnerJoin` ob `E.InnerJoin` ura `E.InnerJoin` i `E.LeftOuterJoin` ra) -> do @@ -1043,7 +1056,7 @@ retryOutboxDelivery = do -- We're left with the lonely ones. We'll check which actors have been -- unreachable for too long, and we'll delete deliveries for them. The -- rest of the actors we'll try to reach by HTTP. - dropAfter <- getsYesod $ appDropDeliveryAfter . appSettings + dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings let (lonelyOld, lonelyNew) = partitionEithers $ map (decideBySinceUDL dropAfter now) lonely deleteWhere [UnlinkedDeliveryId <-. lonelyOld] -- Now let's grab the linked deliveries, and similarly delete old ones @@ -1115,14 +1128,14 @@ retryOutboxDelivery = do = map (second $ groupWithExtractBy1 ((==) `on` fst) fst snd) . groupWithExtractBy ((==) `on` fst) fst snd fork action = do - mvar <- liftIO newEmptyMVar - let handle e = do - liftIO $ putMVar mvar False - logError $ "Periodic delivery error! " <> T.pack (displayException e) - forkHandler handle $ do - success <- action - liftIO $ putMVar mvar success - return $ liftIO $ readMVar mvar + wait <- asyncSite action + return $ do + result <- wait + case result of + Left e -> do + logError $ "Periodic delivery error! " <> T.pack (displayException e) + return False + Right success -> return success deliverLinked deliver now ((_, h), recips) = do waitsR <- for recips $ \ ((raid, inbox), delivs) -> fork $ do waitsD <- for delivs $ \ (dlid, doc) -> fork $ do @@ -1130,10 +1143,10 @@ retryOutboxDelivery = do case e of Left _err -> return False Right _resp -> do - runDB $ delete dlid + runSiteDB $ delete dlid return True results <- sequence waitsD - runDB $ + runSiteDB $ if and results then update raid [RemoteActorErrorSince =. Nothing] else if or results @@ -1148,26 +1161,26 @@ retryOutboxDelivery = do waitsR <- for recips $ \ ((uraid, luRecip), delivs) -> fork $ do e <- fetchRemoteActor iid h luRecip case e of - Left _ -> runDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] - Right (Entity raid ra) -> do + Right (Right (Entity raid ra)) -> do waitsD <- for delivs $ \ (udlid, obid, doc) -> fork $ do e' <- deliver doc h $ remoteActorInbox ra case e' of Left _err -> do - runDB $ do + runSiteDB $ do delete udlid insert_ $ Delivery raid obid False return False Right _resp -> do - runDB $ delete udlid + runSiteDB $ delete udlid return True results <- sequence waitsD - runDB $ + runSiteDB $ if and results then update raid [RemoteActorErrorSince =. Nothing] else if or results then update raid [RemoteActorErrorSince =. Just now] else updateWhere [RemoteActorId ==. raid, RemoteActorErrorSince ==. Nothing] [RemoteActorErrorSince =. Just now] + _ -> runSiteDB $ updateWhere [UnfetchedRemoteActorId ==. uraid, UnfetchedRemoteActorSince ==. Nothing] [UnfetchedRemoteActorSince =. Just now] return True results <- sequence waitsR unless (and results) $ diff --git a/src/Vervis/Foundation.hs b/src/Vervis/Foundation.hs index 1d94e2c..7d8db78 100644 --- a/src/Vervis/Foundation.hs +++ b/src/Vervis/Foundation.hs @@ -32,6 +32,7 @@ import Data.PEM (pemContent) import Data.Text.Encoding (decodeUtf8') import Data.Time.Interval (TimeInterval, fromTimeUnit, toTimeUnit) import Data.Time.Units (Second, Minute, Day) +import Database.Persist.Postgresql import Database.Persist.Sql (ConnectionPool, runSqlPool) import Graphics.SVGFonts.ReadFont (PreparedFont) import Network.HTTP.Client @@ -70,6 +71,7 @@ import Network.FedURI import Web.ActivityAccess import Web.ActivityPub import Yesod.Hashids +import Yesod.MonadSite import Text.Email.Local import Text.Jasmine.Local (discardm) @@ -105,7 +107,7 @@ data App = App , appInstanceMutex :: InstanceMutex , appCapSignKey :: AccessTokenSecretKey , appHashidsContext :: HashidsContext - , appActorFetchShare :: ResultShare (HandlerFor App) FedURI (Either (Maybe APGetError) (Entity RemoteActor)) InstanceId + , appActorFetchShare :: ActorFetchShare App , appActivities :: TVar (Vector (UTCTime, ActivityReport)) } @@ -135,14 +137,23 @@ type Form a = Html -> MForm (HandlerT App IO) (FormResult a, Widget) type AppDB = YesodDB App +type Worker = WorkerFor App + +type WorkerDB = PersistConfigBackend (SitePersistConfig App) Worker + +instance Site App where + type SitePersistConfig App = PostgresConf + siteApproot = ("https://" <>) . appInstanceHost . appSettings + sitePersistConfig = appDatabaseConf . appSettings + sitePersistPool = appConnPool + siteLogger = appLogger + -- Please see the documentation for the Yesod typeclass. There are a number -- of settings which can be configured by overriding methods here. instance Yesod App where -- Controls the base of generated URLs. For more information on modifying, -- see: https://github.com/yesodweb/yesod/wiki/Overriding-approot - approot = ApprootMaster $ mkroot . appInstanceHost . appSettings - where - mkroot h = "https://" <> h + approot = ApprootMaster siteApproot -- Store session data on the client in encrypted cookies, -- default session idle timeout is 120 minutes @@ -445,9 +456,7 @@ instance Yesod App where -- How to run database actions. instance YesodPersist App where type YesodPersistBackend App = SqlBackend - runDB action = do - master <- getYesod - runSqlPool action $ appConnPool master + runDB = runSiteDB instance YesodPersistRunner App where getDBRunner = defaultGetDBRunner appConnPool diff --git a/src/Vervis/Handler/Inbox.hs b/src/Vervis/Handler/Inbox.hs index ece0859..5403523 100644 --- a/src/Vervis/Handler/Inbox.hs +++ b/src/Vervis/Handler/Inbox.hs @@ -296,15 +296,18 @@ postOutboxR shr = do iid <- runDB $ either entityKey id <$> insertBy' (Instance h) result <- fetchRemoteActor iid h lto case result of - Left err -> do - setMessage $ toHtml $ T.concat - [ "Tried to fetch recipient actor <" - , renderFedURI $ l2f h lto - , "> and got an error: " - , T.pack (show err) - ] - return Nothing - Right (Entity _ ra) -> return $ Just $ remoteActorInbox ra + Left err -> setErrorMsg $ displayException err + Right (Left err) -> setErrorMsg $ show err + Right (Right (Entity _ ra)) -> return $ Just $ remoteActorInbox ra + where + setErrorMsg err = do + setMessage $ toHtml $ T.concat + [ "Tried to fetch recipient actor <" + , renderFedURI $ l2f h lto + , "> and got an error: " + , T.pack err + ] + return Nothing getActorKey :: ((ActorKey, ActorKey, Bool) -> ActorKey) -> Route App -> Handler TypedContent getActorKey choose route = selectRep $ provideAP $ do diff --git a/src/Vervis/RemoteActorStore.hs b/src/Vervis/RemoteActorStore.hs index d4c1123..c17db26 100644 --- a/src/Vervis/RemoteActorStore.hs +++ b/src/Vervis/RemoteActorStore.hs @@ -18,12 +18,13 @@ module Vervis.RemoteActorStore ( InstanceMutex () , newInstanceMutex + , ActorFetchShare , YesodRemoteActorStore (..) , withHostLock , keyListedByActorShared , VerifKeyDetail (..) , addVerifKey - , actorFetchShareSettings + , actorFetchShareAction , fetchRemoteActor , deleteUnusedURAs ) @@ -31,6 +32,7 @@ where import Prelude +import Control.Concurrent (forkIO) import Control.Concurrent.MVar (MVar, newMVar) import Control.Concurrent.ResultShare import Control.Concurrent.STM.TVar @@ -60,6 +62,7 @@ import Crypto.PublicVerifKey import Database.Persist.Local import Network.FedURI import Web.ActivityPub +import Yesod.MonadSite import Vervis.Model @@ -76,13 +79,15 @@ data RoomMode = RoomModeInstant | RoomModeCached RoomModeDB +type ActorFetchShare site = ResultShare FedURI (Either SomeException (Either (Maybe APGetError) (Entity RemoteActor))) (site, InstanceId) + class Yesod site => YesodRemoteActorStore site where siteInstanceMutex :: site -> InstanceMutex siteInstanceRoomMode :: site -> Maybe Int siteActorRoomMode :: site -> Maybe Int siteRejectOnMaxKeys :: site -> Bool - siteActorFetchShare :: site -> ResultShare (HandlerFor site) FedURI (Either (Maybe APGetError) (Entity RemoteActor)) InstanceId + siteActorFetchShare :: site -> ActorFetchShare site -- TODO this is copied from stm-2.5, remove when we upgrade LTS stateTVar :: TVar s -> (s -> (a, s)) -> STM a @@ -454,42 +459,48 @@ addVerifKey h uinb vkd = lift $ insert_ $ VerifKey luKey iid mexpires key (Just rsid) return (iid, rsid) -actorFetchShareSettings - :: ( YesodPersist site +actorFetchShareAction + :: ( Yesod site + , YesodPersist site , PersistUniqueWrite (YesodPersistBackend site) , BaseBackend (YesodPersistBackend site) ~ SqlBackend , HasHttpManager site + , Site site + , PersistConfigPool (SitePersistConfig site) ~ ConnectionPool + , PersistConfigBackend (SitePersistConfig site) ~ SqlPersistT ) - => ResultShareSettings (HandlerFor site) FedURI (Either (Maybe APGetError) (Entity RemoteActor)) InstanceId -actorFetchShareSettings = ResultShareSettings - { resultShareFork = forkHandler $ \ e -> logError $ "ActorFetchShare action failed! " <> T.pack (displayException e) - , resultShareAction = \ u iid -> do - let (h, lu) = f2l u - mers <- runDB $ getBy $ UniqueRemoteActor iid lu - case mers of - Just ers -> return $ Right ers - Nothing -> do - manager <- getsYesod getHttpManager - eactor <- fetchAPID' manager actorId h lu - for eactor $ \ actor -> runDB $ - let ra = RemoteActor lu iid (actorInbox actor) Nothing - in either id (flip Entity ra) <$> insertBy' ra - } + => FedURI -> (site, InstanceId) -> IO (Either SomeException (Either (Maybe APGetError) (Entity RemoteActor))) +actorFetchShareAction u (site, iid) = try $ flip runWorkerT site $ do + let (h, lu) = f2l u + mers <- runSiteDB $ getBy $ UniqueRemoteActor iid lu + case mers of + Just ers -> return $ Right ers + Nothing -> do + manager <- asksSite getHttpManager + eactor <- fetchAPID' manager actorId h lu + for eactor $ \ actor -> runSiteDB $ + let ra = RemoteActor lu iid (actorInbox actor) Nothing + in either id (flip Entity ra) <$> insertBy' ra fetchRemoteActor :: ( YesodPersist site , PersistUniqueRead (YesodPersistBackend site) , BaseBackend (YesodPersistBackend site) ~ SqlBackend , YesodRemoteActorStore site + , MonadSite m + , SiteEnv m ~ site + , Site site + , PersistConfigPool (SitePersistConfig site) ~ ConnectionPool + , PersistConfigBackend (SitePersistConfig site) ~ SqlPersistT ) - => InstanceId -> Text -> LocalURI -> HandlerFor site (Either (Maybe APGetError) (Entity RemoteActor)) + => InstanceId -> Text -> LocalURI -> m (Either SomeException (Either (Maybe APGetError) (Entity RemoteActor))) fetchRemoteActor iid host luActor = do - mers <- runDB $ getBy $ UniqueRemoteActor iid luActor + mers <- runSiteDB $ getBy $ UniqueRemoteActor iid luActor case mers of - Just ers -> return $ Right ers + Just ers -> return $ Right $ Right ers Nothing -> do - afs <- getsYesod siteActorFetchShare - runShared afs (l2f host luActor) iid + site <- askSite + liftIO $ runShared (siteActorFetchShare site) (l2f host luActor) (site, iid) deleteUnusedURAs = do uraids <- E.select $ E.from $ \ ura -> do diff --git a/src/Vervis/Settings.hs b/src/Vervis/Settings.hs index 769b29d..38e6911 100644 --- a/src/Vervis/Settings.hs +++ b/src/Vervis/Settings.hs @@ -146,6 +146,8 @@ data AppSettings = AppSettings -- time, we stop trying to deliver and we remove them from follower lists -- of local actors. , appDropDeliveryAfter :: NominalDiffTime + -- | How much time to wait between retries of failed deliveries. + , appDeliveryRetryFreq :: TimeInterval } instance FromJSON AppSettings where @@ -193,6 +195,7 @@ instance FromJSON AppSettings where appHashidsSaltFile <- o .: "hashids-salt-file" appRejectOnMaxKeys <- o .: "reject-on-max-keys" appDropDeliveryAfter <- ndt <$> o .: "drop-delivery-after" + appDeliveryRetryFreq <- interval <$> o .: "retry-delivery-every" return AppSettings {..} where diff --git a/src/Yesod/MonadSite.hs b/src/Yesod/MonadSite.hs new file mode 100644 index 0000000..b84b1c4 --- /dev/null +++ b/src/Yesod/MonadSite.hs @@ -0,0 +1,125 @@ +{- This file is part of Vervis. + - + - Written in 2019 by fr33domlover . + - + - ♡ Copying is an act of love. Please copy, reuse and share. + - + - The author(s) have dedicated all copyright and related and neighboring + - rights to this software to the public domain worldwide. This software is + - distributed without any warranty. + - + - You should have received a copy of the CC0 Public Domain Dedication along + - with this software. If not, see + - . + -} + +-- | A typeclass providing a subset of what 'HandlerFor' does, allowing to +-- write monadic actions that can run both inside a request handler and outside +-- of the web server context. +module Yesod.MonadSite + ( Site (..) + , MonadSite (..) + , asksSite + , runSiteDB + , WorkerT () + , runWorkerT + , WorkerFor + , runWorker + ) +where + +import Prelude + +import Control.Exception +import Control.Monad.Fail +import Control.Monad.IO.Class +import Control.Monad.IO.Unlift +import Control.Monad.Logger.CallStack +import Control.Monad.Trans.Class +import Control.Monad.Trans.Reader +import Data.Functor +import Data.Text (Text) +import Database.Persist.Sql +import UnliftIO.Async +import UnliftIO.Concurrent +import Yesod.Core +import Yesod.Core.Types +import Yesod.Persist.Core + +class PersistConfig (SitePersistConfig site) => Site site where + type SitePersistConfig site + siteApproot :: site -> Text + sitePersistConfig :: site -> SitePersistConfig site + sitePersistPool :: site -> PersistConfigPool (SitePersistConfig site) + siteLogger :: site -> Logger + +class (MonadUnliftIO m, MonadLogger m) => MonadSite m where + type SiteEnv m + askSite :: m (SiteEnv m) + askUrlRender :: m (Route (SiteEnv m) -> Text) + forkSite :: (SomeException -> m ()) -> m () -> m () + asyncSite :: m a -> m (m (Either SomeException a)) + +asksSite :: MonadSite m => (SiteEnv m -> a) -> m a +asksSite f = f <$> askSite + +runSiteDB + :: (MonadSite m, Site (SiteEnv m)) + => PersistConfigBackend (SitePersistConfig (SiteEnv m)) m a + -> m a +runSiteDB action = do + site <- askSite + runPool (sitePersistConfig site) action (sitePersistPool site) + +instance MonadSite (HandlerFor site) where + type SiteEnv (HandlerFor site) = site + askSite = getYesod + askUrlRender = getUrlRender + forkSite = forkHandler + asyncSite action = do + mvar <- newEmptyMVar + let handle e = putMVar mvar $ Left e + forkHandler handle $ do + result <- action + putMVar mvar $ Right result + return $ liftIO $ readMVar mvar + +newtype WorkerT site m a = WorkerT + { unWorkerT :: LoggingT (ReaderT site m) a + } + deriving + ( Functor, Applicative, Monad, MonadFail, MonadIO, MonadLogger + , MonadLoggerIO + ) + +instance MonadUnliftIO m => MonadUnliftIO (WorkerT site m) where + askUnliftIO = + WorkerT $ withUnliftIO $ \ u -> + return $ UnliftIO $ unliftIO u . unWorkerT + withRunInIO inner = + WorkerT $ withRunInIO $ \ run -> inner (run . unWorkerT) + +instance MonadTrans (WorkerT site) where + lift = WorkerT . lift . lift + +instance (MonadUnliftIO m, Yesod site, Site site) => MonadSite (WorkerT site m) where + type SiteEnv (WorkerT site m) = site + askSite = WorkerT $ lift ask + askUrlRender = do + site <- askSite + return $ \ route -> yesodRender site (siteApproot site) route [] + forkSite handler action = void $ forkFinally action handler' + where + handler' (Left e) = handler e + handler' (Right _) = pure () + asyncSite action = waitCatch <$> async action + +runWorkerT :: (Yesod site, Site site) => WorkerT site m a -> site -> m a +runWorkerT (WorkerT action) site = runReaderT (runLoggingT action logFunc) site + where + logFunc = messageLoggerSource site (siteLogger site) + +type WorkerFor site = WorkerT site IO + +runWorker :: (Yesod site, Site site) => WorkerFor site a -> site -> IO a +runWorker = runWorkerT diff --git a/vervis.cabal b/vervis.cabal index c31773b..f23a996 100644 --- a/vervis.cabal +++ b/vervis.cabal @@ -100,6 +100,7 @@ library Yesod.Auth.Unverified.Internal Yesod.FedURI Yesod.Hashids + Yesod.MonadSite Yesod.Paginate.Local Yesod.Persist.Local Yesod.SessionEntity