feat: operational resilience hardening — server blocking, caching, key refresh, async inbox (v2.14.0)

- Server-level blocking: O(1) Redis SISMEMBER check in all inbox listeners,
  admin UI for blocking/unblocking servers by hostname, MongoDB fallback
- Redis caching for collection dispatchers: 300s TTL on followers/following/liked
  counters and paginated pages, one-shot followers recipients cache
- Proactive key refresh: daily cron re-fetches actor documents for followers
  with 7+ day stale keys using lookupWithSecurity()
- Async inbox processing: MongoDB-backed queue with 3s polling, retry (3 attempts),
  24h TTL auto-prune. Follow keeps synchronous Accept, Block keeps synchronous
  follower removal. All other activity types fully deferred to background processor.

Inspired by wafrn's battle-tested multi-user AP implementation.

Confab-Link: http://localhost:8080/sessions/af5f8b45-6b8d-442d-8f25-78c326190709
This commit is contained in:
Ricardo
2026-03-17 09:16:05 +01:00
parent 9a61145d97
commit 1567b7c4e5
10 changed files with 1833 additions and 757 deletions

View File

@@ -36,6 +36,8 @@ import {
unmuteController,
blockController,
unblockController,
blockServerController,
unblockServerController,
moderationController,
filterModeController,
} from "./lib/controllers/moderation.js";
@@ -103,6 +105,9 @@ import { startBatchRefollow } from "./lib/batch-refollow.js";
import { logActivity } from "./lib/activity-log.js";
import { scheduleCleanup } from "./lib/timeline-cleanup.js";
import { runSeparateMentionsMigration } from "./lib/migrations/separate-mentions.js";
import { loadBlockedServersToRedis } from "./lib/storage/server-blocks.js";
import { scheduleKeyRefresh } from "./lib/key-refresh.js";
import { startInboxProcessor } from "./lib/inbox-queue.js";
import { deleteFederationController } from "./lib/controllers/federation-delete.js";
import {
federationMgmtController,
@@ -308,6 +313,8 @@ export default class ActivityPubEndpoint {
router.post("/admin/reader/unmute", unmuteController(mp, this));
router.post("/admin/reader/block", blockController(mp, this));
router.post("/admin/reader/unblock", unblockController(mp, this));
router.post("/admin/reader/block-server", blockServerController(mp));
router.post("/admin/reader/unblock-server", unblockServerController(mp));
router.get("/admin/followers", followersController(mp));
router.post("/admin/followers/approve", approveFollowController(mp, this));
router.post("/admin/followers/reject", rejectFollowController(mp, this));
@@ -1124,6 +1131,12 @@ export default class ActivityPubEndpoint {
Indiekit.addCollection("ap_reports");
// Pending follow requests (manual approval)
Indiekit.addCollection("ap_pending_follows");
// Server-level blocks
Indiekit.addCollection("ap_blocked_servers");
// Key freshness tracking for proactive refresh
Indiekit.addCollection("ap_key_freshness");
// Async inbox processing queue
Indiekit.addCollection("ap_inbox_queue");
// Store collection references (posts resolved lazily)
const indiekitCollections = Indiekit.collections;
@@ -1151,6 +1164,12 @@ export default class ActivityPubEndpoint {
ap_reports: indiekitCollections.get("ap_reports"),
// Pending follow requests (manual approval)
ap_pending_follows: indiekitCollections.get("ap_pending_follows"),
// Server-level blocks
ap_blocked_servers: indiekitCollections.get("ap_blocked_servers"),
// Key freshness tracking
ap_key_freshness: indiekitCollections.get("ap_key_freshness"),
// Async inbox processing queue
ap_inbox_queue: indiekitCollections.get("ap_inbox_queue"),
get posts() {
return indiekitCollections.get("posts");
},
@@ -1351,6 +1370,27 @@ export default class ActivityPubEndpoint {
{ requestedAt: -1 },
{ background: true },
);
// Server-level blocks
this._collections.ap_blocked_servers.createIndex(
{ hostname: 1 },
{ unique: true, background: true },
);
// Key freshness tracking
this._collections.ap_key_freshness.createIndex(
{ actorUrl: 1 },
{ unique: true, background: true },
);
// Inbox queue indexes
this._collections.ap_inbox_queue.createIndex(
{ status: 1, receivedAt: 1 },
{ background: true },
);
// TTL: auto-prune completed items after 24h
this._collections.ap_inbox_queue.createIndex(
{ processedAt: 1 },
{ expireAfterSeconds: 86_400, background: true },
);
} catch {
// Index creation failed — collections not yet available.
// Indexes already exist from previous startups; non-fatal.
@@ -1446,6 +1486,34 @@ export default class ActivityPubEndpoint {
if (this.options.timelineRetention > 0) {
scheduleCleanup(this._collections, this.options.timelineRetention);
}
// Load server blocks into Redis for fast inbox checks
loadBlockedServersToRedis(this._collections).catch((error) => {
console.warn("[ActivityPub] Failed to load blocked servers to Redis:", error.message);
});
// Schedule proactive key refresh for stale follower keys (runs on startup + every 24h)
const keyRefreshHandle = this.options.actor.handle;
const keyRefreshFederation = this._federation;
const keyRefreshPubUrl = this._publicationUrl;
scheduleKeyRefresh(
this._collections,
() => keyRefreshFederation?.createContext(new URL(keyRefreshPubUrl), {
handle: keyRefreshHandle,
publicationUrl: keyRefreshPubUrl,
}),
keyRefreshHandle,
);
// Start async inbox queue processor (processes one item every 3s)
this._inboxProcessorInterval = startInboxProcessor(
this._collections,
() => this._federation?.createContext(new URL(this._publicationUrl), {
handle: this.options.actor.handle,
publicationUrl: this._publicationUrl,
}),
this.options.actor.handle,
);
}
/**

View File

@@ -14,6 +14,11 @@ import {
getFilterMode,
setFilterMode,
} from "../storage/moderation.js";
import {
addBlockedServer,
removeBlockedServer,
getAllBlockedServers,
} from "../storage/server-blocks.js";
/**
* Helper to get moderation collections from request.
@@ -23,6 +28,7 @@ function getModerationCollections(request) {
return {
ap_muted: application?.collections?.get("ap_muted"),
ap_blocked: application?.collections?.get("ap_blocked"),
ap_blocked_servers: application?.collections?.get("ap_blocked_servers"),
ap_timeline: application?.collections?.get("ap_timeline"),
ap_profile: application?.collections?.get("ap_profile"),
};
@@ -282,6 +288,77 @@ export function unblockController(mountPath, plugin) {
};
}
/**
* POST /admin/reader/block-server — Block a server by hostname.
*/
export function blockServerController(mountPath) {
return async (request, response, next) => {
try {
if (!validateToken(request)) {
return response.status(403).json({
success: false,
error: "Invalid CSRF token",
});
}
const { hostname, reason } = request.body;
if (!hostname) {
return response.status(400).json({
success: false,
error: "Missing hostname",
});
}
const collections = getModerationCollections(request);
await addBlockedServer(collections, hostname, reason);
console.info(`[ActivityPub] Blocked server: ${hostname}`);
return response.json({ success: true, type: "block-server", hostname });
} catch (error) {
console.error("[ActivityPub] Block server failed:", error.message);
return response.status(500).json({
success: false,
error: "Operation failed. Please try again later.",
});
}
};
}
/**
* POST /admin/reader/unblock-server — Unblock a server.
*/
export function unblockServerController(mountPath) {
return async (request, response, next) => {
try {
if (!validateToken(request)) {
return response.status(403).json({
success: false,
error: "Invalid CSRF token",
});
}
const { hostname } = request.body;
if (!hostname) {
return response.status(400).json({
success: false,
error: "Missing hostname",
});
}
const collections = getModerationCollections(request);
await removeBlockedServer(collections, hostname);
console.info(`[ActivityPub] Unblocked server: ${hostname}`);
return response.json({ success: true, type: "unblock-server", hostname });
} catch (error) {
return response.status(500).json({
success: false,
error: "Operation failed. Please try again later.",
});
}
};
}
/**
* GET /admin/reader/moderation — View muted/blocked lists.
*/
@@ -291,9 +368,10 @@ export function moderationController(mountPath) {
const collections = getModerationCollections(request);
const csrfToken = getToken(request.session);
const [muted, blocked, filterMode] = await Promise.all([
const [muted, blocked, blockedServers, filterMode] = await Promise.all([
getAllMuted(collections),
getAllBlocked(collections),
getAllBlockedServers(collections),
getFilterMode(collections),
]);
@@ -305,6 +383,7 @@ export function moderationController(mountPath) {
readerParent: { href: `${mountPath}/admin/reader`, text: response.locals.__("activitypub.reader.title") },
muted,
blocked,
blockedServers,
mutedActors,
mutedKeywords,
filterMode,

View File

@@ -40,6 +40,9 @@ import Redis from "ioredis";
import { MongoKvStore } from "./kv-store.js";
import { registerInboxListeners } from "./inbox-listeners.js";
import { jf2ToAS2Activity, resolvePostUrl } from "./jf2-to-as2.js";
import { cachedQuery } from "./redis-cache.js";
const COLLECTION_CACHE_TTL = 300; // 5 minutes
/**
* Create and configure a Fedify Federation instance.
@@ -404,10 +407,12 @@ function setupFollowers(federation, mountPath, handle, collections) {
// as Recipient objects so sendActivity("followers") can deliver.
// See: https://fedify.dev/manual/collections#one-shot-followers-collection-for-gathering-recipients
if (cursor == null) {
const docs = await collections.ap_followers
.find()
.sort({ followedAt: -1 })
.toArray();
const docs = await cachedQuery("col:followers:recipients", COLLECTION_CACHE_TTL, async () => {
return await collections.ap_followers
.find()
.sort({ followedAt: -1 })
.toArray();
});
return {
items: docs.map((f) => ({
id: new URL(f.actorUrl),
@@ -422,13 +427,16 @@ function setupFollowers(federation, mountPath, handle, collections) {
// Paginated collection: for remote browsing of /followers endpoint
const pageSize = 20;
const skip = Number.parseInt(cursor, 10);
const docs = await collections.ap_followers
.find()
.sort({ followedAt: -1 })
.skip(skip)
.limit(pageSize)
.toArray();
const total = await collections.ap_followers.countDocuments();
const [docs, total] = await cachedQuery(`col:followers:page:${cursor}`, COLLECTION_CACHE_TTL, async () => {
const d = await collections.ap_followers
.find()
.sort({ followedAt: -1 })
.skip(skip)
.limit(pageSize)
.toArray();
const t = await collections.ap_followers.countDocuments();
return [d, t];
});
return {
items: docs.map((f) => new URL(f.actorUrl)),
@@ -439,7 +447,9 @@ function setupFollowers(federation, mountPath, handle, collections) {
)
.setCounter(async (ctx, identifier) => {
if (identifier !== handle) return 0;
return await collections.ap_followers.countDocuments();
return await cachedQuery("col:followers:count", COLLECTION_CACHE_TTL, async () => {
return await collections.ap_followers.countDocuments();
});
})
.setFirstCursor(async () => "0");
}
@@ -452,13 +462,16 @@ function setupFollowing(federation, mountPath, handle, collections) {
if (identifier !== handle) return null;
const pageSize = 20;
const skip = cursor ? Number.parseInt(cursor, 10) : 0;
const docs = await collections.ap_following
.find()
.sort({ followedAt: -1 })
.skip(skip)
.limit(pageSize)
.toArray();
const total = await collections.ap_following.countDocuments();
const [docs, total] = await cachedQuery(`col:following:page:${cursor}`, COLLECTION_CACHE_TTL, async () => {
const d = await collections.ap_following
.find()
.sort({ followedAt: -1 })
.skip(skip)
.limit(pageSize)
.toArray();
const t = await collections.ap_following.countDocuments();
return [d, t];
});
return {
items: docs.map((f) => new URL(f.actorUrl)),
@@ -469,7 +482,9 @@ function setupFollowing(federation, mountPath, handle, collections) {
)
.setCounter(async (ctx, identifier) => {
if (identifier !== handle) return 0;
return await collections.ap_following.countDocuments();
return await cachedQuery("col:following:count", COLLECTION_CACHE_TTL, async () => {
return await collections.ap_following.countDocuments();
});
})
.setFirstCursor(async () => "0");
}
@@ -485,13 +500,16 @@ function setupLiked(federation, mountPath, handle, collections) {
const pageSize = 20;
const skip = cursor ? Number.parseInt(cursor, 10) : 0;
const query = { "properties.post-type": "like" };
const docs = await collections.posts
.find(query)
.sort({ "properties.published": -1 })
.skip(skip)
.limit(pageSize)
.toArray();
const total = await collections.posts.countDocuments(query);
const [docs, total] = await cachedQuery(`col:liked:page:${cursor}`, COLLECTION_CACHE_TTL, async () => {
const d = await collections.posts
.find(query)
.sort({ "properties.published": -1 })
.skip(skip)
.limit(pageSize)
.toArray();
const t = await collections.posts.countDocuments(query);
return [d, t];
});
const items = docs
.map((d) => {
@@ -510,8 +528,10 @@ function setupLiked(federation, mountPath, handle, collections) {
.setCounter(async (ctx, identifier) => {
if (identifier !== handle) return 0;
if (!collections.posts) return 0;
return await collections.posts.countDocuments({
"properties.post-type": "like",
return await cachedQuery("col:liked:count", COLLECTION_CACHE_TTL, async () => {
return await collections.posts.countDocuments({
"properties.post-type": "like",
});
});
})
.setFirstCursor(async () => "0");

1021
lib/inbox-handlers.js Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,15 +1,17 @@
/**
* Inbox listener registrations for the Fedify Federation instance.
*
* Each listener handles a specific ActivityPub activity type received
* in the actor's inbox (Follow, Undo, Like, Announce, Create, Delete, Move).
* Each listener is a thin shim that:
* 1. Checks server-level blocks (Redis, O(1))
* 2. Updates key freshness tracking
* 3. Performs synchronous-only work (Follow Accept, Block follower removal)
* 4. Enqueues the activity for async processing
*/
import {
Accept,
Add,
Announce,
Article,
Block,
Create,
Delete,
@@ -17,20 +19,17 @@ import {
Follow,
Like,
Move,
Note,
Reject,
Remove,
Undo,
Update,
} from "@fedify/fedify/vocab";
import { logActivity as logActivityShared } from "./activity-log.js";
import { sanitizeContent, extractActorInfo, extractObjectData } from "./timeline-store.js";
import { addTimelineItem, deleteTimelineItem, updateTimelineItem } from "./storage/timeline.js";
import { isServerBlocked } from "./storage/server-blocks.js";
import { touchKeyFreshness } from "./key-refresh.js";
import { enqueueActivity } from "./inbox-queue.js";
import { extractActorInfo } from "./timeline-store.js";
import { addNotification } from "./storage/notifications.js";
import { addMessage } from "./storage/messages.js";
import { fetchAndStorePreviews, fetchAndStoreQuote } from "./og-unfurl.js";
import { getFollowedTags } from "./storage/followed-tags.js";
/**
* Register all inbox listeners on a federation's inbox chain.
@@ -41,54 +40,20 @@ import { getFollowedTags } from "./storage/followed-tags.js";
* @param {string} options.handle - Actor handle
* @param {boolean} options.storeRawActivities - Whether to store raw JSON
*/
/** @type {string} ActivityStreams Public Collection constant */
const PUBLIC = "https://www.w3.org/ns/activitystreams#Public";
/**
* Determine if an object is a direct message (DM).
* A DM is addressed only to specific actors — no PUBLIC_COLLECTION,
* no followers collection, and includes our actor URL.
*
* @param {object} object - Fedify object (Note, Article, etc.)
* @param {string} ourActorUrl - Our actor's URL
* @param {string} followersUrl - Our followers collection URL
* @returns {boolean}
*/
function isDirectMessage(object, ourActorUrl, followersUrl) {
const allAddressed = [
...object.toIds.map((u) => u.href),
...object.ccIds.map((u) => u.href),
...object.btoIds.map((u) => u.href),
...object.bccIds.map((u) => u.href),
];
// Must be addressed to us
if (!allAddressed.includes(ourActorUrl)) return false;
// Must NOT include public collection
if (allAddressed.some((u) => u === PUBLIC || u === "as:Public")) return false;
// Must NOT include our followers collection
if (followersUrl && allAddressed.includes(followersUrl)) return false;
return true;
}
export function registerInboxListeners(inboxChain, options) {
const { collections, handle, storeRawActivities } = options;
const { collections, handle } = options;
/**
* Get an authenticated DocumentLoader that signs outbound fetches with
* our actor's key. This allows .getActor()/.getObject() to succeed
* against Authorized Fetch (Secure Mode) servers like hachyderm.io.
*
* @param {import("@fedify/fedify").Context} ctx - Fedify context
* @returns {Promise<import("@fedify/fedify").DocumentLoader>}
*/
const getAuthLoader = (ctx) => ctx.getDocumentLoader({ identifier: handle });
inboxChain
// ── Follow ──────────────────────────────────────────────────────
// Synchronous: Accept/Reject + follower storage (federation requirement)
// Async: notification + activity log
.on(Follow, async (ctx, follow) => {
const actorUrl = follow.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
const authLoader = await getAuthLoader(ctx);
const followerActor = await follow.getActor({ documentLoader: authLoader });
if (!followerActor?.id) return;
@@ -99,7 +64,6 @@ export function registerInboxListeners(inboxChain, options) {
followerActor.preferredUsername?.toString() ||
followerUrl;
// Build common follower data
const followerData = {
actorUrl: followerUrl,
handle: followerActor.preferredUsername?.toString() || "",
@@ -111,12 +75,10 @@ export function registerInboxListeners(inboxChain, options) {
sharedInbox: followerActor.endpoints?.sharedInbox?.href || "",
};
// Check if manual approval is enabled
const profile = await collections.ap_profile.findOne({});
const manualApproval = profile?.manuallyApprovesFollowers || false;
if (manualApproval && collections.ap_pending_follows) {
// Store as pending — do NOT send Accept yet
await collections.ap_pending_follows.updateOne(
{ actorUrl: followerUrl },
{
@@ -129,15 +91,7 @@ export function registerInboxListeners(inboxChain, options) {
{ upsert: true },
);
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Follow",
actorUrl: followerUrl,
actorName: followerName,
summary: `${followerName} requested to follow you`,
});
// Notification with type "follow_request"
// Notification for follow request (synchronous — needed for UI)
const followerInfo = await extractActorInfo(followerActor, { documentLoader: authLoader });
await addNotification(collections, {
uid: follow.id?.href || `follow_request:${followerUrl}`,
@@ -150,7 +104,6 @@ export function registerInboxListeners(inboxChain, options) {
createdAt: new Date().toISOString(),
});
} else {
// Auto-accept: store follower + send Accept back
await collections.ap_followers.updateOne(
{ actorUrl: followerUrl },
{
@@ -172,15 +125,7 @@ export function registerInboxListeners(inboxChain, options) {
{ orderingKey: followerUrl },
);
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Follow",
actorUrl: followerUrl,
actorName: followerName,
summary: `${followerName} followed you`,
});
// Store notification
// Notification for follow (synchronous — needed for UI)
const followerInfo = await extractActorInfo(followerActor, { documentLoader: authLoader });
await addNotification(collections, {
uid: follow.id?.href || `follow:${followerUrl}`,
@@ -193,680 +138,172 @@ export function registerInboxListeners(inboxChain, options) {
createdAt: new Date().toISOString(),
});
}
// Enqueue async portion (activity log)
await enqueueActivity(collections, {
activityType: "Follow",
actorUrl,
rawJson: await follow.toJsonLd(),
});
})
// ── Undo ────────────────────────────────────────────────────────
.on(Undo, async (ctx, undo) => {
const actorUrl = undo.actorId?.href || "";
const authLoader = await getAuthLoader(ctx);
let inner;
try {
inner = await undo.getObject({ documentLoader: authLoader });
} catch {
// Inner activity not dereferenceable — can't determine what was undone
return;
}
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
if (inner instanceof Follow) {
await collections.ap_followers.deleteOne({ actorUrl });
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Undo(Follow)",
actorUrl,
summary: `${actorUrl} unfollowed you`,
});
} else if (inner instanceof Like) {
const objectId = inner.objectId?.href || "";
await collections.ap_activities.deleteOne({
type: "Like",
actorUrl,
objectUrl: objectId,
});
} else if (inner instanceof Announce) {
const objectId = inner.objectId?.href || "";
await collections.ap_activities.deleteOne({
type: "Announce",
actorUrl,
objectUrl: objectId,
});
} else {
const typeName = inner?.constructor?.name || "unknown";
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: `Undo(${typeName})`,
actorUrl,
summary: `${actorUrl} undid ${typeName}`,
});
}
})
.on(Accept, async (ctx, accept) => {
// Handle Accept(Follow) — remote server accepted our Follow request.
// We don't inspect the inner object type because Fedify often resolves
// it to a Person (the Follow's target) rather than the Follow itself.
// Instead, we match directly against ap_following — if we have a
// pending follow for this actor, any Accept from them confirms it.
const authLoader = await getAuthLoader(ctx);
const actorObj = await accept.getActor({ documentLoader: authLoader });
const actorUrl = actorObj?.id?.href || "";
if (!actorUrl) return;
const result = await collections.ap_following.findOneAndUpdate(
{
actorUrl,
source: { $in: ["refollow:sent", "reader", "microsub-reader"] },
},
{
$set: {
source: "federation",
acceptedAt: new Date().toISOString(),
},
$unset: {
refollowAttempts: "",
refollowLastAttempt: "",
refollowError: "",
},
},
{ returnDocument: "after" },
);
if (result) {
const actorName =
result.name || result.handle || actorUrl;
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Accept(Follow)",
actorUrl,
actorName,
summary: `${actorName} accepted our Follow`,
});
}
})
.on(Reject, async (ctx, reject) => {
const authLoader = await getAuthLoader(ctx);
const actorObj = await reject.getActor({ documentLoader: authLoader });
const actorUrl = actorObj?.id?.href || "";
if (!actorUrl) return;
// Mark rejected follow in ap_following
const result = await collections.ap_following.findOneAndUpdate(
{
actorUrl,
source: { $in: ["refollow:sent", "reader", "microsub-reader"] },
},
{
$set: {
source: "rejected",
rejectedAt: new Date().toISOString(),
},
},
{ returnDocument: "after" },
);
if (result) {
const actorName = result.name || result.handle || actorUrl;
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Reject(Follow)",
actorUrl,
actorName,
summary: `${actorName} rejected our Follow`,
});
}
})
.on(Like, async (ctx, like) => {
// Use .objectId (non-fetching) for the liked URL — we only need the
// URL to filter and log, not the full remote object.
const objectId = like.objectId?.href || "";
// Only log likes of our own content
const pubUrl = collections._publicationUrl;
if (!objectId || (pubUrl && !objectId.startsWith(pubUrl))) return;
const authLoader = await getAuthLoader(ctx);
const actorUrl = like.actorId?.href || "";
let actorObj;
try {
actorObj = await like.getActor({ documentLoader: authLoader });
} catch {
actorObj = null;
}
const actorName =
actorObj?.name?.toString() ||
actorObj?.preferredUsername?.toString() ||
actorUrl;
// Extract actor info (including avatar) before logging so we can store it
const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader });
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Like",
await enqueueActivity(collections, {
activityType: "Undo",
actorUrl,
actorName,
actorAvatar: actorInfo.photo || "",
objectUrl: objectId,
summary: `${actorName} liked ${objectId}`,
});
// Store notification
await addNotification(collections, {
uid: like.id?.href || `like:${actorUrl}:${objectId}`,
type: "like",
actorUrl: actorInfo.url,
actorName: actorInfo.name,
actorPhoto: actorInfo.photo,
actorHandle: actorInfo.handle,
targetUrl: objectId,
targetName: "", // Could fetch post title, but not critical
published: like.published ? String(like.published) : new Date().toISOString(),
createdAt: new Date().toISOString(),
rawJson: await undo.toJsonLd(),
});
})
// ── Accept ──────────────────────────────────────────────────────
.on(Accept, async (ctx, accept) => {
const actorUrl = accept.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
await enqueueActivity(collections, {
activityType: "Accept",
actorUrl,
rawJson: await accept.toJsonLd(),
});
})
// ── Reject ──────────────────────────────────────────────────────
.on(Reject, async (ctx, reject) => {
const actorUrl = reject.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
await enqueueActivity(collections, {
activityType: "Reject",
actorUrl,
rawJson: await reject.toJsonLd(),
});
})
// ── Like ────────────────────────────────────────────────────────
.on(Like, async (ctx, like) => {
const actorUrl = like.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
await enqueueActivity(collections, {
activityType: "Like",
actorUrl,
objectUrl: like.objectId?.href || "",
rawJson: await like.toJsonLd(),
});
})
// ── Announce ────────────────────────────────────────────────────
.on(Announce, async (ctx, announce) => {
const objectId = announce.objectId?.href || "";
if (!objectId) return;
const authLoader = await getAuthLoader(ctx);
const actorUrl = announce.actorId?.href || "";
const pubUrl = collections._publicationUrl;
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
// Dual path logic: Notification vs Timeline
// PATH 1: Boost of OUR content → Notification
if (pubUrl && objectId.startsWith(pubUrl)) {
let actorObj;
try {
actorObj = await announce.getActor({ documentLoader: authLoader });
} catch {
actorObj = null;
}
const actorName =
actorObj?.name?.toString() ||
actorObj?.preferredUsername?.toString() ||
actorUrl;
// Extract actor info (including avatar) before logging so we can store it
const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader });
// Log the boost activity
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Announce",
actorUrl,
actorName,
actorAvatar: actorInfo.photo || "",
objectUrl: objectId,
summary: `${actorName} boosted ${objectId}`,
});
// Create notification
await addNotification(collections, {
uid: announce.id?.href || `${actorUrl}#boost-${objectId}`,
type: "boost",
actorUrl: actorInfo.url,
actorName: actorInfo.name,
actorPhoto: actorInfo.photo,
actorHandle: actorInfo.handle,
targetUrl: objectId,
targetName: "", // Could fetch post title, but not critical
published: announce.published ? String(announce.published) : new Date().toISOString(),
createdAt: new Date().toISOString(),
});
// Don't return — fall through to check if actor is also followed
}
// PATH 2: Boost from someone we follow → Timeline (store original post)
const following = await collections.ap_following.findOne({ actorUrl });
if (following) {
try {
// Fetch the original object being boosted (authenticated for Secure Mode servers)
const object = await announce.getObject({ documentLoader: authLoader });
if (!object) return;
// Skip non-content objects (Lemmy/PieFed like/create activities
// that resolve to activity IDs instead of actual Note/Article posts)
const hasContent = object.content?.toString() || object.name?.toString();
if (!hasContent) return;
// Get booster actor info
const boosterActor = await announce.getActor({ documentLoader: authLoader });
const boosterInfo = await extractActorInfo(boosterActor, { documentLoader: authLoader });
// Extract and store with boost metadata
const timelineItem = await extractObjectData(object, {
boostedBy: boosterInfo,
boostedAt: announce.published ? String(announce.published) : new Date().toISOString(),
documentLoader: authLoader,
});
await addTimelineItem(collections, timelineItem);
// Fire-and-forget quote enrichment for boosted posts
if (timelineItem.quoteUrl) {
fetchAndStoreQuote(collections, timelineItem.uid, timelineItem.quoteUrl, ctx, authLoader)
.catch((error) => {
console.error(`[inbox] Quote fetch failed for ${timelineItem.uid}:`, error.message);
});
}
} catch (error) {
// Remote object unreachable (timeout, Authorized Fetch, deleted, etc.) — skip
const cause = error?.cause?.code || error?.message || "unknown";
console.warn(`[AP] Skipped boost from ${actorUrl}: ${cause}`);
}
}
})
.on(Create, async (ctx, create) => {
const authLoader = await getAuthLoader(ctx);
let object;
try {
object = await create.getObject({ documentLoader: authLoader });
} catch {
// Remote object not dereferenceable (deleted, etc.)
return;
}
if (!object) return;
const actorUrl = create.actorId?.href || "";
let actorObj;
try {
actorObj = await create.getActor({ documentLoader: authLoader });
} catch {
// Actor not dereferenceable — use URL as fallback
actorObj = null;
}
const actorName =
actorObj?.name?.toString() ||
actorObj?.preferredUsername?.toString() ||
actorUrl;
// --- DM detection ---
// Check if this is a direct message before processing as reply/mention/timeline.
// DMs are handled separately and stored in ap_messages instead of ap_timeline.
const ourActorUrl = ctx.getActorUri(handle).href;
const followersUrl = ctx.getFollowersUri(handle)?.href || "";
if (isDirectMessage(object, ourActorUrl, followersUrl)) {
const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader });
const rawHtml = object.content?.toString() || "";
const contentHtml = sanitizeContent(rawHtml);
const contentText = rawHtml.replace(/<[^>]*>/g, "").substring(0, 500);
const published = object.published ? String(object.published) : new Date().toISOString();
const inReplyToDM = object.replyTargetId?.href || null;
// Store as message
await addMessage(collections, {
uid: object.id?.href || `dm:${actorUrl}:${Date.now()}`,
actorUrl: actorInfo.url,
actorName: actorInfo.name,
actorPhoto: actorInfo.photo,
actorHandle: actorInfo.handle,
content: {
text: contentText,
html: contentHtml,
},
inReplyTo: inReplyToDM,
conversationId: actorInfo.url,
direction: "inbound",
published,
createdAt: new Date().toISOString(),
});
// Also create a notification so DMs appear in the notification tab
await addNotification(collections, {
uid: `dm:${object.id?.href || `${actorUrl}:${Date.now()}`}`,
url: object.url?.href || object.id?.href || "",
type: "dm",
actorUrl: actorInfo.url,
actorName: actorInfo.name,
actorPhoto: actorInfo.photo,
actorHandle: actorInfo.handle,
content: {
text: contentText,
html: contentHtml,
},
published,
createdAt: new Date().toISOString(),
});
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "DirectMessage",
actorUrl,
actorName,
actorAvatar: actorInfo.photo || "",
objectUrl: object.id?.href || "",
content: contentText.substring(0, 100),
summary: `${actorName} sent a direct message`,
});
return; // Don't process DMs as timeline/mention/reply
}
// Use replyTargetId (non-fetching) for the inReplyTo URL
const inReplyTo = object.replyTargetId?.href || null;
// Log replies to our posts (existing behavior for conversations)
const pubUrl = collections._publicationUrl;
if (inReplyTo) {
const content = object.content?.toString() || "";
// Extract actor info (including avatar) before logging so we can store it
const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader });
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Reply",
actorUrl,
actorName,
actorAvatar: actorInfo.photo || "",
objectUrl: object.id?.href || "",
targetUrl: inReplyTo,
content,
summary: `${actorName} replied to ${inReplyTo}`,
});
// Create notification if reply is to one of OUR posts
if (pubUrl && inReplyTo.startsWith(pubUrl)) {
const rawHtml = object.content?.toString() || "";
const contentHtml = sanitizeContent(rawHtml);
const contentText = rawHtml.replace(/<[^>]*>/g, "").substring(0, 200);
await addNotification(collections, {
uid: object.id?.href || `reply:${actorUrl}:${inReplyTo}`,
url: object.url?.href || object.id?.href || "",
type: "reply",
actorUrl: actorInfo.url,
actorName: actorInfo.name,
actorPhoto: actorInfo.photo,
actorHandle: actorInfo.handle,
targetUrl: inReplyTo,
targetName: "",
content: {
text: contentText,
html: contentHtml,
},
published: object.published ? String(object.published) : new Date().toISOString(),
createdAt: new Date().toISOString(),
});
}
}
// Check for mentions of our actor
if (object.tag) {
const tags = Array.isArray(object.tag) ? object.tag : [object.tag];
const ourActorUrl = ctx.getActorUri(handle).href;
for (const tag of tags) {
if (tag.type === "Mention" && tag.href?.href === ourActorUrl) {
const actorInfo = await extractActorInfo(actorObj, { documentLoader: authLoader });
const rawMentionHtml = object.content?.toString() || "";
const mentionHtml = sanitizeContent(rawMentionHtml);
const contentText = rawMentionHtml.replace(/<[^>]*>/g, "").substring(0, 200);
await addNotification(collections, {
uid: object.id?.href || `mention:${actorUrl}:${object.id?.href}`,
url: object.url?.href || object.id?.href || "",
type: "mention",
actorUrl: actorInfo.url,
actorName: actorInfo.name,
actorPhoto: actorInfo.photo,
actorHandle: actorInfo.handle,
content: {
text: contentText,
html: mentionHtml,
},
published: object.published ? String(object.published) : new Date().toISOString(),
createdAt: new Date().toISOString(),
});
break; // Only create one mention notification per post
}
}
}
// Store timeline items from accounts we follow (native storage)
const following = await collections.ap_following.findOne({ actorUrl });
if (following) {
try {
const timelineItem = await extractObjectData(object, {
actorFallback: actorObj,
documentLoader: authLoader,
});
await addTimelineItem(collections, timelineItem);
// Fire-and-forget OG unfurling for notes and articles (not boosts)
if (timelineItem.type === "note" || timelineItem.type === "article") {
fetchAndStorePreviews(collections, timelineItem.uid, timelineItem.content.html)
.catch((error) => {
console.error(`[inbox] OG unfurl failed for ${timelineItem.uid}:`, error);
});
}
// Fire-and-forget quote enrichment
if (timelineItem.quoteUrl) {
fetchAndStoreQuote(collections, timelineItem.uid, timelineItem.quoteUrl, ctx, authLoader)
.catch((error) => {
console.error(`[inbox] Quote fetch failed for ${timelineItem.uid}:`, error.message);
});
}
} catch (error) {
// Log extraction errors but don't fail the entire handler
console.error("Failed to store timeline item:", error);
}
} else if (collections.ap_followed_tags) {
// Not a followed account — check if the post's hashtags match any followed tags
// so tagged posts from across the fediverse appear in the timeline
try {
const objectTags = Array.isArray(object.tag) ? object.tag : (object.tag ? [object.tag] : []);
const postHashtags = objectTags
.filter((t) => t.type === "Hashtag" && t.name)
.map((t) => t.name.toString().replace(/^#/, "").toLowerCase());
if (postHashtags.length > 0) {
const followedTags = await getFollowedTags(collections);
const followedSet = new Set(followedTags.map((t) => t.toLowerCase()));
const hasMatchingTag = postHashtags.some((tag) => followedSet.has(tag));
if (hasMatchingTag) {
const timelineItem = await extractObjectData(object, {
actorFallback: actorObj,
documentLoader: authLoader,
});
await addTimelineItem(collections, timelineItem);
}
}
} catch (error) {
// Non-critical — don't fail the handler
console.error("[inbox] Followed tag check failed:", error.message);
}
}
})
.on(Delete, async (ctx, del) => {
const objectId = del.objectId?.href || "";
if (objectId) {
// Remove from activity log
await collections.ap_activities.deleteMany({ objectUrl: objectId });
// Remove from timeline
await deleteTimelineItem(collections, objectId);
}
})
.on(Move, async (ctx, move) => {
const authLoader = await getAuthLoader(ctx);
const oldActorObj = await move.getActor({ documentLoader: authLoader });
const oldActorUrl = oldActorObj?.id?.href || "";
const target = await move.getTarget({ documentLoader: authLoader });
const newActorUrl = target?.id?.href || "";
if (oldActorUrl && newActorUrl) {
await collections.ap_followers.updateOne(
{ actorUrl: oldActorUrl },
{ $set: { actorUrl: newActorUrl, movedFrom: oldActorUrl } },
);
}
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Move",
actorUrl: oldActorUrl,
objectUrl: newActorUrl,
summary: `${oldActorUrl} moved to ${newActorUrl}`,
await enqueueActivity(collections, {
activityType: "Announce",
actorUrl,
objectUrl: announce.objectId?.href || "",
rawJson: await announce.toJsonLd(),
});
})
.on(Update, async (ctx, update) => {
// Update can be for a profile OR for a post (edited content)
const authLoader = await getAuthLoader(ctx);
// Try to get the object being updated
let object;
try {
object = await update.getObject({ documentLoader: authLoader });
} catch {
object = null;
}
// ── Create ──────────────────────────────────────────────────────
.on(Create, async (ctx, create) => {
const actorUrl = create.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
// PATH 1: If object is a Note/Article → Update timeline item content
if (object && (object instanceof Note || object instanceof Article)) {
const objectUrl = object.id?.href || "";
if (objectUrl) {
try {
// Extract updated content
const contentHtml = object.content?.toString() || "";
const contentText = object.source?.content?.toString() || contentHtml.replace(/<[^>]*>/g, "");
const updates = {
content: {
text: contentText,
html: contentHtml,
},
name: object.name?.toString() || "",
summary: object.summary?.toString() || "",
sensitive: object.sensitive || false,
};
await updateTimelineItem(collections, objectUrl, updates);
} catch (error) {
console.error("Failed to update timeline item:", error);
}
}
return;
}
// PATH 2: Otherwise, assume profile update — refresh stored follower data
const actorObj = await update.getActor({ documentLoader: authLoader });
const actorUrl = actorObj?.id?.href || "";
if (!actorUrl) return;
const existing = await collections.ap_followers.findOne({ actorUrl });
if (existing) {
await collections.ap_followers.updateOne(
{ actorUrl },
{
$set: {
name:
actorObj.name?.toString() ||
actorObj.preferredUsername?.toString() ||
actorUrl,
handle: actorObj.preferredUsername?.toString() || "",
avatar: actorObj.icon
? (await actorObj.icon)?.url?.href || ""
: "",
updatedAt: new Date().toISOString(),
},
},
);
}
await enqueueActivity(collections, {
activityType: "Create",
actorUrl,
objectUrl: create.objectId?.href || "",
rawJson: await create.toJsonLd(),
});
})
// ── Delete ──────────────────────────────────────────────────────
.on(Delete, async (ctx, del) => {
const actorUrl = del.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
await enqueueActivity(collections, {
activityType: "Delete",
actorUrl,
objectUrl: del.objectId?.href || "",
rawJson: await del.toJsonLd(),
});
})
// ── Move ────────────────────────────────────────────────────────
.on(Move, async (ctx, move) => {
const actorUrl = move.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
await enqueueActivity(collections, {
activityType: "Move",
actorUrl,
rawJson: await move.toJsonLd(),
});
})
// ── Update ──────────────────────────────────────────────────────
.on(Update, async (ctx, update) => {
const actorUrl = update.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
await enqueueActivity(collections, {
activityType: "Update",
actorUrl,
rawJson: await update.toJsonLd(),
});
})
// ── Block ───────────────────────────────────────────────────────
// Synchronous: remove from followers (immediate)
// Async: activity log
.on(Block, async (ctx, block) => {
// Remote actor blocked us — remove them from followers
const actorUrl = block.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
// Synchronous: remove from followers immediately
const authLoader = await getAuthLoader(ctx);
const actorObj = await block.getActor({ documentLoader: authLoader });
const actorUrl = actorObj?.id?.href || "";
if (actorUrl) {
await collections.ap_followers.deleteOne({ actorUrl });
const resolvedUrl = actorObj?.id?.href || "";
if (resolvedUrl) {
await collections.ap_followers.deleteOne({ actorUrl: resolvedUrl });
}
await enqueueActivity(collections, {
activityType: "Block",
actorUrl: resolvedUrl || actorUrl,
rawJson: await block.toJsonLd(),
});
})
.on(Add, async () => {
// Mastodon uses Add for pinning posts to featured collections — safe to ignore
})
.on(Remove, async () => {
// Mastodon uses Remove for unpinning posts from featured collections — safe to ignore
})
// ── Flag (Report) ──────────────────────────────────────────────
// ── Add / Remove (no-ops) ───────────────────────────────────────
.on(Add, async () => {})
.on(Remove, async () => {})
// ── Flag ────────────────────────────────────────────────────────
.on(Flag, async (ctx, flag) => {
try {
const authLoader = await getAuthLoader(ctx);
const actorObj = await flag.getActor({ documentLoader: authLoader }).catch(() => null);
const actorUrl = flag.actorId?.href || "";
if (await isServerBlocked(actorUrl, collections)) return;
await touchKeyFreshness(collections, actorUrl);
const reporterUrl = actorObj?.id?.href || flag.actorId?.href || "";
const reporterName = actorObj?.name?.toString() || actorObj?.preferredUsername?.toString() || reporterUrl;
// Extract reported objects — Flag can report actors or posts
const reportedIds = flag.objectIds?.map((u) => u.href) || [];
const reason = flag.content?.toString() || flag.summary?.toString() || "";
if (reportedIds.length === 0 && !reason) {
console.info("[ActivityPub] Ignoring empty Flag from", reporterUrl);
return;
}
// Store report
if (collections.ap_reports) {
await collections.ap_reports.insertOne({
reporterUrl,
reporterName,
reportedUrls: reportedIds,
reason,
createdAt: new Date().toISOString(),
read: false,
});
}
// Create notification
if (collections.ap_notifications) {
await addNotification(collections, {
uid: `flag:${reporterUrl}:${Date.now()}`,
type: "report",
actorUrl: reporterUrl,
actorName: reporterName,
actorPhoto: actorObj?.iconUrl?.href || actorObj?.icon?.url?.href || "",
actorHandle: actorObj?.preferredUsername
? `@${actorObj.preferredUsername}@${new URL(reporterUrl).hostname}`
: reporterUrl,
objectUrl: reportedIds[0] || "",
summary: reason ? reason.slice(0, 200) : "Report received",
published: new Date().toISOString(),
createdAt: new Date().toISOString(),
});
}
await logActivity(collections, storeRawActivities, {
direction: "inbound",
type: "Flag",
actorUrl: reporterUrl,
objectUrl: reportedIds[0] || "",
summary: `Report from ${reporterName}: ${reason.slice(0, 100)}`,
});
console.info(`[ActivityPub] Flag received from ${reporterName}${reportedIds.length} objects reported`);
} catch (error) {
console.warn("[ActivityPub] Flag handler error:", error.message);
}
await enqueueActivity(collections, {
activityType: "Flag",
actorUrl,
rawJson: await flag.toJsonLd(),
});
});
}
/**
* Log an activity to the ap_activities collection.
* Wrapper around the shared utility that accepts the (collections, storeRaw, record) signature
* used throughout this file.
*/
async function logActivity(collections, storeRaw, record, rawJson) {
await logActivityShared(
collections.ap_activities,
record,
storeRaw && rawJson ? { rawJson } : {},
);
}

99
lib/inbox-queue.js Normal file
View File

@@ -0,0 +1,99 @@
/**
* MongoDB-backed inbox processing queue.
* Runs a setInterval-based processor that dequeues and processes
* one activity at a time from ap_inbox_queue.
* @module inbox-queue
*/
import { routeToHandler } from "./inbox-handlers.js";
/**
* Process the next pending item from the inbox queue.
* Uses findOneAndUpdate for atomic claim (prevents double-processing).
*
* @param {object} collections - MongoDB collections
* @param {object} ctx - Fedify context
* @param {string} handle - Our actor handle
*/
async function processNextItem(collections, ctx, handle) {
const { ap_inbox_queue } = collections;
if (!ap_inbox_queue) return;
const item = await ap_inbox_queue.findOneAndUpdate(
{ status: "pending" },
{ $set: { status: "processing" } },
{ sort: { receivedAt: 1 }, returnDocument: "after" },
);
if (!item) return;
try {
await routeToHandler(item, collections, ctx, handle);
await ap_inbox_queue.updateOne(
{ _id: item._id },
{ $set: { status: "completed", processedAt: new Date().toISOString() } },
);
} catch (error) {
const attempts = (item.attempts || 0) + 1;
await ap_inbox_queue.updateOne(
{ _id: item._id },
{
$set: {
status: attempts >= (item.maxAttempts || 3) ? "failed" : "pending",
attempts,
error: error.message,
},
},
);
console.error(`[inbox-queue] Failed processing ${item.activityType} from ${item.actorUrl}: ${error.message}`);
}
}
/**
* Enqueue an activity for async processing.
* @param {object} collections - MongoDB collections
* @param {object} params
* @param {string} params.activityType - Activity type name
* @param {string} params.actorUrl - Actor URL
* @param {string} [params.objectUrl] - Object URL
* @param {object} params.rawJson - Full activity JSON-LD
*/
export async function enqueueActivity(collections, { activityType, actorUrl, objectUrl, rawJson }) {
const { ap_inbox_queue } = collections;
if (!ap_inbox_queue) return;
await ap_inbox_queue.insertOne({
activityType,
actorUrl: actorUrl || "",
objectUrl: objectUrl || "",
rawJson,
status: "pending",
attempts: 0,
maxAttempts: 3,
receivedAt: new Date().toISOString(),
processedAt: null,
error: null,
});
}
/**
* Start the background inbox processor.
* @param {object} collections - MongoDB collections
* @param {Function} getCtx - Function returning a Fedify context
* @param {string} handle - Our actor handle
* @returns {NodeJS.Timeout} Interval ID (for cleanup)
*/
export function startInboxProcessor(collections, getCtx, handle) {
const intervalId = setInterval(async () => {
try {
const ctx = getCtx();
if (ctx) {
await processNextItem(collections, ctx, handle);
}
} catch (error) {
console.error("[inbox-queue] Processor error:", error.message);
}
}, 3_000); // Every 3 seconds
console.info("[ActivityPub] Inbox queue processor started (3s interval)");
return intervalId;
}

138
lib/key-refresh.js Normal file
View File

@@ -0,0 +1,138 @@
/**
* Proactive key refresh for remote actors.
* Periodically re-fetches actor documents for active followers
* whose keys may have rotated, keeping Fedify's KV cache fresh.
* @module key-refresh
*/
import { lookupWithSecurity } from "./lookup-helpers.js";
/**
* Update key freshness tracking after successfully processing
* an activity from a remote actor.
* @param {object} collections - MongoDB collections
* @param {string} actorUrl - Remote actor URL
*/
export async function touchKeyFreshness(collections, actorUrl) {
if (!actorUrl || !collections.ap_key_freshness) return;
try {
await collections.ap_key_freshness.updateOne(
{ actorUrl },
{
$set: { lastSeenAt: new Date().toISOString() },
$setOnInsert: { lastRefreshedAt: new Date().toISOString() },
},
{ upsert: true },
);
} catch {
// Non-critical
}
}
/**
* Refresh stale keys for active followers.
* Finds followers whose keys haven't been refreshed in 7+ days
* and re-fetches their actor documents (up to 10 per cycle).
*
* @param {object} collections - MongoDB collections
* @param {object} ctx - Fedify context (for lookupObject)
* @param {string} handle - Our actor handle
*/
export async function refreshStaleKeys(collections, ctx, handle) {
if (!collections.ap_key_freshness || !collections.ap_followers) return;
const sevenDaysAgo = new Date(Date.now() - 7 * 86_400_000).toISOString();
// Find actors with stale keys who are still our followers
const staleActors = await collections.ap_key_freshness
.aggregate([
{
$match: {
lastRefreshedAt: { $lt: sevenDaysAgo },
},
},
{
$lookup: {
from: "ap_followers",
localField: "actorUrl",
foreignField: "actorUrl",
as: "follower",
},
},
{ $match: { "follower.0": { $exists: true } } },
{ $limit: 10 },
])
.toArray();
if (staleActors.length === 0) return;
console.info(`[ActivityPub] Refreshing keys for ${staleActors.length} stale actors`);
const documentLoader = await ctx.getDocumentLoader({ identifier: handle });
for (const entry of staleActors) {
try {
const result = await lookupWithSecurity(ctx, new URL(entry.actorUrl), {
documentLoader,
});
await collections.ap_key_freshness.updateOne(
{ actorUrl: entry.actorUrl },
{ $set: { lastRefreshedAt: new Date().toISOString() } },
);
if (!result) {
// Actor gone — log as stale
await collections.ap_activities?.insertOne({
direction: "system",
type: "StaleActor",
actorUrl: entry.actorUrl,
summary: `Actor ${entry.actorUrl} could not be resolved during key refresh`,
receivedAt: new Date().toISOString(),
});
}
} catch (error) {
const status = error?.cause?.status || error?.message || "unknown";
if (status === 410 || String(status).includes("410")) {
// 410 Gone — actor deleted
await collections.ap_activities?.insertOne({
direction: "system",
type: "StaleActor",
actorUrl: entry.actorUrl,
summary: `Actor ${entry.actorUrl} returned 410 Gone during key refresh`,
receivedAt: new Date().toISOString(),
});
}
// Update lastRefreshedAt even on failure to avoid retrying every cycle
await collections.ap_key_freshness.updateOne(
{ actorUrl: entry.actorUrl },
{ $set: { lastRefreshedAt: new Date().toISOString() } },
);
}
}
}
/**
* Schedule key refresh job (runs on startup + every 24h).
* @param {object} collections - MongoDB collections
* @param {Function} getCtx - Function returning a Fedify context
* @param {string} handle - Our actor handle
*/
export function scheduleKeyRefresh(collections, getCtx, handle) {
const run = async () => {
try {
const ctx = getCtx();
if (ctx) {
await refreshStaleKeys(collections, ctx, handle);
}
} catch (error) {
console.error("[ActivityPub] Key refresh failed:", error.message);
}
};
// Run once on startup (delayed to let federation initialize)
setTimeout(run, 30_000);
// Then every 24 hours
setInterval(run, 86_400_000);
}

View File

@@ -96,3 +96,19 @@ export async function cacheExists(key) {
return false;
}
}
/**
* Cache-aside wrapper for query functions.
* Returns cached result if available, otherwise runs queryFn and caches result.
* @param {string} key - Cache key (without prefix — cacheGet/cacheSet add it)
* @param {number} ttlSeconds - TTL in seconds
* @param {Function} queryFn - Async function to run on cache miss
* @returns {Promise<unknown>}
*/
export async function cachedQuery(key, ttlSeconds, queryFn) {
const cached = await cacheGet(key);
if (cached !== null) return cached;
const result = await queryFn();
await cacheSet(key, result, ttlSeconds);
return result;
}

View File

@@ -0,0 +1,121 @@
/**
* Server-level blocking storage operations.
* Blocks entire instances by hostname, checked in inbox listeners
* before any expensive work is done.
* @module storage/server-blocks
*/
import { getRedisClient } from "../redis-cache.js";
const REDIS_KEY = "indiekit:blocked_servers";
/**
* Add a server block by hostname.
* @param {object} collections - MongoDB collections
* @param {string} hostname - Hostname to block (lowercase, no protocol)
* @param {string} [reason] - Optional admin note
*/
export async function addBlockedServer(collections, hostname, reason) {
const { ap_blocked_servers } = collections;
const normalized = hostname.toLowerCase().trim();
await ap_blocked_servers.updateOne(
{ hostname: normalized },
{
$setOnInsert: {
hostname: normalized,
blockedAt: new Date().toISOString(),
...(reason ? { reason } : {}),
},
},
{ upsert: true },
);
// Incremental Redis update
const redis = getRedisClient();
if (redis) {
try {
await redis.sadd(REDIS_KEY, normalized);
} catch {
// Non-critical
}
}
}
/**
* Remove a server block by hostname.
* @param {object} collections - MongoDB collections
* @param {string} hostname - Hostname to unblock
*/
export async function removeBlockedServer(collections, hostname) {
const { ap_blocked_servers } = collections;
const normalized = hostname.toLowerCase().trim();
await ap_blocked_servers.deleteOne({ hostname: normalized });
const redis = getRedisClient();
if (redis) {
try {
await redis.srem(REDIS_KEY, normalized);
} catch {
// Non-critical
}
}
}
/**
* Get all blocked servers.
* @param {object} collections - MongoDB collections
* @returns {Promise<object[]>} Array of block entries
*/
export async function getAllBlockedServers(collections) {
const { ap_blocked_servers } = collections;
return await ap_blocked_servers.find({}).sort({ blockedAt: -1 }).toArray();
}
/**
* Check if a server is blocked by actor URL.
* Uses Redis Set (O(1)) with MongoDB fallback.
* @param {string} actorUrl - Full actor URL
* @param {object} collections - MongoDB collections (fallback only)
* @returns {Promise<boolean>}
*/
export async function isServerBlocked(actorUrl, collections) {
if (!actorUrl) return false;
try {
const hostname = new URL(actorUrl).hostname.toLowerCase();
const redis = getRedisClient();
if (redis) {
return (await redis.sismember(REDIS_KEY, hostname)) === 1;
}
// Fallback: direct MongoDB check
const { ap_blocked_servers } = collections;
return !!(await ap_blocked_servers.findOne({ hostname }));
} catch {
return false;
}
}
/**
* Load all blocked hostnames into Redis Set on startup.
* Replaces existing set contents entirely.
* @param {object} collections - MongoDB collections
*/
export async function loadBlockedServersToRedis(collections) {
const redis = getRedisClient();
if (!redis) return;
try {
const { ap_blocked_servers } = collections;
const docs = await ap_blocked_servers.find({}).toArray();
const hostnames = docs.map((d) => d.hostname);
// Replace: delete existing set, then add all
await redis.del(REDIS_KEY);
if (hostnames.length > 0) {
await redis.sadd(REDIS_KEY, ...hostnames);
}
} catch {
// Non-critical — isServerBlocked falls back to MongoDB
}
}

View File

@@ -26,6 +26,38 @@
</div>
</section>
{# Blocked servers #}
<section class="ap-moderation__section">
<h2>Blocked Servers</h2>
<p class="ap-moderation__hint">Block entire instances by hostname. Activities from blocked servers are rejected before any processing.</p>
{% if blockedServers and blockedServers.length > 0 %}
<ul class="ap-moderation__list" x-ref="serverList">
{% for entry in blockedServers %}
<li class="ap-moderation__entry" data-hostname="{{ entry.hostname }}">
<code>{{ entry.hostname }}</code>
{% if entry.reason %}<span class="ap-moderation__reason">({{ entry.reason }})</span>{% endif %}
<button class="ap-moderation__remove"
@click="removeEntry($el, 'unblock-server', { hostname: $el.closest('li').dataset.hostname })">
Unblock
</button>
</li>
{% endfor %}
</ul>
{% else %}
<p class="ap-moderation__empty" x-ref="serverEmpty">No servers blocked.</p>
{% endif %}
<form class="ap-moderation__add-form" @submit.prevent="addBlockedServer()">
<input type="text" x-model="newServerHostname"
placeholder="spam.instance.social"
class="ap-moderation__input"
x-ref="serverInput">
<button type="submit" :disabled="submitting" class="ap-moderation__add-btn">
Block Server
</button>
</form>
</section>
{# Blocked actors #}
<section class="ap-moderation__section">
<h2>{{ __("activitypub.moderation.blockedTitle") }}</h2>
@@ -108,6 +140,7 @@
document.addEventListener('alpine:init', () => {
Alpine.data('moderationPage', () => ({
newKeyword: '',
newServerHostname: '',
submitting: false,
error: '',
@@ -157,6 +190,50 @@
this.submitting = false;
},
async addBlockedServer() {
const hostname = this.newServerHostname.trim();
if (!hostname) return;
this.submitting = true;
this.error = '';
try {
const res = await fetch(this.mountPath + '/admin/reader/block-server', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRF-Token': this.csrfToken,
},
body: JSON.stringify({ hostname }),
});
const data = await res.json();
if (data.success) {
const list = this.$refs.serverList;
if (list) {
const li = document.createElement('li');
li.className = 'ap-moderation__entry';
li.dataset.hostname = hostname;
const code = document.createElement('code');
code.textContent = hostname;
const btn = document.createElement('button');
btn.className = 'ap-moderation__remove';
btn.textContent = 'Unblock';
btn.addEventListener('click', () => {
this.removeEntry(btn, 'unblock-server', { hostname });
});
li.append(code, btn);
list.appendChild(li);
}
if (this.$refs.serverEmpty) this.$refs.serverEmpty.remove();
this.newServerHostname = '';
this.$refs.serverInput.focus();
} else {
this.error = data.error || 'Failed to block server';
}
} catch (e) {
this.error = 'Request failed';
}
this.submitting = false;
},
async removeEntry(el, action, payload) {
const li = el.closest('li');
if (!li) return;