Files
indiekit-endpoint-activitypub/lib/inbox-queue.js
Ricardo 1567b7c4e5 feat: operational resilience hardening — server blocking, caching, key refresh, async inbox (v2.14.0)
- Server-level blocking: O(1) Redis SISMEMBER check in all inbox listeners,
  admin UI for blocking/unblocking servers by hostname, MongoDB fallback
- Redis caching for collection dispatchers: 300s TTL on followers/following/liked
  counters and paginated pages, one-shot followers recipients cache
- Proactive key refresh: daily cron re-fetches actor documents for followers
  with 7+ day stale keys using lookupWithSecurity()
- Async inbox processing: MongoDB-backed queue with 3s polling, retry (3 attempts),
  24h TTL auto-prune. Follow keeps synchronous Accept, Block keeps synchronous
  follower removal. All other activity types fully deferred to background processor.

Inspired by wafrn's battle-tested multi-user AP implementation.

Confab-Link: http://localhost:8080/sessions/af5f8b45-6b8d-442d-8f25-78c326190709
2026-03-17 09:16:05 +01:00

100 lines
3.0 KiB
JavaScript

/**
 * MongoDB-backed inbox processing queue.
 * Runs a setInterval-based processor; each tick claims and processes
 * at most one pending activity from ap_inbox_queue.
 * @module inbox-queue
 */
import { routeToHandler } from "./inbox-handlers.js";
/**
 * Claim and process the oldest pending item from the inbox queue.
 *
 * The claim is a single findOneAndUpdate that flips `status` from "pending"
 * to "processing" (atomic claim, prevents double-processing).
 *
 * Outcome bookkeeping:
 * - success: status "completed", `processedAt` set to an ISO-8601 string
 * - failure: `attempts` incremented and `error` recorded; status goes back
 *   to "pending", or to "failed" once attempts reaches `maxAttempts`
 *   (3 when the item carries none)
 *
 * Does nothing when `ap_inbox_queue` is missing or no item is pending.
 * Errors raised by the queue updates themselves are not caught here and
 * propagate to the caller.
 *
 * NOTE(review): nothing in this function resets an item that was claimed
 * but never finished (e.g. the process exits mid-handler); such items stay
 * in "processing" — confirm whether another component recovers them.
 *
 * @param {object} collections - MongoDB collections
 * @param {object} ctx - Fedify context
 * @param {string} handle - Our actor handle
 * @returns {Promise<void>}
 */
async function processNextItem(collections, ctx, handle) {
  const { ap_inbox_queue } = collections;
  if (!ap_inbox_queue) return;

  // Oldest-first claim; the returned document is the one we now own.
  const item = await ap_inbox_queue.findOneAndUpdate(
    { status: "pending" },
    { $set: { status: "processing" } },
    { sort: { receivedAt: 1 }, returnDocument: "after" },
  );
  if (!item) return;

  try {
    await routeToHandler(item, collections, ctx, handle);
    await ap_inbox_queue.updateOne(
      { _id: item._id },
      { $set: { status: "completed", processedAt: new Date().toISOString() } },
    );
  } catch (error) {
    // A handler may reject with a non-Error value (a string, or even
    // null/undefined). Reading `.message` unguarded would throw right here
    // and skip the bookkeeping below, leaving the item in "processing".
    const message = error?.message ?? String(error);
    const attempts = (item.attempts || 0) + 1;
    const status = attempts >= (item.maxAttempts || 3) ? "failed" : "pending";

    // Log before writing so the handler's error is not lost if the update throws.
    console.error(`[inbox-queue] Failed processing ${item.activityType} from ${item.actorUrl}: ${message}`);

    await ap_inbox_queue.updateOne(
      { _id: item._id },
      { $set: { status, attempts, error: message } },
    );
  }
}
/**
 * Add an incoming activity to the inbox queue for background processing.
 *
 * The stored document starts with status "pending", zero attempts, a
 * maximum of three attempts, and `receivedAt` set to the current time as
 * an ISO-8601 string. Missing actor/object URLs are stored as "".
 * Returns without inserting when `ap_inbox_queue` is not available.
 *
 * @param {object} collections - MongoDB collections
 * @param {object} params
 * @param {string} params.activityType - Activity type name
 * @param {string} params.actorUrl - Actor URL
 * @param {string} [params.objectUrl] - Object URL
 * @param {object} params.rawJson - Full activity JSON-LD
 * @returns {Promise<void>}
 */
export async function enqueueActivity(collections, { activityType, actorUrl, objectUrl, rawJson }) {
  const queue = collections.ap_inbox_queue;
  if (!queue) {
    return;
  }

  const queuedItem = {
    activityType,
    actorUrl: actorUrl || "",
    objectUrl: objectUrl || "",
    rawJson,
    status: "pending",
    attempts: 0,
    maxAttempts: 3,
    receivedAt: new Date().toISOString(),
    processedAt: null,
    error: null,
  };

  await queue.insertOne(queuedItem);
}
/**
 * Launch the background poller that drains the inbox queue.
 *
 * Every 3 seconds the poller asks `getCtx` for a context and, when one is
 * returned, processes the next queued item. Any error raised during a tick
 * is logged and swallowed so the timer keeps running.
 *
 * @param {object} collections - MongoDB collections
 * @param {Function} getCtx - Function returning a Fedify context
 * @param {string} handle - Our actor handle
 * @returns {NodeJS.Timeout} Interval ID (for cleanup)
 */
export function startInboxProcessor(collections, getCtx, handle) {
  const pollEveryMs = 3_000;

  // One poll cycle: skip quietly while no context is available yet.
  const tick = async () => {
    try {
      const ctx = getCtx();
      if (!ctx) {
        return;
      }
      await processNextItem(collections, ctx, handle);
    } catch (error) {
      console.error("[inbox-queue] Processor error:", error.message);
    }
  };

  const intervalId = setInterval(tick, pollEveryMs);
  console.info("[ActivityPub] Inbox queue processor started (3s interval)");
  return intervalId;
}