Files
indiekit-endpoint-microsub/lib/polling/processor.js
Ricardo 114998bf03 feat: add source capability detection, protocol indicators, and reply routing
- Add lib/feeds/capabilities.js: detect feed source capabilities
  (webmention, micropub, platform type) on subscribe and first fetch
- Enrich timeline items with source_type from capabilities or URL inference
- Add protocol indicator icons (Bluesky/Mastodon/web) to item-card.njk
- Auto-select syndication target in compose based on interaction URL protocol
- Modified: follow.js, processor.js, reader.js, item-card.njk

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 18:26:18 +01:00

289 lines
8.3 KiB
JavaScript

/**
* Feed processing pipeline
* @module polling/processor
*/
import { getRedisClient, publishEvent } from "../cache/redis.js";
import { detectCapabilities } from "../feeds/capabilities.js";
import { fetchAndParseFeed } from "../feeds/fetcher.js";
import { getChannel } from "../storage/channels.js";
import {
updateFeed,
updateFeedAfterFetch,
updateFeedStatus,
updateFeedWebsub,
} from "../storage/feeds.js";
import { passesRegexFilter, passesTypeFilter } from "../storage/filters.js";
import { addItem } from "../storage/items.js";
import {
subscribe as websubSubscribe,
getCallbackUrl,
} from "../websub/subscriber.js";
import { calculateNewTier } from "./tier.js";
/**
 * Process a single feed: fetch and parse it, filter and store new items,
 * recalculate its polling tier, and kick off WebSub subscription and
 * capability-detection side tasks.
 * @param {object} application - Indiekit application
 * @param {object} feed - Feed document from database
 * @returns {Promise<object>} Processing result: `{ feedId, url, success,
 *   itemsAdded, error, duration }`, plus `notModified` on a 304 and
 *   `tier` on a successful full fetch. Never rejects; failures are
 *   reported via `result.error`.
 */
export async function processFeed(application, feed) {
  const startTime = Date.now();
  const result = {
    feedId: feed._id,
    url: feed.url,
    success: false,
    itemsAdded: 0,
    error: undefined,
  };
  try {
    // Get Redis client for HTTP caching and real-time event publishing
    const redis = getRedisClient(application);
    // Fetch and parse the feed, sending conditional-request headers
    const parsed = await fetchAndParseFeed(feed.url, {
      etag: feed.etag,
      lastModified: feed.lastModified,
      redis,
    });
    // Handle 304 Not Modified: nothing to store, just recalculate the tier
    if (parsed.notModified) {
      const tierResult = calculateNewTier({
        currentTier: feed.tier,
        hasNewItems: false,
        consecutiveUnchanged: feed.unmodified || 0,
      });
      await updateFeedAfterFetch(application, feed._id, false, {
        tier: tierResult.tier,
        unmodified: tierResult.consecutiveUnchanged,
        nextFetchAt: tierResult.nextFetchAt,
      });
      result.success = true;
      result.notModified = true;
      return result;
    }
    // Get channel for filtering
    const channel = await getChannel(application, feed.channelId);
    // Process items sequentially so real-time events preserve feed order
    let newItemCount = 0;
    for (const item of parsed.items) {
      // Apply channel filters
      if (channel?.settings && !passesFilters(item, channel.settings)) {
        continue;
      }
      // Ensure the source object exists BEFORE enrichment so items parsed
      // without a `_source` still receive the feed's display name.
      // (Previously the name was only attached when `_source` already
      // existed, even though an empty `_source` was created right after.)
      item._source = item._source || {};
      item._source.name = feed.title || parsed.name;
      // Attach source_type from feed capabilities (for protocol indicators)
      // Falls back to URL-based inference when capabilities haven't been detected yet
      if (feed.capabilities?.source_type) {
        item._source.source_type = feed.capabilities.source_type;
      } else {
        item._source.source_type = inferSourceType(feed.url);
      }
      // Store the item; addItem returns a falsy value for duplicates
      const stored = await addItem(application, {
        channelId: feed.channelId,
        feedId: feed._id,
        uid: item.uid,
        item,
      });
      if (stored) {
        newItemCount++;
        // Publish real-time event
        if (redis) {
          await publishEvent(redis, `microsub:${feed.channelId}`, {
            type: "new-item",
            channelId: feed.channelId.toString(),
            item: stored,
          });
        }
      }
    }
    result.itemsAdded = newItemCount;
    // Update tier based on whether we found new items
    const tierResult = calculateNewTier({
      currentTier: feed.tier,
      hasNewItems: newItemCount > 0,
      consecutiveUnchanged: newItemCount > 0 ? 0 : feed.unmodified || 0,
    });
    // Update feed metadata, including conditional-request validators
    const updateData = {
      tier: tierResult.tier,
      unmodified: tierResult.consecutiveUnchanged,
      nextFetchAt: tierResult.nextFetchAt,
      etag: parsed.etag,
      lastModified: parsed.lastModified,
    };
    // Update feed title/photo if discovered (never overwrite user-set values)
    if (parsed.name && !feed.title) {
      updateData.title = parsed.name;
    }
    if (parsed.photo && !feed.photo) {
      updateData.photo = parsed.photo;
    }
    await updateFeedAfterFetch(
      application,
      feed._id,
      newItemCount > 0,
      updateData,
    );
    // Handle WebSub hub discovery and auto-subscription when the hub is
    // newly discovered or has changed
    if (parsed.hub && (!feed.websub || feed.websub.hub !== parsed.hub)) {
      await updateFeedWebsub(application, feed._id, {
        hub: parsed.hub,
        topic: parsed.self || feed.url,
      });
      // Auto-subscribe to WebSub hub if we have a callback URL
      const baseUrl = application.url;
      if (baseUrl) {
        const callbackUrl = getCallbackUrl(baseUrl, feed._id.toString());
        const updatedFeed = {
          ...feed,
          websub: { hub: parsed.hub, topic: parsed.self || feed.url },
        };
        // Fire-and-forget: subscription outcome must not block polling
        websubSubscribe(application, updatedFeed, callbackUrl)
          .then((subscribed) => {
            if (subscribed) {
              console.info(
                `[Microsub] WebSub subscription initiated for ${feed.url}`,
              );
            }
          })
          .catch((error) => {
            console.error(
              `[Microsub] WebSub subscription error for ${feed.url}:`,
              error.message,
            );
          });
      }
    }
    result.success = true;
    result.tier = tierResult.tier;
    // Update feed status to active on success
    await updateFeedStatus(application, feed._id, {
      success: true,
      itemCount: parsed.items?.length || 0,
    });
    // Detect source capabilities on first successful fetch (if not yet
    // detected). Fire-and-forget: detection failures are expected for
    // plain feeds and must not fail the poll.
    if (!feed.capabilities) {
      detectCapabilities(feed.url)
        .then((capabilities) => {
          updateFeed(application, feed._id, { capabilities }).catch(() => {});
        })
        .catch((error) => {
          console.debug(
            `[Microsub] Capability detection skipped for ${feed.url}:`,
            error.message,
          );
        });
    }
  } catch (error) {
    result.error = error.message;
    // Update feed status to error
    await updateFeedStatus(application, feed._id, {
      success: false,
      error: error.message,
    });
    // Still update the feed to prevent retry storms
    try {
      const tierResult = calculateNewTier({
        currentTier: feed.tier,
        hasNewItems: false,
        consecutiveUnchanged: (feed.unmodified || 0) + 1,
      });
      await updateFeedAfterFetch(application, feed._id, false, {
        tier: Math.min(tierResult.tier + 1, 10), // Increase tier on error
        unmodified: tierResult.consecutiveUnchanged,
        nextFetchAt: tierResult.nextFetchAt,
      });
    } catch {
      // Ignore update errors — status was already recorded above
    }
  }
  result.duration = Date.now() - startTime;
  return result;
}
/**
 * Infer source type from feed URL when capabilities haven't been detected yet
 * @param {string} url - Feed URL
 * @returns {string} Source type
 */
function inferSourceType(url) {
  if (!url) return "web";
  const normalized = url.toLowerCase();
  // Substring patterns that identify each platform, checked in order
  const platformPatterns = [
    ["bluesky", ["bsky.app", "bluesky"]],
    [
      "mastodon",
      [
        "mastodon.",
        "mstdn.",
        "fosstodon.",
        "pleroma.",
        "misskey.",
        "pixelfed.",
      ],
    ],
  ];
  for (const [type, needles] of platformPatterns) {
    if (needles.some((needle) => normalized.includes(needle))) {
      return type;
    }
  }
  return "web";
}
/**
 * Check if an item passes channel filters
 * @param {object} item - Feed item
 * @param {object} settings - Channel settings
 * @returns {boolean} Whether the item passes filters
 */
function passesFilters(item, settings) {
  // Type filter is the cheaper check; bail out early when it rejects
  if (!passesTypeFilter(item, settings)) {
    return false;
  }
  return passesRegexFilter(item, settings);
}
/**
 * Process multiple feeds in batch
 * @param {object} application - Indiekit application
 * @param {Array} feeds - Array of feed documents
 * @param {object} options - Processing options
 * @returns {Promise<object>} Batch processing result
 */
export async function processFeedBatch(application, feeds, options = {}) {
  const { concurrency = 5 } = options;
  const results = [];
  // Walk the feed list in fixed-size chunks; each chunk runs in parallel,
  // chunks run one after another to cap concurrent fetches
  for (let start = 0; start < feeds.length; start += concurrency) {
    const chunk = feeds.slice(start, start + concurrency);
    const chunkResults = await Promise.all(
      chunk.map((feed) => processFeed(application, feed)),
    );
    results.push(...chunkResults);
  }
  // Every feed yields exactly one result, so failed is the complement
  const successful = results.filter((entry) => entry.success).length;
  return {
    total: feeds.length,
    successful,
    failed: results.length - successful,
    itemsAdded: results.reduce((sum, entry) => sum + entry.itemsAdded, 0),
    results,
  };
}