mirror of
https://github.com/svemagie/indiekit-endpoint-microsub.git
synced 2026-04-02 15:35:00 +02:00
feat: add ActivityPub integration - actor profiles, follow/unfollow, timeline items
- Add actor profile page with outbox fetcher for viewing AP actor posts - Add follow/unfollow buttons on actor profile (delegates to AP plugin) - Add AP actor link on item cards for posts from ActivityPub sources - Add ensureActivityPubChannel() for auto-creating Fediverse channel - Add AP-aware item storage with dedup, attachments, and categories - Add CSS styles for actor profile cards and AP-specific UI elements - Bump version to 1.0.31
This commit is contained in:
255
lib/activitypub/outbox-fetcher.js
Normal file
255
lib/activitypub/outbox-fetcher.js
Normal file
@@ -0,0 +1,255 @@
|
||||
/**
|
||||
* Fetch a remote ActivityPub actor's outbox for on-demand reading.
|
||||
* Returns ephemeral jf2 items — nothing is stored in MongoDB.
|
||||
*
|
||||
* @module activitypub/outbox-fetcher
|
||||
*/
|
||||
|
||||
const AP_ACCEPT =
|
||||
'application/activity+json, application/ld+json; profile="https://www.w3.org/ns/activitystreams"';
|
||||
const FETCH_TIMEOUT = 10_000;
|
||||
const USER_AGENT = "Indiekit/1.0 (Microsub reader)";
|
||||
|
||||
/**
|
||||
* Fetch a remote actor's profile and recent posts from their outbox.
|
||||
*
|
||||
* @param {string} actorUrl - Full URL of the AP actor
|
||||
* @param {object} [options]
|
||||
* @param {number} [options.limit=20] - Max items to return
|
||||
* @returns {Promise<{ actor: object, items: Array }>}
|
||||
*/
|
||||
export async function fetchActorOutbox(actorUrl, options = {}) {
|
||||
const limit = options.limit || 20;
|
||||
|
||||
// 1. Fetch actor profile
|
||||
const actor = await fetchJson(actorUrl);
|
||||
if (!actor || !actor.outbox) {
|
||||
throw new Error("Could not resolve actor or outbox URL");
|
||||
}
|
||||
|
||||
const actorInfo = {
|
||||
name:
|
||||
actor.name ||
|
||||
actor.preferredUsername ||
|
||||
new URL(actorUrl).pathname.split("/").pop(),
|
||||
url: actor.url || actor.id || actorUrl,
|
||||
photo: actor.icon?.url || actor.icon || "",
|
||||
summary: stripHtml(actor.summary || ""),
|
||||
handle: actor.preferredUsername || "",
|
||||
followersCount: 0,
|
||||
followingCount: 0,
|
||||
};
|
||||
|
||||
// Resolve follower/following counts if available
|
||||
if (typeof actor.followers === "string") {
|
||||
try {
|
||||
const followersCollection = await fetchJson(actor.followers);
|
||||
actorInfo.followersCount = followersCollection?.totalItems || 0;
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
if (typeof actor.following === "string") {
|
||||
try {
|
||||
const followingCollection = await fetchJson(actor.following);
|
||||
actorInfo.followingCount = followingCollection?.totalItems || 0;
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Fetch outbox (OrderedCollection)
|
||||
const outboxUrl =
|
||||
typeof actor.outbox === "string" ? actor.outbox : actor.outbox?.id;
|
||||
const outbox = await fetchJson(outboxUrl);
|
||||
if (!outbox) {
|
||||
return { actor: actorInfo, items: [] };
|
||||
}
|
||||
|
||||
// 3. Get items — may be inline or on a first page
|
||||
let activities = [];
|
||||
|
||||
if (outbox.orderedItems?.length > 0) {
|
||||
activities = outbox.orderedItems;
|
||||
} else if (outbox.first) {
|
||||
const firstPageUrl =
|
||||
typeof outbox.first === "string" ? outbox.first : outbox.first?.id;
|
||||
if (firstPageUrl) {
|
||||
const firstPage = await fetchJson(firstPageUrl);
|
||||
activities = firstPage?.orderedItems || firstPage?.items || [];
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Convert Create activities to jf2 items
|
||||
const items = [];
|
||||
for (const activity of activities) {
|
||||
if (items.length >= limit) break;
|
||||
|
||||
const item = activityToJf2(activity, actorInfo);
|
||||
if (item) items.push(item);
|
||||
}
|
||||
|
||||
return { actor: actorInfo, items };
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a single AP activity (or bare object) to jf2 format.
|
||||
* @param {object} activity - AP activity or object
|
||||
* @param {object} actorInfo - Actor profile info
|
||||
* @returns {object|null} jf2 item or null if not displayable
|
||||
*/
|
||||
function activityToJf2(activity, actorInfo) {
|
||||
// Unwrap Create/Announce — the displayable content is the inner object
|
||||
let object = activity;
|
||||
const activityType = activity.type;
|
||||
|
||||
if (activityType === "Create" || activityType === "Announce") {
|
||||
object = activity.object;
|
||||
if (!object || typeof object === "string") return null; // Unresolved reference
|
||||
}
|
||||
|
||||
// Skip non-content types (Follow, Like, etc.)
|
||||
const contentTypes = new Set([
|
||||
"Note",
|
||||
"Article",
|
||||
"Page",
|
||||
"Video",
|
||||
"Audio",
|
||||
"Image",
|
||||
"Event",
|
||||
"Question",
|
||||
]);
|
||||
if (!contentTypes.has(object.type)) return null;
|
||||
|
||||
const contentHtml = object.content || "";
|
||||
const contentText = stripHtml(contentHtml);
|
||||
|
||||
const jf2 = {
|
||||
type: "entry",
|
||||
url: object.url || object.id || "",
|
||||
uid: object.id || object.url || "",
|
||||
name: object.name || undefined,
|
||||
content: contentHtml ? { text: contentText, html: contentHtml } : undefined,
|
||||
summary: object.summary ? stripHtml(object.summary) : undefined,
|
||||
published: object.published || activity.published || undefined,
|
||||
author: {
|
||||
name: actorInfo.name,
|
||||
url: actorInfo.url,
|
||||
photo: actorInfo.photo,
|
||||
},
|
||||
category: extractTags(object.tag),
|
||||
photo: extractMedia(object.attachment, "image"),
|
||||
video: extractMedia(object.attachment, "video"),
|
||||
audio: extractMedia(object.attachment, "audio"),
|
||||
_source: { type: "activitypub", actorUrl: actorInfo.url },
|
||||
};
|
||||
|
||||
// Boost attribution
|
||||
if (activityType === "Announce" && activity.actor) {
|
||||
jf2._boostedBy = actorInfo;
|
||||
// The inner object may have its own author
|
||||
if (object.attributedTo) {
|
||||
const attributedUrl =
|
||||
typeof object.attributedTo === "string"
|
||||
? object.attributedTo
|
||||
: object.attributedTo?.id || object.attributedTo?.url;
|
||||
if (attributedUrl) {
|
||||
jf2.author = {
|
||||
name:
|
||||
object.attributedTo?.name ||
|
||||
object.attributedTo?.preferredUsername ||
|
||||
attributedUrl,
|
||||
url: attributedUrl,
|
||||
photo: object.attributedTo?.icon?.url || "",
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (object.inReplyTo) {
|
||||
const replyUrl =
|
||||
typeof object.inReplyTo === "string"
|
||||
? object.inReplyTo
|
||||
: object.inReplyTo?.id;
|
||||
if (replyUrl) jf2["in-reply-to"] = [replyUrl];
|
||||
}
|
||||
|
||||
return jf2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract hashtags from AP tag array.
|
||||
* @param {Array} tags - AP tag objects
|
||||
* @returns {Array<string>}
|
||||
*/
|
||||
function extractTags(tags) {
|
||||
if (!Array.isArray(tags)) return [];
|
||||
return tags
|
||||
.filter((t) => t.type === "Hashtag" || t.type === "Tag")
|
||||
.map((t) => (t.name || "").replace(/^#/, ""))
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract media URLs from AP attachment array.
|
||||
* @param {Array} attachments - AP attachment objects
|
||||
* @param {string} mediaPrefix - "image", "video", or "audio"
|
||||
* @returns {Array<string>}
|
||||
*/
|
||||
function extractMedia(attachments, mediaPrefix) {
|
||||
if (!Array.isArray(attachments)) return [];
|
||||
return attachments
|
||||
.filter((a) => (a.mediaType || "").startsWith(`${mediaPrefix}/`))
|
||||
.map((a) => a.url || a.href || "")
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch a URL as ActivityPub JSON.
|
||||
* @param {string} url
|
||||
* @returns {Promise<object|null>}
|
||||
*/
|
||||
async function fetchJson(url) {
|
||||
if (!url) return null;
|
||||
|
||||
const controller = new AbortController();
|
||||
const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT);
|
||||
|
||||
try {
|
||||
const response = await fetch(url, {
|
||||
headers: {
|
||||
Accept: AP_ACCEPT,
|
||||
"User-Agent": USER_AGENT,
|
||||
},
|
||||
signal: controller.signal,
|
||||
redirect: "follow",
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
console.warn(
|
||||
`[Microsub] AP fetch failed: ${response.status} for ${url}`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
|
||||
return await response.json();
|
||||
} catch (error) {
|
||||
if (error.name === "AbortError") {
|
||||
console.warn(`[Microsub] AP fetch timeout for ${url}`);
|
||||
} else {
|
||||
console.warn(`[Microsub] AP fetch error for ${url}: ${error.message}`);
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip HTML tags for plain text.
|
||||
* @param {string} html
|
||||
* @returns {string}
|
||||
*/
|
||||
function stripHtml(html) {
|
||||
return (html || "").replace(/<[^>]*>/g, "").trim();
|
||||
}
|
||||
@@ -28,6 +28,7 @@ import {
|
||||
markItemsRead,
|
||||
countReadItems,
|
||||
} from "../storage/items.js";
|
||||
import { fetchActorOutbox } from "../activitypub/outbox-fetcher.js";
|
||||
import { getUserId } from "../utils/auth.js";
|
||||
import {
|
||||
validateChannelName,
|
||||
@@ -207,8 +208,8 @@ export async function deleteChannelAction(request, response) {
|
||||
const userId = getUserId(request);
|
||||
const { uid } = request.params;
|
||||
|
||||
// Don't allow deleting notifications channel
|
||||
if (uid === "notifications") {
|
||||
// Don't allow deleting system channels
|
||||
if (uid === "notifications" || uid === "activitypub") {
|
||||
return response.redirect(`${request.baseUrl}/channels`);
|
||||
}
|
||||
|
||||
@@ -909,6 +910,118 @@ export async function refreshFeed(request, response) {
|
||||
response.redirect(`${request.baseUrl}/channels/${uid}/feeds`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Actor profile — fetch and display a remote AP actor's recent posts
|
||||
* @param {object} request - Express request
|
||||
* @param {object} response - Express response
|
||||
*/
|
||||
/**
|
||||
* Find the ActivityPub plugin instance from installed plugins.
|
||||
* @param {object} request - Express request
|
||||
* @returns {object|undefined} The AP plugin instance
|
||||
*/
|
||||
function getApPlugin(request) {
|
||||
const installedPlugins = request.app.locals.installedPlugins;
|
||||
if (!installedPlugins) return undefined;
|
||||
return [...installedPlugins].find(
|
||||
(p) => p.name === "ActivityPub endpoint",
|
||||
);
|
||||
}
|
||||
|
||||
export async function actorProfile(request, response) {
|
||||
const actorUrl = request.query.url;
|
||||
if (!actorUrl) {
|
||||
return response.status(400).render("404");
|
||||
}
|
||||
|
||||
// Check if we already follow this actor
|
||||
const { application } = request.app.locals;
|
||||
const apFollowing = application?.collections?.get("ap_following");
|
||||
let isFollowing = false;
|
||||
if (apFollowing) {
|
||||
const existing = await apFollowing.findOne({ actorUrl });
|
||||
isFollowing = !!existing;
|
||||
}
|
||||
|
||||
// Check if AP plugin is available (for follow button visibility)
|
||||
const apPlugin = getApPlugin(request);
|
||||
const canFollow = !!apPlugin;
|
||||
|
||||
try {
|
||||
const { actor, items } = await fetchActorOutbox(actorUrl, { limit: 30 });
|
||||
|
||||
response.render("actor", {
|
||||
title: actor.name || "Actor",
|
||||
actor,
|
||||
items,
|
||||
actorUrl,
|
||||
isFollowing,
|
||||
canFollow,
|
||||
baseUrl: request.baseUrl,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(`[Microsub] Actor profile fetch failed: ${error.message}`);
|
||||
response.render("actor", {
|
||||
title: "Actor",
|
||||
actor: { name: actorUrl, url: actorUrl, photo: "", summary: "" },
|
||||
items: [],
|
||||
actorUrl,
|
||||
isFollowing,
|
||||
canFollow,
|
||||
baseUrl: request.baseUrl,
|
||||
error: "Could not fetch this actor's profile. They may have restricted access.",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export async function followActorAction(request, response) {
|
||||
const { actorUrl, actorName } = request.body;
|
||||
if (!actorUrl) {
|
||||
return response.status(400).redirect(request.baseUrl + "/channels/activitypub");
|
||||
}
|
||||
|
||||
const apPlugin = getApPlugin(request);
|
||||
if (!apPlugin) {
|
||||
console.error("[Microsub] Cannot follow: ActivityPub plugin not installed");
|
||||
return response.redirect(
|
||||
`${request.baseUrl}/actor?url=${encodeURIComponent(actorUrl)}`,
|
||||
);
|
||||
}
|
||||
|
||||
const result = await apPlugin.followActor(actorUrl, { name: actorName });
|
||||
if (!result.ok) {
|
||||
console.error(`[Microsub] Follow via AP plugin failed: ${result.error}`);
|
||||
}
|
||||
|
||||
return response.redirect(
|
||||
`${request.baseUrl}/actor?url=${encodeURIComponent(actorUrl)}`,
|
||||
);
|
||||
}
|
||||
|
||||
export async function unfollowActorAction(request, response) {
|
||||
const { actorUrl } = request.body;
|
||||
if (!actorUrl) {
|
||||
return response.status(400).redirect(request.baseUrl + "/channels/activitypub");
|
||||
}
|
||||
|
||||
const apPlugin = getApPlugin(request);
|
||||
if (!apPlugin) {
|
||||
console.error("[Microsub] Cannot unfollow: ActivityPub plugin not installed");
|
||||
return response.redirect(
|
||||
`${request.baseUrl}/actor?url=${encodeURIComponent(actorUrl)}`,
|
||||
);
|
||||
}
|
||||
|
||||
const result = await apPlugin.unfollowActor(actorUrl);
|
||||
if (!result.ok) {
|
||||
console.error(`[Microsub] Unfollow via AP plugin failed: ${result.error}`);
|
||||
}
|
||||
|
||||
return response.redirect(
|
||||
`${request.baseUrl}/actor?url=${encodeURIComponent(actorUrl)}`,
|
||||
);
|
||||
}
|
||||
|
||||
export const readerController = {
|
||||
index,
|
||||
channels,
|
||||
@@ -933,4 +1046,7 @@ export const readerController = {
|
||||
searchPage,
|
||||
searchFeeds,
|
||||
subscribe,
|
||||
actorProfile,
|
||||
followActorAction,
|
||||
unfollowActorAction,
|
||||
};
|
||||
|
||||
@@ -115,6 +115,7 @@ export async function getChannels(application, userId) {
|
||||
channelId: channel._id,
|
||||
readBy: { $ne: userId },
|
||||
published: { $gte: cutoffDate },
|
||||
_stripped: { $ne: true },
|
||||
});
|
||||
|
||||
return {
|
||||
@@ -206,8 +207,8 @@ export async function deleteChannel(application, uid, userId) {
|
||||
const query = { uid };
|
||||
if (userId) query.userId = userId;
|
||||
|
||||
// Don't allow deleting notifications channel
|
||||
if (uid === "notifications") {
|
||||
// Don't allow deleting system channels
|
||||
if (uid === "notifications" || uid === "activitypub") {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -305,3 +306,39 @@ export async function ensureNotificationsChannel(application, userId) {
|
||||
await collection.insertOne(channel);
|
||||
return channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure ActivityPub channel exists
|
||||
* @param {object} application - Indiekit application
|
||||
* @param {string} [userId] - User ID
|
||||
* @returns {Promise<object>} ActivityPub channel
|
||||
*/
|
||||
export async function ensureActivityPubChannel(application, userId) {
|
||||
const collection = getCollection(application);
|
||||
|
||||
const existing = await collection.findOne({
|
||||
uid: "activitypub",
|
||||
...(userId && { userId }),
|
||||
});
|
||||
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const channel = {
|
||||
uid: "activitypub",
|
||||
name: "Fediverse",
|
||||
userId,
|
||||
source: "activitypub",
|
||||
order: -0.5, // After notifications (-1), before user channels (0+)
|
||||
settings: {
|
||||
excludeTypes: [],
|
||||
excludeRegex: undefined,
|
||||
},
|
||||
createdAt: new Date().toISOString(),
|
||||
updatedAt: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await collection.insertOne(channel);
|
||||
return channel;
|
||||
}
|
||||
|
||||
@@ -87,8 +87,9 @@ export async function getTimelineItems(application, channelId, options = {}) {
|
||||
typeof channelId === "string" ? new ObjectId(channelId) : channelId;
|
||||
const limit = parseLimit(options.limit);
|
||||
|
||||
// Base query - filter out read items unless showRead is true
|
||||
const baseQuery = { channelId: objectId };
|
||||
// Base query - filter out read items unless showRead is true,
|
||||
// and always exclude stripped dedup skeletons (no content to display)
|
||||
const baseQuery = { channelId: objectId, _stripped: { $ne: true } };
|
||||
if (options.userId && !options.showRead) {
|
||||
baseQuery.readBy = { $ne: options.userId };
|
||||
}
|
||||
@@ -288,61 +289,109 @@ export async function countReadItems(application, channelId, userId) {
|
||||
* @param {string} userId - User ID
|
||||
* @returns {Promise<number>} Number of items updated
|
||||
*/
|
||||
// Maximum number of read items to keep per channel
|
||||
const MAX_READ_ITEMS = 30;
|
||||
// Maximum number of full read items to keep per channel before stripping content.
|
||||
// Items beyond this limit are converted to lightweight dedup skeletons (channelId,
|
||||
// uid, readBy) so the poller doesn't re-ingest them as new unread entries.
|
||||
const MAX_FULL_READ_ITEMS = 200;
|
||||
|
||||
/**
|
||||
* Cleanup old read items, keeping only the most recent MAX_READ_ITEMS
|
||||
* Cleanup old read items by stripping content but preserving dedup skeletons.
|
||||
* This prevents the vicious cycle where deleted read items get re-ingested as
|
||||
* unread by the poller because the dedup record (channelId + uid) was destroyed.
|
||||
*
|
||||
* AP items (feedId: null) are hard-deleted instead of stripped, since no poller
|
||||
* re-ingests them — they arrive via inbox push and don't need dedup skeletons.
|
||||
*
|
||||
* @param {object} collection - MongoDB collection
|
||||
* @param {ObjectId} channelObjectId - Channel ObjectId
|
||||
* @param {string} userId - User ID
|
||||
*/
|
||||
async function cleanupOldReadItems(collection, channelObjectId, userId) {
|
||||
// Count read items in this channel
|
||||
const readCount = await collection.countDocuments({
|
||||
channelId: channelObjectId,
|
||||
readBy: userId,
|
||||
});
|
||||
|
||||
if (readCount > MAX_READ_ITEMS) {
|
||||
// Find the oldest read items to delete
|
||||
const itemsToDelete = await collection
|
||||
if (readCount > MAX_FULL_READ_ITEMS) {
|
||||
// Find old read items beyond the retention limit
|
||||
const itemsToCleanup = await collection
|
||||
.find({
|
||||
channelId: channelObjectId,
|
||||
readBy: userId,
|
||||
_stripped: { $ne: true },
|
||||
})
|
||||
.sort({ published: -1, _id: -1 }) // Newest first
|
||||
.skip(MAX_READ_ITEMS) // Skip the ones we want to keep
|
||||
.project({ _id: 1 })
|
||||
.sort({ published: -1, _id: -1 })
|
||||
.skip(MAX_FULL_READ_ITEMS)
|
||||
.project({ _id: 1, feedId: 1 })
|
||||
.toArray();
|
||||
|
||||
if (itemsToDelete.length > 0) {
|
||||
const idsToDelete = itemsToDelete.map((item) => item._id);
|
||||
const deleteResult = await collection.deleteMany({
|
||||
_id: { $in: idsToDelete },
|
||||
if (itemsToCleanup.length === 0) return;
|
||||
|
||||
// Separate AP items (feedId: null) from RSS items (feedId: ObjectId)
|
||||
const apItemIds = [];
|
||||
const rssItemIds = [];
|
||||
for (const item of itemsToCleanup) {
|
||||
if (item.feedId) {
|
||||
rssItemIds.push(item._id);
|
||||
} else {
|
||||
apItemIds.push(item._id);
|
||||
}
|
||||
}
|
||||
|
||||
// Hard-delete AP items — no poller to re-ingest, skeletons are useless
|
||||
if (apItemIds.length > 0) {
|
||||
const deleted = await collection.deleteMany({
|
||||
_id: { $in: apItemIds },
|
||||
});
|
||||
console.info(
|
||||
`[Microsub] Cleaned up ${deleteResult.deletedCount} old read items (keeping ${MAX_READ_ITEMS})`,
|
||||
`[Microsub] Deleted ${deleted.deletedCount} old AP read items`,
|
||||
);
|
||||
}
|
||||
|
||||
// Strip RSS items to dedup skeletons — poller would re-ingest if deleted
|
||||
if (rssItemIds.length > 0) {
|
||||
const stripped = await collection.updateMany(
|
||||
{ _id: { $in: rssItemIds } },
|
||||
{
|
||||
$set: { _stripped: true },
|
||||
$unset: {
|
||||
name: "",
|
||||
content: "",
|
||||
summary: "",
|
||||
author: "",
|
||||
category: "",
|
||||
photo: "",
|
||||
video: "",
|
||||
audio: "",
|
||||
likeOf: "",
|
||||
repostOf: "",
|
||||
bookmarkOf: "",
|
||||
inReplyTo: "",
|
||||
source: "",
|
||||
},
|
||||
},
|
||||
);
|
||||
console.info(
|
||||
`[Microsub] Stripped ${stripped.modifiedCount} old RSS read items (keeping ${MAX_FULL_READ_ITEMS} full)`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup all read items across all channels (startup cleanup)
|
||||
* Cleanup all read items across all channels (startup cleanup).
|
||||
* RSS items are stripped to dedup skeletons; AP items are hard-deleted.
|
||||
* @param {object} application - Indiekit application
|
||||
* @returns {Promise<number>} Total number of items deleted
|
||||
* @returns {Promise<number>} Total number of items cleaned up
|
||||
*/
|
||||
export async function cleanupAllReadItems(application) {
|
||||
const collection = getCollection(application);
|
||||
const channelsCollection = application.collections.get("microsub_channels");
|
||||
|
||||
// Get all channels
|
||||
const channels = await channelsCollection.find({}).toArray();
|
||||
let totalDeleted = 0;
|
||||
let totalCleaned = 0;
|
||||
|
||||
for (const channel of channels) {
|
||||
// Get unique userIds who have read items in this channel
|
||||
const readByUsers = await collection.distinct("readBy", {
|
||||
channelId: channel._id,
|
||||
readBy: { $exists: true, $ne: [] },
|
||||
@@ -354,40 +403,83 @@ export async function cleanupAllReadItems(application) {
|
||||
const readCount = await collection.countDocuments({
|
||||
channelId: channel._id,
|
||||
readBy: userId,
|
||||
_stripped: { $ne: true },
|
||||
});
|
||||
|
||||
if (readCount > MAX_READ_ITEMS) {
|
||||
const itemsToDelete = await collection
|
||||
if (readCount > MAX_FULL_READ_ITEMS) {
|
||||
const itemsToCleanup = await collection
|
||||
.find({
|
||||
channelId: channel._id,
|
||||
readBy: userId,
|
||||
_stripped: { $ne: true },
|
||||
})
|
||||
.sort({ published: -1, _id: -1 })
|
||||
.skip(MAX_READ_ITEMS)
|
||||
.project({ _id: 1 })
|
||||
.skip(MAX_FULL_READ_ITEMS)
|
||||
.project({ _id: 1, feedId: 1 })
|
||||
.toArray();
|
||||
|
||||
if (itemsToDelete.length > 0) {
|
||||
const idsToDelete = itemsToDelete.map((item) => item._id);
|
||||
const deleteResult = await collection.deleteMany({
|
||||
_id: { $in: idsToDelete },
|
||||
});
|
||||
totalDeleted += deleteResult.deletedCount;
|
||||
console.info(
|
||||
`[Microsub] Startup cleanup: deleted ${deleteResult.deletedCount} old items from channel "${channel.name}"`,
|
||||
);
|
||||
if (itemsToCleanup.length > 0) {
|
||||
const apItemIds = [];
|
||||
const rssItemIds = [];
|
||||
for (const item of itemsToCleanup) {
|
||||
if (item.feedId) {
|
||||
rssItemIds.push(item._id);
|
||||
} else {
|
||||
apItemIds.push(item._id);
|
||||
}
|
||||
}
|
||||
|
||||
// Hard-delete AP items
|
||||
if (apItemIds.length > 0) {
|
||||
const deleted = await collection.deleteMany({
|
||||
_id: { $in: apItemIds },
|
||||
});
|
||||
totalCleaned += deleted.deletedCount;
|
||||
console.info(
|
||||
`[Microsub] Startup cleanup: deleted ${deleted.deletedCount} AP items from channel "${channel.name}"`,
|
||||
);
|
||||
}
|
||||
|
||||
// Strip RSS items to skeletons
|
||||
if (rssItemIds.length > 0) {
|
||||
const stripped = await collection.updateMany(
|
||||
{ _id: { $in: rssItemIds } },
|
||||
{
|
||||
$set: { _stripped: true },
|
||||
$unset: {
|
||||
name: "",
|
||||
content: "",
|
||||
summary: "",
|
||||
author: "",
|
||||
category: "",
|
||||
photo: "",
|
||||
video: "",
|
||||
audio: "",
|
||||
likeOf: "",
|
||||
repostOf: "",
|
||||
bookmarkOf: "",
|
||||
inReplyTo: "",
|
||||
source: "",
|
||||
},
|
||||
},
|
||||
);
|
||||
totalCleaned += stripped.modifiedCount;
|
||||
console.info(
|
||||
`[Microsub] Startup cleanup: stripped ${stripped.modifiedCount} RSS items from channel "${channel.name}"`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (totalDeleted > 0) {
|
||||
if (totalCleaned > 0) {
|
||||
console.info(
|
||||
`[Microsub] Startup cleanup complete: ${totalDeleted} total items deleted`,
|
||||
`[Microsub] Startup cleanup complete: ${totalCleaned} total items cleaned`,
|
||||
);
|
||||
}
|
||||
|
||||
return totalDeleted;
|
||||
return totalCleaned;
|
||||
}
|
||||
|
||||
export async function markItemsRead(application, channelId, entryIds, userId) {
|
||||
@@ -446,9 +538,6 @@ export async function markItemsRead(application, channelId, entryIds, userId) {
|
||||
`[Microsub] markItemsRead result: ${result.modifiedCount} items updated`,
|
||||
);
|
||||
|
||||
// Cleanup old read items, keeping only the most recent
|
||||
await cleanupOldReadItems(collection, channelObjectId, userId);
|
||||
|
||||
return result.modifiedCount;
|
||||
}
|
||||
|
||||
@@ -577,7 +666,7 @@ export async function getUnreadCount(application, channelId, userId) {
|
||||
const objectId =
|
||||
typeof channelId === "string" ? new ObjectId(channelId) : channelId;
|
||||
|
||||
// Only count items from the last UNREAD_RETENTION_DAYS
|
||||
// Only count items from the last UNREAD_RETENTION_DAYS, exclude stripped skeletons
|
||||
const cutoffDate = new Date();
|
||||
cutoffDate.setDate(cutoffDate.getDate() - UNREAD_RETENTION_DAYS);
|
||||
|
||||
@@ -585,6 +674,7 @@ export async function getUnreadCount(application, channelId, userId) {
|
||||
channelId: objectId,
|
||||
readBy: { $ne: userId },
|
||||
published: { $gte: cutoffDate },
|
||||
_stripped: { $ne: true },
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user