perf(mail): incremental UID-scan + daily quality full-sweep
Backend-Lag-Fix Phase 2 — entfernt die Scan-Grundlast an der Wurzel:
- mail_connections: +folder_scan_state JSONB, +last_full_sweep_at TIMESTAMPTZ
(additive Migration, DEFAULT '{}' deckt Bestandsrows; erster Lauf = Full-Sweep
wie bisher → null Verhaltens-/Qualitätsänderung initial).
- scan-internal: pro Ordner status(uidNext,uidValidity); inkrementeller
search(UID > lastUid) statt Last-200-Refetch. Leere Ordner → skip. UIDVALIDITY-
Wächter (Server-Renumber → einmal Full-Sweep). maxUid persistiert via JSONB-Merge.
- Quality-Full-Sweep 1x/Tag (last_full_sweep_at) re-scannt Last-200 → Blocklist-
Updates greifen rückwirkend. Klassifikation/Delete/Consent-Logik 1:1 erhalten.
- db/mail: patchFolderScanState (atomic ||-merge) + markFullSweepDone; toter
getAllActiveMailUserIds entfernt (Cron weg).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
5b57bea9c0
commit
04e2979b8d
@ -0,0 +1,40 @@
|
|||||||
|
-- Migration: mail_uid_scan_state
|
||||||
|
-- Adds incremental UID-scan state to mail_connections (Phase 2 performance).
|
||||||
|
--
|
||||||
|
-- Problem being solved:
|
||||||
|
-- scan-internal.post.ts fetched the last 200 messages per folder on EVERY trigger
|
||||||
|
-- (NOOP-tick every 2 minutes × folders × accounts = sustained CPU load 51-55%).
|
||||||
|
-- Phase 1 (in-flight guard) capped stacking. Phase 2 eliminates the root cost:
|
||||||
|
-- re-fetching already-seen messages.
|
||||||
|
--
|
||||||
|
-- New columns:
|
||||||
|
-- folder_scan_state JSONB:
|
||||||
|
-- Per-folder {lastUid, uidvalidity} map. On each scan run the daemon fetches
|
||||||
|
-- only UIDs > lastUid for each folder. After processing, lastUid is updated to
|
||||||
|
-- the highest UID seen.
|
||||||
|
-- UIDVALIDITY guard: if the server's uidValidity changed, lastUid is reset to 0
|
||||||
|
-- and a full 200-message sweep is performed for that folder (server renumbered UIDs).
|
||||||
|
-- Format: { "INBOX": {"lastUid":1234,"uidvalidity":5678}, "Junk Email": {...} }
|
||||||
|
-- DEFAULT '{}' → first run treats all folders as lastUid=0 (full sweep).
|
||||||
|
--
|
||||||
|
-- last_full_sweep_at TIMESTAMPTZ:
|
||||||
|
-- Timestamp of the last quality full-sweep (1×/day).
|
||||||
|
-- Guards against blocklist updates not reaching older messages.
|
||||||
|
-- NULL → no full-sweep yet → first run counts as full-sweep.
|
||||||
|
--
|
||||||
|
-- Breaking-change status: NONE.
|
||||||
|
-- DEFAULT '{}' covers all existing rows for folder_scan_state.
|
||||||
|
-- last_full_sweep_at is nullable → existing rows get NULL (triggers full-sweep on
|
||||||
|
-- next run, which is the correct and safe behavior: first run after migration
|
||||||
|
-- always does a full 200-message sweep per folder, exactly as before).
|
||||||
|
--
|
||||||
|
-- Deploy: pnpm prisma migrate deploy (on server via GitHub Actions pipeline)
|
||||||
|
|
||||||
|
ALTER TABLE "rebreak"."mail_connections"
|
||||||
|
ADD COLUMN "folder_scan_state" JSONB NOT NULL DEFAULT '{}',
|
||||||
|
ADD COLUMN "last_full_sweep_at" TIMESTAMPTZ;
|
||||||
|
|
||||||
|
-- No index on folder_scan_state:
|
||||||
|
-- Accessed only by primary key lookup (per-connection read), not in WHERE clauses.
|
||||||
|
-- No index on last_full_sweep_at:
|
||||||
|
-- Checked in application logic after fetching the connection by id — not used in queries.
|
||||||
@ -726,6 +726,20 @@ model MailConnection {
|
|||||||
// NULL → Frontend fällt auf Email-Domain zurück.
|
// NULL → Frontend fällt auf Email-Domain zurück.
|
||||||
title String?
|
title String?
|
||||||
|
|
||||||
|
// ─── Inkrementeller UID-Scan (Phase 2, Migration 20260605_mail_uid_scan_state) ─
|
||||||
|
// folder_scan_state: pro Ordner {lastUid, uidvalidity} — ermöglicht inkrementellen
|
||||||
|
// IMAP-Scan (nur neue UIDs seit letztem Run). Format:
|
||||||
|
// { "INBOX": {"lastUid":1234,"uidvalidity":5678}, "Junk Email": {...} }
|
||||||
|
// DEFAULT '{}' → erster Lauf behandelt alle Ordner als lastUid=0 (Full-Sweep).
|
||||||
|
// UIDVALIDITY-Wächter: wenn status.uidValidity != gespeicherter Wert → Reset auf 0
|
||||||
|
// (Ordner wurde server-seitig resettet, alle UIDs ungültig).
|
||||||
|
//
|
||||||
|
// last_full_sweep_at: Zeitstempel des letzten Quality-Full-Sweeps (1×/Tag).
|
||||||
|
// Wächter gegen Blocklist-Updates die ältere Mails nicht treffen würden.
|
||||||
|
// NULL → noch kein Full-Sweep ausgeführt → erster Lauf wird als Full-Sweep gezählt.
|
||||||
|
folderScanState Json @default("{}") @map("folder_scan_state")
|
||||||
|
lastFullSweepAt DateTime? @map("last_full_sweep_at") @db.Timestamptz(6)
|
||||||
|
|
||||||
// ─── Art. 9-Einwilligung (DSGVO-Compliance, Mail-Auto-Delete) ───────────
|
// ─── Art. 9-Einwilligung (DSGVO-Compliance, Mail-Auto-Delete) ───────────
|
||||||
// consentAt=NULL für Bestandsrows → "Re-Consent pending".
|
// consentAt=NULL für Bestandsrows → "Re-Consent pending".
|
||||||
// Daemon pausiert Mail-Verarbeitung wenn consentAt=NULL (kein Auto-Delete).
|
// Daemon pausiert Mail-Verarbeitung wenn consentAt=NULL (kein Auto-Delete).
|
||||||
|
|||||||
@ -7,6 +7,8 @@ import {
|
|||||||
upsertMailBlockedStat,
|
upsertMailBlockedStat,
|
||||||
updateMailConnectionScanStats,
|
updateMailConnectionScanStats,
|
||||||
insertMailClassificationSample,
|
insertMailClassificationSample,
|
||||||
|
patchFolderScanState,
|
||||||
|
markFullSweepDone,
|
||||||
} from "../../db/mail";
|
} from "../../db/mail";
|
||||||
import { getBlocklistedDomainsSet, getMailDisplayNamePatterns } from "../../db/domains";
|
import { getBlocklistedDomainsSet, getMailDisplayNamePatterns } from "../../db/domains";
|
||||||
import { getProfile } from "../../db/profile";
|
import { getProfile } from "../../db/profile";
|
||||||
@ -94,6 +96,21 @@ export default defineEventHandler(async (event) => {
|
|||||||
let scanned = 0;
|
let scanned = 0;
|
||||||
let newlyBlocked = 0;
|
let newlyBlocked = 0;
|
||||||
|
|
||||||
|
// ─── Quality Full-Sweep Wächter ──────────────────────────────────────────
|
||||||
|
// 1×/Tag: alle Ordner werden als lastUid=0 behandelt (Full-Sweep) um sicher-
|
||||||
|
// zustellen dass Blocklist-Updates auch ältere Mails erfassen.
|
||||||
|
// Wichtig: wir behandeln lastUid temporär als 0 — NICHT persistieren.
|
||||||
|
// Nach dem Sweep wird der echte maxUid gespeichert + lastFullSweepAt=NOW().
|
||||||
|
const needsFullSweep =
|
||||||
|
!connection.lastFullSweepAt ||
|
||||||
|
Date.now() - new Date(connection.lastFullSweepAt).getTime() > 24 * 3_600_000;
|
||||||
|
|
||||||
|
// Aktueller folder_scan_state aus der DB (JSONB, Prisma liefert plain object)
|
||||||
|
const folderScanState = (connection.folderScanState ?? {}) as Record<
|
||||||
|
string,
|
||||||
|
{ lastUid: number; uidvalidity: number }
|
||||||
|
>;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await imap.connect();
|
await imap.connect();
|
||||||
|
|
||||||
@ -111,7 +128,8 @@ export default defineEventHandler(async (event) => {
|
|||||||
const skippedSystemFolders = mailboxes.length - scannable.length;
|
const skippedSystemFolders = mailboxes.length - scannable.length;
|
||||||
console.log(
|
console.log(
|
||||||
`[scan-internal] ${connection.email} scanning ${scannable.length} folders` +
|
`[scan-internal] ${connection.email} scanning ${scannable.length} folders` +
|
||||||
(skippedSystemFolders > 0 ? ` (${skippedSystemFolders} system folders skipped)` : ""),
|
(skippedSystemFolders > 0 ? ` (${skippedSystemFolders} system folders skipped)` : "") +
|
||||||
|
(needsFullSweep ? " [full-sweep]" : " [incremental]"),
|
||||||
);
|
);
|
||||||
|
|
||||||
for (const mb of scannable) {
|
for (const mb of scannable) {
|
||||||
@ -123,15 +141,69 @@ export default defineEventHandler(async (event) => {
|
|||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
const SCAN_LIMIT = 200;
|
const SCAN_LIMIT = 200;
|
||||||
const status = await imap.status(mb.path, { messages: true });
|
// ─── UID-Scan-Strategie (Phase 2) ──────────────────────────────────
|
||||||
const msgCount = (status as any).messages ?? 0;
|
// 1. Status holen: messages, uidNext, uidValidity
|
||||||
if (msgCount === 0) continue;
|
const status = await imap.status(mb.path, {
|
||||||
|
messages: true,
|
||||||
const fetchRange =
|
uidNext: true,
|
||||||
msgCount > SCAN_LIMIT ? `${msgCount - SCAN_LIMIT + 1}:*` : "1:*";
|
uidValidity: true,
|
||||||
const allMessages = await imap.fetchAll(fetchRange, {
|
|
||||||
envelope: true,
|
|
||||||
});
|
});
|
||||||
|
const msgCount = (status as any).messages ?? 0;
|
||||||
|
const serverUidValidity: number = (status as any).uidValidity ?? 0;
|
||||||
|
|
||||||
|
if (msgCount === 0) {
|
||||||
|
// Ordner leer — trotzdem Zustand für diesen Ordner persistieren
|
||||||
|
// (verhindert endloses Re-Fetching auf leere Ordner).
|
||||||
|
if (serverUidValidity > 0) {
|
||||||
|
await patchFolderScanState(connection.id, mb.path, {
|
||||||
|
lastUid: 0,
|
||||||
|
uidvalidity: serverUidValidity,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Gespeicherten Zustand lesen + UIDVALIDITY-Wächter
|
||||||
|
const saved = folderScanState[mb.path];
|
||||||
|
let lastUid = saved?.lastUid ?? 0;
|
||||||
|
|
||||||
|
if (needsFullSweep) {
|
||||||
|
// Quality Full-Sweep: temporär als 0 behandeln (Full-Scan), aber
|
||||||
|
// lastUid=0 NICHT dauerhaft in folderScanState schreiben.
|
||||||
|
// Nach dem Sweep speichern wir den echten maxUid.
|
||||||
|
lastUid = 0;
|
||||||
|
} else if (saved && serverUidValidity > 0 && saved.uidvalidity !== serverUidValidity) {
|
||||||
|
// UIDVALIDITY hat sich geändert: Ordner wurde server-seitig resettet.
|
||||||
|
// Alle bisherigen UIDs sind ungültig → Full-Scan für diesen Ordner.
|
||||||
|
console.log(
|
||||||
|
`[scan-internal] ${connection.email} | ${mb.path} | UIDVALIDITY changed ` +
|
||||||
|
`(${saved.uidvalidity}→${serverUidValidity}) — forcing full sweep for this folder`,
|
||||||
|
);
|
||||||
|
lastUid = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Nachrichten fetchen: inkrementell oder full
|
||||||
|
let allMessages: any[];
|
||||||
|
|
||||||
|
if (lastUid > 0) {
|
||||||
|
// Inkrementell: nur UIDs > lastUid suchen
|
||||||
|
const newUids = await (imap as any).search({ uid: `${lastUid + 1}:*` });
|
||||||
|
if (!newUids || newUids.length === 0) {
|
||||||
|
// Keine neuen Nachrichten → Ordner skippen, kein fetchAll nötig
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Fetch nur die neuen UIDs
|
||||||
|
allMessages = await imap.fetchAll(newUids.join(","), { envelope: true }, { uid: true } as any);
|
||||||
|
} else {
|
||||||
|
// Full-Sweep (erster Scan, UIDVALIDITY-Reset, oder Quality-Full-Sweep):
|
||||||
|
// identische Logik wie vorher — letzten SCAN_LIMIT Nachrichten
|
||||||
|
const fetchRange =
|
||||||
|
msgCount > SCAN_LIMIT ? `${msgCount - SCAN_LIMIT + 1}:*` : "1:*";
|
||||||
|
allMessages = await imap.fetchAll(fetchRange, { envelope: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!allMessages || allMessages.length === 0) continue;
|
||||||
|
|
||||||
scanned += allMessages.length;
|
scanned += allMessages.length;
|
||||||
totalScanned += allMessages.length;
|
totalScanned += allMessages.length;
|
||||||
|
|
||||||
@ -271,11 +343,32 @@ export default defineEventHandler(async (event) => {
|
|||||||
count: toInsert.length,
|
count: toInsert.length,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── UID-Zustand persistieren ─────────────────────────────────────
|
||||||
|
// maxUid über alle gefetchten Nachrichten berechnen (numeric UID aus IMAP).
|
||||||
|
// Nur persistieren wenn wir mindestens eine Nachricht hatten.
|
||||||
|
const maxUid = allMessages.reduce((max: number, m: any) => {
|
||||||
|
const u = typeof m.uid === "number" ? m.uid : parseInt(String(m.uid ?? "0"), 10);
|
||||||
|
return u > max ? u : max;
|
||||||
|
}, 0);
|
||||||
|
|
||||||
|
if (maxUid > 0 && serverUidValidity > 0) {
|
||||||
|
await patchFolderScanState(connection.id, mb.path, {
|
||||||
|
lastUid: maxUid,
|
||||||
|
uidvalidity: serverUidValidity,
|
||||||
|
});
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.release();
|
lock.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── Full-Sweep abschließen ─────────────────────────────────────────────
|
||||||
|
if (needsFullSweep) {
|
||||||
|
await markFullSweepDone(connection.id);
|
||||||
|
console.log(`[scan-internal] ${connection.email} | full-sweep complete, lastFullSweepAt updated`);
|
||||||
|
}
|
||||||
|
|
||||||
await imap.logout();
|
await imap.logout();
|
||||||
} catch {
|
} catch {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -42,15 +42,8 @@ export async function getAllMailConnections(userId: string) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getAllActiveMailUserIds() {
|
// getAllActiveMailUserIds — removed (dead code after Phase-1 cron-deletion).
|
||||||
const db = usePrisma();
|
// No callers remain. The cron that called it was deleted in Phase 1.
|
||||||
const rows = await db.mailConnection.findMany({
|
|
||||||
where: { isActive: true, nextScanAt: { lte: new Date() } },
|
|
||||||
select: { userId: true },
|
|
||||||
distinct: ["userId"],
|
|
||||||
});
|
|
||||||
return rows.map((r) => r.userId);
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function countMailConnections(userId: string) {
|
export async function countMailConnections(userId: string) {
|
||||||
const db = usePrisma();
|
const db = usePrisma();
|
||||||
@ -141,6 +134,46 @@ export async function updateMailConnectionScanStats(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Atomically merges a single folder's UID-scan state into folder_scan_state.
|
||||||
|
*
|
||||||
|
* Uses PostgreSQL JSON merge operator (||) to patch only the given folder key —
|
||||||
|
* other folders are preserved. This is safe under concurrent multi-folder updates
|
||||||
|
* because each folder key is independent.
|
||||||
|
*
|
||||||
|
* Example JSONB result after call with (id, "INBOX", {lastUid:1234, uidvalidity:5678}):
|
||||||
|
* { "INBOX": {"lastUid":1234,"uidvalidity":5678}, "Junk Email": {"lastUid":99,...} }
|
||||||
|
*
|
||||||
|
* @param connectionId MailConnection.id
|
||||||
|
* @param folderPath IMAP mailbox path (e.g. "INBOX", "Junk Email")
|
||||||
|
* @param state { lastUid: number, uidvalidity: number }
|
||||||
|
*/
|
||||||
|
export async function patchFolderScanState(
|
||||||
|
connectionId: string,
|
||||||
|
folderPath: string,
|
||||||
|
state: { lastUid: number; uidvalidity: number },
|
||||||
|
): Promise<void> {
|
||||||
|
const db = usePrisma();
|
||||||
|
const patch = JSON.stringify({ [folderPath]: state });
|
||||||
|
await db.$executeRaw`
|
||||||
|
UPDATE "rebreak"."mail_connections"
|
||||||
|
SET "folder_scan_state" = "folder_scan_state" || ${patch}::jsonb
|
||||||
|
WHERE "id" = ${connectionId}::uuid
|
||||||
|
`;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks a connection's last_full_sweep_at to NOW().
|
||||||
|
* Called once per connection per day when the quality full-sweep runs.
|
||||||
|
*/
|
||||||
|
export async function markFullSweepDone(connectionId: string): Promise<void> {
|
||||||
|
const db = usePrisma();
|
||||||
|
await db.mailConnection.update({
|
||||||
|
where: { id: connectionId },
|
||||||
|
data: { lastFullSweepAt: new Date() },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
export async function getMailBlockedStats(userId: string) {
|
export async function getMailBlockedStats(userId: string) {
|
||||||
const db = usePrisma();
|
const db = usePrisma();
|
||||||
const since7d = new Date(Date.now() - 7 * 86_400_000);
|
const since7d = new Date(Date.now() - 7 * 86_400_000);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user