From cc549c7f17cecc9120d5686fb8998bb4e7ebd6d3 Mon Sep 17 00:00:00 2001 From: chahinebrini Date: Fri, 5 Jun 2026 11:59:44 +0200 Subject: [PATCH] =?UTF-8?q?perf(mail):=20re-apply=20incremental=20UID-scan?= =?UTF-8?q?=20(Phase=202)=20=E2=80=94=20safe=20w/=20externals=20fix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wiedereinspielung von mo's inkrementellem Scan (war nach dem imapflow-Bundle- Incident zurückgerollt). Jetzt safe, weil der nitro externals-Fix (d64f31d) imapflow robust extern hält — lokal verifiziert: Build clean, imapflow in node_modules (kein Chunk), kein util.inherits. - scan-internal: inkrementeller UID-Scan (status uidNext/uidValidity, search UID>lastUid, leere Ordner skip), UIDVALIDITY-Wächter, täglicher Quality-Full- Sweep (last_full_sweep_at). Klassifikation/Delete/Consent 1:1. - db/mail: patchFolderScanState (atomic JSONB-merge) + markFullSweepDone; toter getAllActiveMailUserIds entfernt. Schema + Migration (folder_scan_state, last_full_sweep_at) sind bereits live. Co-Authored-By: Claude Opus 4.8 --- backend/server/api/mail/scan-internal.post.ts | 111 ++++++++++++++++-- backend/server/db/mail.ts | 51 ++++++-- 2 files changed, 144 insertions(+), 18 deletions(-) diff --git a/backend/server/api/mail/scan-internal.post.ts b/backend/server/api/mail/scan-internal.post.ts index 2b48c6f..dc9b90a 100644 --- a/backend/server/api/mail/scan-internal.post.ts +++ b/backend/server/api/mail/scan-internal.post.ts @@ -7,6 +7,8 @@ import { upsertMailBlockedStat, updateMailConnectionScanStats, insertMailClassificationSample, + patchFolderScanState, + markFullSweepDone, } from "../../db/mail"; import { getBlocklistedDomainsSet, getMailDisplayNamePatterns } from "../../db/domains"; import { getProfile } from "../../db/profile"; @@ -94,6 +96,21 @@ export default defineEventHandler(async (event) => { let scanned = 0; let newlyBlocked = 0; + // ─── Quality Full-Sweep Wächter ────────────────────────────────────────── + // 1×/Tag: alle Ordner werden als lastUid=0 behandelt (Full-Sweep) um sicher- + // zustellen dass Blocklist-Updates auch ältere Mails erfassen. + // Wichtig: wir behandeln lastUid temporär als 0 — NICHT persistieren. + // Nach dem Sweep wird der echte maxUid gespeichert + lastFullSweepAt=NOW(). + const needsFullSweep = + !connection.lastFullSweepAt || + Date.now() - new Date(connection.lastFullSweepAt).getTime() > 24 * 3_600_000; + + // Aktueller folder_scan_state aus der DB (JSONB, Prisma liefert plain object) + const folderScanState = (connection.folderScanState ?? {}) as Record< + string, + { lastUid: number; uidvalidity: number } + >; + try { await imap.connect(); @@ -111,7 +128,8 @@ export default defineEventHandler(async (event) => { const skippedSystemFolders = mailboxes.length - scannable.length; console.log( `[scan-internal] ${connection.email} scanning ${scannable.length} folders` + - (skippedSystemFolders > 0 ? ` (${skippedSystemFolders} system folders skipped)` : ""), + (skippedSystemFolders > 0 ? ` (${skippedSystemFolders} system folders skipped)` : "") + + (needsFullSweep ? " [full-sweep]" : " [incremental]"), ); for (const mb of scannable) { @@ -123,15 +141,69 @@ export default defineEventHandler(async (event) => { } try { const SCAN_LIMIT = 200; - const status = await imap.status(mb.path, { messages: true }); - const msgCount = (status as any).messages ?? 0; - if (msgCount === 0) continue; - - const fetchRange = - msgCount > SCAN_LIMIT ? `${msgCount - SCAN_LIMIT + 1}:*` : "1:*"; - const allMessages = await imap.fetchAll(fetchRange, { - envelope: true, + // ─── UID-Scan-Strategie (Phase 2) ────────────────────────────────── + // 1. Status holen: messages, uidNext, uidValidity + const status = await imap.status(mb.path, { + messages: true, + uidNext: true, + uidValidity: true, }); + const msgCount = (status as any).messages ?? 0; + const serverUidValidity: number = (status as any).uidValidity ?? 0; + + if (msgCount === 0) { + // Ordner leer — trotzdem Zustand für diesen Ordner persistieren + // (verhindert endloses Re-Fetching auf leere Ordner). + if (serverUidValidity > 0) { + await patchFolderScanState(connection.id, mb.path, { + lastUid: 0, + uidvalidity: serverUidValidity, + }); + } + continue; + } + + // 2. Gespeicherten Zustand lesen + UIDVALIDITY-Wächter + const saved = folderScanState[mb.path]; + let lastUid = saved?.lastUid ?? 0; + + if (needsFullSweep) { + // Quality Full-Sweep: temporär als 0 behandeln (Full-Scan), aber + // lastUid=0 NICHT dauerhaft in folderScanState schreiben. + // Nach dem Sweep speichern wir den echten maxUid. + lastUid = 0; + } else if (saved && serverUidValidity > 0 && saved.uidvalidity !== serverUidValidity) { + // UIDVALIDITY hat sich geändert: Ordner wurde server-seitig resettet. + // Alle bisherigen UIDs sind ungültig → Full-Scan für diesen Ordner. + console.log( + `[scan-internal] ${connection.email} | ${mb.path} | UIDVALIDITY changed ` + + `(${saved.uidvalidity}→${serverUidValidity}) — forcing full sweep for this folder`, + ); + lastUid = 0; + } + + // 3. Nachrichten fetchen: inkrementell oder full + let allMessages: any[]; + + if (lastUid > 0) { + // Inkrementell: nur UIDs > lastUid suchen + const newUids = await (imap as any).search({ uid: `${lastUid + 1}:*` }); + if (!newUids || newUids.length === 0) { + // Keine neuen Nachrichten → Ordner skippen, kein fetchAll nötig + continue; + } + // Fetch nur die neuen UIDs + allMessages = await imap.fetchAll(newUids.join(","), { envelope: true }, { uid: true } as any); + } else { + // Full-Sweep (erster Scan, UIDVALIDITY-Reset, oder Quality-Full-Sweep): + // identische Logik wie vorher — letzten SCAN_LIMIT Nachrichten + const fetchRange = + msgCount > SCAN_LIMIT ? `${msgCount - SCAN_LIMIT + 1}:*` : "1:*"; + allMessages = await imap.fetchAll(fetchRange, { envelope: true }); + } + + if (!allMessages || allMessages.length === 0) continue; + scanned += allMessages.length; totalScanned += allMessages.length; @@ -271,11 +343,32 @@ export default defineEventHandler(async (event) => { count: toInsert.length, }); } + + // ─── UID-Zustand persistieren ───────────────────────────────────── + // maxUid über alle gefetchten Nachrichten berechnen (numeric UID aus IMAP). + // Nur persistieren wenn wir mindestens eine Nachricht hatten. + const maxUid = allMessages.reduce((max: number, m: any) => { + const u = typeof m.uid === "number" ? m.uid : parseInt(String(m.uid ?? "0"), 10); + return u > max ? u : max; + }, 0); + + if (maxUid > 0 && serverUidValidity > 0) { + await patchFolderScanState(connection.id, mb.path, { + lastUid: maxUid, + uidvalidity: serverUidValidity, + }); + } } finally { lock.release(); } } + // ─── Full-Sweep abschließen ───────────────────────────────────────────── + if (needsFullSweep) { + await markFullSweepDone(connection.id); + console.log(`[scan-internal] ${connection.email} | full-sweep complete, lastFullSweepAt updated`); + } + await imap.logout(); } catch { try { diff --git a/backend/server/db/mail.ts b/backend/server/db/mail.ts index cf85fd6..41120f8 100644 --- a/backend/server/db/mail.ts +++ b/backend/server/db/mail.ts @@ -42,15 +42,8 @@ export async function getAllMailConnections(userId: string) { }); } -export async function getAllActiveMailUserIds() { - const db = usePrisma(); - const rows = await db.mailConnection.findMany({ - where: { isActive: true, nextScanAt: { lte: new Date() } }, - select: { userId: true }, - distinct: ["userId"], - }); - return rows.map((r) => r.userId); -} +// getAllActiveMailUserIds — removed (dead code after Phase-1 cron-deletion). +// No callers remain. The cron that called it was deleted in Phase 1. export async function countMailConnections(userId: string) { const db = usePrisma(); @@ -141,6 +134,46 @@ export async function updateMailConnectionScanStats( }); } +/** + * Atomically merges a single folder's UID-scan state into folder_scan_state. + * + * Uses PostgreSQL JSON merge operator (||) to patch only the given folder key — + * other folders are preserved. This is safe under concurrent multi-folder updates + * because each folder key is independent. + * + * Example JSONB result after call with (id, "INBOX", {lastUid:1234, uidvalidity:5678}): + * { "INBOX": {"lastUid":1234,"uidvalidity":5678}, "Junk Email": {"lastUid":99,...} } + * + * @param connectionId MailConnection.id + * @param folderPath IMAP mailbox path (e.g. "INBOX", "Junk Email") + * @param state { lastUid: number, uidvalidity: number } + */ +export async function patchFolderScanState( + connectionId: string, + folderPath: string, + state: { lastUid: number; uidvalidity: number }, +): Promise { + const db = usePrisma(); + const patch = JSON.stringify({ [folderPath]: state }); + await db.$executeRaw` + UPDATE "rebreak"."mail_connections" + SET "folder_scan_state" = "folder_scan_state" || ${patch}::jsonb + WHERE "id" = ${connectionId}::uuid + `; +} + +/** + * Marks a connection's last_full_sweep_at to NOW(). + * Called once per connection per day when the quality full-sweep runs. + */ +export async function markFullSweepDone(connectionId: string): Promise { + const db = usePrisma(); + await db.mailConnection.update({ + where: { id: connectionId }, + data: { lastFullSweepAt: new Date() }, + }); +} + export async function getMailBlockedStats(userId: string) { const db = usePrisma(); const since7d = new Date(Date.now() - 7 * 86_400_000);