Files
indiekit-endpoint-activitypub/lib/batch-refollow.js
Ricardo 9a61145d97 feat: FEP-8fcf/fe34 compliance, custom emoji, manual follow approval (v2.13.0)
- FEP-8fcf: add syncCollection to Undo(Announce) sendActivity
- FEP-fe34: centralized lookupWithSecurity() helper with crossOrigin: "ignore" on all 23 lookupObject call sites
- Custom emoji: replaceCustomEmoji() renders :shortcode: as inline <img> in content and actor display names
- Manual follow approval: profile toggle, ap_pending_follows collection, approve/reject controllers with federation, pending tab on followers page, follow_request notification type
- Coverage audit updated to v2.12.x (overall ~70% → ~82%)

Confab-Link: http://localhost:8080/sessions/1f1e729b-0087-499e-a991-f36f46211fe4
2026-03-17 08:21:36 +01:00

335 lines
9.5 KiB
JavaScript

import { lookupWithSecurity } from "./lookup-helpers.js";
/**
* Batch re-follow processor for imported accounts.
*
* After a Mastodon migration, imported accounts (source: "import") exist only
* locally — no Follow activities were sent. This module gradually sends Follow
* activities to all imported accounts so remote servers start delivering
* Create activities to our inbox.
*
* Source field state machine:
* import → refollow:sent → federation (happy path)
* import → refollow:sent → refollow:failed (after MAX_RETRIES)
*/
import { Follow } from "@fedify/fedify/vocab";
import { logActivity } from "./activity-log.js";
import { cacheGet, cacheSet } from "./redis-cache.js";
const BATCH_SIZE = 10;
const DELAY_PER_FOLLOW = 3_000;
const DELAY_BETWEEN_BATCHES = 30_000;
const STARTUP_DELAY = 30_000;
const RETRY_COOLDOWN = 60 * 60 * 1_000; // 1 hour
const MAX_RETRIES = 3;
const KV_KEY = "batch-refollow/state";
let _timer = null;
/**
* Start the batch re-follow processor.
*
* @param {object} options
* @param {import("@fedify/fedify").Federation} options.federation
* @param {object} options.collections - MongoDB collections
* @param {string} options.handle - Actor handle
* @param {string} options.publicationUrl - Publication base URL
*/
export async function startBatchRefollow(options) {
const { collections } = options;
// Restart recovery: reset any stale "refollow:pending" back to "import"
await collections.ap_following.updateMany(
{ source: "refollow:pending" },
{ $set: { source: "import" } },
);
// Check if there's work to do
const importCount = await collections.ap_following.countDocuments({
source: "import",
});
if (importCount === 0) {
console.info("[ActivityPub] Batch refollow: no imported accounts to process");
return;
}
console.info(
`[ActivityPub] Batch refollow: ${importCount} imported accounts to process`,
);
// Set job state to running
await setJobState("running");
// Schedule first batch after startup delay
_timer = setTimeout(() => processNextBatch(options), STARTUP_DELAY);
}
/**
* Pause the batch re-follow processor.
*
* @param {object} collections - MongoDB collections
*/
export async function pauseBatchRefollow(collections) {
if (_timer) {
clearTimeout(_timer);
_timer = null;
}
// Reset any pending back to import so they get picked up on resume
await collections.ap_following.updateMany(
{ source: "refollow:pending" },
{ $set: { source: "import" } },
);
await setJobState("paused");
console.info("[ActivityPub] Batch refollow: paused");
}
/**
* Resume the batch re-follow processor.
*
* @param {object} options
* @param {import("@fedify/fedify").Federation} options.federation
* @param {object} options.collections - MongoDB collections
* @param {string} options.handle - Actor handle
* @param {string} options.publicationUrl - Publication base URL
*/
export async function resumeBatchRefollow(options) {
if (_timer) {
clearTimeout(_timer);
_timer = null;
}
await setJobState("running");
_timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES);
console.info("[ActivityPub] Batch refollow: resumed");
}
/**
* Get current batch re-follow status.
*
* @param {object} collections - MongoDB collections
* @returns {Promise<object>} Status object
*/
export async function getBatchRefollowStatus(collections) {
const state = await cacheGet(KV_KEY);
const status = state?.status || "idle";
const [remaining, sent, failed, federated] = await Promise.all([
collections.ap_following.countDocuments({ source: "import" }),
collections.ap_following.countDocuments({ source: "refollow:sent" }),
collections.ap_following.countDocuments({ source: "refollow:failed" }),
collections.ap_following.countDocuments({ source: "federation" }),
]);
// Include federated in totals — accounts transition from refollow:sent
// to federation when Accept arrives, so they must stay in the math
const total = remaining + sent + failed + federated;
const completed = sent + failed + federated;
const progressPercent =
total > 0 ? Math.round((completed / total) * 100) : 100;
return {
status,
total,
remaining,
sent,
failed,
federated,
completed,
progressPercent,
startedAt: state?.startedAt || null,
updatedAt: state?.updatedAt || null,
};
}
// --- Internal helpers ---
/**
* Process the next batch of imported accounts.
*/
async function processNextBatch(options) {
const { federation, collections, handle, publicationUrl } = options;
_timer = null;
const state = await cacheGet(KV_KEY);
if (state?.status !== "running") return;
// Claim a batch atomically: set source to "refollow:pending"
const entries = [];
for (let i = 0; i < BATCH_SIZE; i++) {
const doc = await collections.ap_following.findOneAndUpdate(
{ source: "import" },
{ $set: { source: "refollow:pending" } },
{ returnDocument: "after" },
);
if (!doc) break;
entries.push(doc);
}
// Also pick up retryable entries (failed but not permanently)
const retryCutoff = new Date(Date.now() - RETRY_COOLDOWN).toISOString();
const retrySlots = BATCH_SIZE - entries.length;
for (let i = 0; i < retrySlots; i++) {
const doc = await collections.ap_following.findOneAndUpdate(
{
source: "refollow:sent",
refollowAttempts: { $lt: MAX_RETRIES },
refollowLastAttempt: { $lt: retryCutoff },
},
{ $set: { source: "refollow:pending" } },
{ returnDocument: "after" },
);
if (!doc) break;
entries.push(doc);
}
if (entries.length === 0) {
// Check if there are still sent entries awaiting Accept
const pendingAccepts = await collections.ap_following.countDocuments({
source: "refollow:sent",
});
if (pendingAccepts > 0) {
console.info(
`[ActivityPub] Batch refollow: all sent, ${pendingAccepts} awaiting Accept`,
);
}
await setJobState("completed");
console.info("[ActivityPub] Batch refollow: completed");
return;
}
console.info(
`[ActivityPub] Batch refollow: processing batch of ${entries.length}`,
);
for (const entry of entries) {
await processOneFollow(options, entry);
// Delay between individual follows
await sleep(DELAY_PER_FOLLOW);
}
// Update job state timestamp
await setJobState("running");
// Schedule next batch
_timer = setTimeout(() => processNextBatch(options), DELAY_BETWEEN_BATCHES);
}
/**
* Send a Follow activity for a single imported account.
*/
async function processOneFollow(options, entry) {
const { federation, collections, handle, publicationUrl } = options;
try {
const ctx = federation.createContext(new URL(publicationUrl), { handle, publicationUrl });
// Resolve the remote actor (signed request for Authorized Fetch)
const documentLoader = await ctx.getDocumentLoader({
identifier: handle,
});
const remoteActor = await lookupWithSecurity(ctx,entry.actorUrl, {
documentLoader,
});
if (!remoteActor) {
throw new Error("Could not resolve remote actor");
}
// Use the canonical actor URL (may differ from imported URL)
const canonicalUrl = remoteActor.id?.href || entry.actorUrl;
// Send Follow activity using canonical URL
const follow = new Follow({
actor: ctx.getActorUri(handle),
object: new URL(canonicalUrl),
});
await ctx.sendActivity({ identifier: handle }, remoteActor, follow, {
orderingKey: canonicalUrl,
});
// Mark as sent — update actorUrl to canonical form so Accept handler
// can match when the remote server responds
const updateFields = {
source: "refollow:sent",
refollowLastAttempt: new Date().toISOString(),
refollowError: null,
};
if (canonicalUrl !== entry.actorUrl) {
updateFields.actorUrl = canonicalUrl;
}
await collections.ap_following.updateOne(
{ _id: entry._id },
{
$set: updateFields,
$inc: { refollowAttempts: 1 },
},
);
console.info(
`[ActivityPub] Batch refollow: sent Follow to ${entry.actorUrl}`,
);
await logActivity(collections.ap_activities, {
direction: "outbound",
type: "Follow",
actorUrl: publicationUrl,
objectUrl: entry.actorUrl,
actorName: entry.name || entry.actorUrl,
summary: `Batch refollow: sent Follow to ${entry.name || entry.actorUrl}`,
});
} catch (error) {
const attempts = (entry.refollowAttempts || 0) + 1;
const newSource =
attempts >= MAX_RETRIES ? "refollow:failed" : "refollow:sent";
await collections.ap_following.updateOne(
{ _id: entry._id },
{
$set: {
source: newSource,
refollowLastAttempt: new Date().toISOString(),
refollowError: error.message,
},
$inc: { refollowAttempts: 1 },
},
);
console.warn(
`[ActivityPub] Batch refollow: failed for ${entry.actorUrl} (attempt ${attempts}/${MAX_RETRIES}): ${error.message}`,
);
}
}
/**
* Set the batch re-follow job state in Redis.
*/
async function setJobState(status) {
const now = new Date().toISOString();
const existing = (await cacheGet(KV_KEY)) || {};
const newState = {
...existing,
status,
updatedAt: now,
};
// Only set startedAt on initial start or resume
if (!existing.startedAt || (status === "running" && existing.status !== "running")) {
newState.startedAt = now;
}
await cacheSet(KV_KEY, newState);
}
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}