mirror of
https://code.naskya.net/repos/ndqEd
synced 2025-01-10 01:26:46 +09:00
DB: Avoid bulk-selecting specific-actor records
When looking up a specfic actor record for a given ActorId, you're pretty much guaranteed to find the actor if it exists, because there's 1 function in the codebase that handles this. Whenever a new actor type is added, which is a rare event, that function gets updated. But when mass-selecting actors using Esqueleto? Then, you need to LeftOuterJoin by yourself on each actor type. This is both ugly and error prone, because all those places in the codebase need to be updated when adding an actor type. The only downside is that it means O(n) DB queries instead of O(1). Perhaps there's some elegant way to "add" the specific-actor Joins to a given Esqueleto query. Something to do some other time, as an optimization, if the need arises.
This commit is contained in:
parent
118b787416
commit
6407aaf897
1 changed files with 32 additions and 94 deletions
|
@ -87,6 +87,7 @@ import Vervis.Data.Actor
|
|||
import Vervis.FedURI
|
||||
import Vervis.Foundation
|
||||
import Vervis.Model
|
||||
import Vervis.Persist.Actor
|
||||
import Vervis.Recipient
|
||||
import Vervis.RemoteActorStore
|
||||
import Vervis.Settings
|
||||
|
@ -893,14 +894,6 @@ fork action = do
|
|||
return False
|
||||
Right success -> return success
|
||||
|
||||
localActor Nothing Nothing Nothing Nothing Nothing = error "Found unused Actor"
|
||||
localActor (Just p) Nothing Nothing Nothing Nothing = LocalActorPerson p
|
||||
localActor Nothing (Just g) Nothing Nothing Nothing = LocalActorGroup g
|
||||
localActor Nothing Nothing (Just r) Nothing Nothing = LocalActorRepo r
|
||||
localActor Nothing Nothing Nothing (Just d) Nothing = LocalActorDeck d
|
||||
localActor Nothing Nothing Nothing Nothing (Just l) = LocalActorLoom l
|
||||
localActor _ _ _ _ _ = error "Found multiple-use Actor"
|
||||
|
||||
retryUnlinkedDelivery :: Worker ()
|
||||
retryUnlinkedDelivery = do
|
||||
now <- liftIO $ getCurrentTime
|
||||
|
@ -909,18 +902,7 @@ retryUnlinkedDelivery = do
|
|||
-- Get all unlinked deliveries which aren't running already in outbox
|
||||
-- post handlers
|
||||
unlinked' <- E.select $ E.from $
|
||||
\ (udl `E.InnerJoin` obi `E.InnerJoin` a `E.InnerJoin` ura `E.InnerJoin` ro `E.InnerJoin` i `E.LeftOuterJoin` ra `E.LeftOuterJoin` rc
|
||||
`E.LeftOuterJoin` p
|
||||
`E.LeftOuterJoin` g
|
||||
`E.LeftOuterJoin` r
|
||||
`E.LeftOuterJoin` d
|
||||
`E.LeftOuterJoin` l
|
||||
) -> do
|
||||
E.on $ E.just (a E.^. ActorId) E.==. l E.?. LoomActor
|
||||
E.on $ E.just (a E.^. ActorId) E.==. d E.?. DeckActor
|
||||
E.on $ E.just (a E.^. ActorId) E.==. r E.?. RepoActor
|
||||
E.on $ E.just (a E.^. ActorId) E.==. g E.?. GroupActor
|
||||
E.on $ E.just (a E.^. ActorId) E.==. p E.?. PersonActor
|
||||
\ (udl `E.InnerJoin` obi `E.InnerJoin` a `E.InnerJoin` ura `E.InnerJoin` ro `E.InnerJoin` i `E.LeftOuterJoin` ra `E.LeftOuterJoin` rc) -> do
|
||||
E.on $ E.just (ro E.^. RemoteObjectId) E.==. rc E.?. RemoteCollectionIdent
|
||||
E.on $ E.just (ro E.^. RemoteObjectId) E.==. ra E.?. RemoteActorIdent
|
||||
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
|
||||
|
@ -941,23 +923,16 @@ retryUnlinkedDelivery = do
|
|||
, obi E.^. OutboxItemActivity
|
||||
, ra E.?. RemoteActorId
|
||||
, rc E.?. RemoteCollectionId
|
||||
|
||||
, a E.^. ActorId
|
||||
|
||||
, p E.?. PersonId
|
||||
, g E.?. GroupId
|
||||
, r E.?. RepoId
|
||||
, d E.?. DeckId
|
||||
, l E.?. LoomId
|
||||
)
|
||||
|
||||
-- Strip the E.Value wrappers and organize the records for the
|
||||
-- filtering and grouping we'll need to do
|
||||
let unlinked = map adaptUnlinked unlinked'
|
||||
unlinked <- traverse adaptUnlinked unlinked'
|
||||
|
||||
-- Split into found (recipient has been reached) and lonely (recipient
|
||||
-- hasn't been reached
|
||||
(found, lonely) = partitionMaybes unlinked
|
||||
let (found, lonely) = partitionMaybes unlinked
|
||||
|
||||
-- Turn the found ones into linked deliveries
|
||||
deleteWhere [UnlinkedDeliveryId <-. map (unlinkedID . snd) found]
|
||||
|
@ -988,10 +963,9 @@ retryUnlinkedDelivery = do
|
|||
|
||||
where
|
||||
|
||||
adaptUnlinked
|
||||
( Entity iid (Instance h), E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value fwd, E.Value act, E.Value mraid, E.Value mrcid
|
||||
, E.Value actorID, E.Value mp, E.Value mg, E.Value mr, E.Value md, E.Value ml
|
||||
) =
|
||||
adaptUnlinked (Entity iid (Instance h), E.Value uraid, E.Value luRecip, E.Value since, E.Value udlid, E.Value obid, E.Value fwd, E.Value act, E.Value mraid, E.Value mrcid, E.Value actorID) = do
|
||||
actorByKey <- getLocalActor actorID
|
||||
return
|
||||
( Left <$> mraid <|> Right <$> mrcid
|
||||
, ( ( (iid, h)
|
||||
, ( (uraid, luRecip)
|
||||
|
@ -1000,7 +974,7 @@ retryUnlinkedDelivery = do
|
|||
, obid
|
||||
, BL.fromStrict $ persistJSONBytes act
|
||||
, actorID
|
||||
, localActor mp mg mr md ml
|
||||
, actorByKey
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -1073,18 +1047,7 @@ retryLinkedDelivery = do
|
|||
-- Now let's grab the linked deliveries, and similarly delete old ones
|
||||
-- and return the rest for HTTP delivery.
|
||||
linked <- E.select $ E.from $
|
||||
\ (dl `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i `E.InnerJoin` obi `E.InnerJoin` a
|
||||
`E.LeftOuterJoin` p
|
||||
`E.LeftOuterJoin` g
|
||||
`E.LeftOuterJoin` r
|
||||
`E.LeftOuterJoin` d
|
||||
`E.LeftOuterJoin` l
|
||||
) -> do
|
||||
E.on $ E.just (a E.^. ActorId) E.==. l E.?. LoomActor
|
||||
E.on $ E.just (a E.^. ActorId) E.==. d E.?. DeckActor
|
||||
E.on $ E.just (a E.^. ActorId) E.==. r E.?. RepoActor
|
||||
E.on $ E.just (a E.^. ActorId) E.==. g E.?. GroupActor
|
||||
E.on $ E.just (a E.^. ActorId) E.==. p E.?. PersonActor
|
||||
\ (dl `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i `E.InnerJoin` obi `E.InnerJoin` a) -> do
|
||||
E.on $ obi E.^. OutboxItemOutbox E.==. a E.^. ActorOutbox
|
||||
E.on $ dl E.^. DeliveryActivity E.==. obi E.^. OutboxItemId
|
||||
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
|
||||
|
@ -1102,19 +1065,12 @@ retryLinkedDelivery = do
|
|||
, dl E.^. DeliveryId
|
||||
, dl E.^. DeliveryForwarding
|
||||
, obi E.^. OutboxItemActivity
|
||||
|
||||
, a E.^. ActorId
|
||||
|
||||
, p E.?. PersonId
|
||||
, g E.?. GroupId
|
||||
, r E.?. RepoId
|
||||
, d E.?. DeckId
|
||||
, l E.?. LoomId
|
||||
)
|
||||
dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings
|
||||
linked' <- traverse adaptLinked linked
|
||||
let (linkedOld, linkedNew) =
|
||||
partitionEithers $
|
||||
map (decideBySinceDL dropAfter now . adaptLinked) linked
|
||||
partitionEithers $ map (decideBySinceDL dropAfter now) linked'
|
||||
deleteWhere [DeliveryId <-. linkedOld]
|
||||
|
||||
return $ groupLinked linkedNew
|
||||
|
@ -1134,17 +1090,16 @@ retryLinkedDelivery = do
|
|||
|
||||
where
|
||||
|
||||
adaptLinked
|
||||
( E.Value iid, E.Value h, E.Value raid, E.Value ident, E.Value inbox, E.Value since, E.Value dlid, E.Value fwd, E.Value act
|
||||
, E.Value actorID, E.Value mp, E.Value mg, E.Value mr, E.Value md, E.Value ml
|
||||
) =
|
||||
adaptLinked (E.Value iid, E.Value h, E.Value raid, E.Value ident, E.Value inbox, E.Value since, E.Value dlid, E.Value fwd, E.Value act, E.Value actorID) = do
|
||||
actorByKey <- getLocalActor actorID
|
||||
return
|
||||
( ( (iid, h)
|
||||
, ( (raid, (ident, inbox))
|
||||
, ( dlid
|
||||
, fwd
|
||||
, BL.fromStrict $ persistJSONBytes act
|
||||
, actorID
|
||||
, localActor mp mg mr md ml
|
||||
, actorByKey
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -1205,36 +1160,18 @@ retryForwarding = do
|
|||
|
||||
-- Same for forwarding deliveries, which are always linked
|
||||
forwarding <- E.select $ E.from $
|
||||
\ (fw `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i
|
||||
`E.LeftOuterJoin` p
|
||||
`E.LeftOuterJoin` g
|
||||
`E.LeftOuterJoin` r
|
||||
`E.LeftOuterJoin` d
|
||||
`E.LeftOuterJoin` l
|
||||
) -> do
|
||||
E.on $ E.just (fw E.^. ForwardingForwarder) E.==. l E.?. LoomActor
|
||||
E.on $ E.just (fw E.^. ForwardingForwarder) E.==. d E.?. DeckActor
|
||||
E.on $ E.just (fw E.^. ForwardingForwarder) E.==. r E.?. RepoActor
|
||||
E.on $ E.just (fw E.^. ForwardingForwarder) E.==. g E.?. GroupActor
|
||||
E.on $ E.just (fw E.^. ForwardingForwarder) E.==. p E.?. PersonActor
|
||||
\ (fw `E.InnerJoin` ra `E.InnerJoin` ro `E.InnerJoin` i) -> do
|
||||
E.on $ ro E.^. RemoteObjectInstance E.==. i E.^. InstanceId
|
||||
E.on $ ra E.^. RemoteActorIdent E.==. ro E.^. RemoteObjectId
|
||||
E.on $ fw E.^. ForwardingRecipient E.==. ra E.^. RemoteActorId
|
||||
E.where_ $ fw E.^. ForwardingRunning E.==. E.val False
|
||||
E.orderBy [E.asc $ ro E.^. RemoteObjectInstance, E.asc $ ra E.^. RemoteActorId]
|
||||
return
|
||||
(i, ra, fw
|
||||
, p E.?. PersonId
|
||||
, g E.?. GroupId
|
||||
, r E.?. RepoId
|
||||
, d E.?. DeckId
|
||||
, l E.?. LoomId
|
||||
)
|
||||
return (i, ra, fw)
|
||||
dropAfter <- lift $ asksSite $ appDropDeliveryAfter . appSettings
|
||||
forwarding' <- traverse adaptForwarding forwarding
|
||||
let (forwardingOld, forwardingNew) =
|
||||
partitionEithers $
|
||||
map (decideBySinceFW dropAfter now . adaptForwarding)
|
||||
forwarding
|
||||
map (decideBySinceFW dropAfter now) forwarding'
|
||||
deleteWhere [ForwardingId <-. forwardingOld]
|
||||
|
||||
return $ groupForwarding forwardingNew
|
||||
|
@ -1258,20 +1195,21 @@ retryForwarding = do
|
|||
( Entity iid (Instance h)
|
||||
, Entity raid (RemoteActor _ _ inbox _ since)
|
||||
, Entity fwid (Forwarding _ _ body sig fwderID _)
|
||||
, E.Value mp, E.Value mg, E.Value mr, E.Value md, E.Value ml
|
||||
) =
|
||||
( ( (iid, h)
|
||||
, ( (raid, inbox)
|
||||
, ( fwid
|
||||
, BL.fromStrict body
|
||||
, localActor mp mg mr md ml
|
||||
, sig
|
||||
, fwderID
|
||||
) = do
|
||||
actorByKey <- getLocalActor fwderID
|
||||
return
|
||||
( ( (iid, h)
|
||||
, ( (raid, inbox)
|
||||
, ( fwid
|
||||
, BL.fromStrict body
|
||||
, actorByKey
|
||||
, sig
|
||||
, fwderID
|
||||
)
|
||||
)
|
||||
)
|
||||
, since
|
||||
)
|
||||
)
|
||||
, since
|
||||
)
|
||||
|
||||
decideBySinceFW dropAfter now (fw@(_, (_, (fwid, _, _, _, _))), msince) =
|
||||
case msince of
|
||||
|
|
Loading…
Reference in a new issue