diff --git a/backend/prisma/migrations/20260605_mail_uid_scan_state/migration.sql b/backend/prisma/migrations/20260605_mail_uid_scan_state/migration.sql
new file mode 100644
index 0000000..e81eb45
--- /dev/null
+++ b/backend/prisma/migrations/20260605_mail_uid_scan_state/migration.sql
@@ -0,0 +1,40 @@
+-- Migration: mail_uid_scan_state
+-- Adds incremental UID-scan state to mail_connections (Phase 2 performance).
+--
+-- Problem being solved:
+--   scan-internal.post.ts fetched the last 200 messages per folder on EVERY trigger
+--   (NOOP-tick every 2 minutes × folders × accounts = sustained CPU load 51-55%).
+--   Phase 1 (in-flight guard) capped stacking. Phase 2 eliminates the root cost:
+--   re-fetching already-seen messages.
+--
+-- New columns:
+--   folder_scan_state JSONB:
+--     Per-folder {lastUid, uidvalidity} map. On each scan run the daemon fetches
+--     only UIDs > lastUid for each folder. After processing, lastUid is updated to
+--     the highest UID seen.
+--     UIDVALIDITY guard: if the server's uidValidity changed, lastUid is reset to 0
+--     and a full 200-message sweep is performed for that folder (server renumbered UIDs).
+--     Format: { "INBOX": {"lastUid":1234,"uidvalidity":5678}, "Junk Email": {...} }
+--     DEFAULT '{}' → first run treats all folders as lastUid=0 (full sweep).
+--
+--   last_full_sweep_at TIMESTAMPTZ:
+--     Timestamp of the last quality full-sweep (1×/day).
+--     Guards against blocklist updates not reaching older messages.
+--     NULL → no full-sweep yet → first run counts as full-sweep.
+--
+-- Breaking-change status: NONE.
+--   DEFAULT '{}' covers all existing rows for folder_scan_state.
+--   last_full_sweep_at is nullable → existing rows get NULL (triggers full-sweep on
+--   next run, which is the correct and safe behavior: first run after migration
+--   always does a full 200-message sweep per folder, exactly as before).
+--
+-- Deploy: pnpm prisma migrate deploy (on server via GitHub Actions pipeline)
+
+ALTER TABLE "rebreak"."mail_connections"
+  ADD COLUMN "folder_scan_state" JSONB NOT NULL DEFAULT '{}',
+  ADD COLUMN "last_full_sweep_at" TIMESTAMPTZ;
+
+-- No index on folder_scan_state:
+--   Accessed only by primary key lookup (per-connection read), not in WHERE clauses.
+-- No index on last_full_sweep_at:
+--   Checked in application logic after fetching the connection by id — not used in queries.
diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma
index d2963bf..22731a2 100644
--- a/backend/prisma/schema.prisma
+++ b/backend/prisma/schema.prisma
@@ -726,6 +726,20 @@ model MailConnection {
   // NULL → Frontend fällt auf Email-Domain zurück.
   title String?

+  // ─── Incremental UID scan (Phase 2, migration 20260605_mail_uid_scan_state) ─
+  // folder_scan_state: per-folder {lastUid, uidvalidity} — enables incremental
+  // IMAP scanning (only UIDs newer than the last run). Format:
+  //   { "INBOX": {"lastUid":1234,"uidvalidity":5678}, "Junk Email": {...} }
+  // DEFAULT '{}' → the first run treats every folder as lastUid=0 (full sweep).
+  // UIDVALIDITY guard: if status.uidValidity != stored value → reset to 0
+  // (the folder was reset server-side, all UIDs are invalid).
+  //
+  // last_full_sweep_at: timestamp of the last quality full sweep (1×/day).
+  // Guards against blocklist updates that would otherwise miss older mails.
+  // NULL → no full sweep has run yet → the first run counts as a full sweep.
+  folderScanState Json @default("{}") @map("folder_scan_state")
+  lastFullSweepAt DateTime? @map("last_full_sweep_at") @db.Timestamptz(6)
+
   // ─── Art. 9-Einwilligung (DSGVO-Compliance, Mail-Auto-Delete) ───────────
   // consentAt=NULL für Bestandsrows → "Re-Consent pending".
   // Daemon pausiert Mail-Verarbeitung wenn consentAt=NULL (kein Auto-Delete).
diff --git a/backend/server/api/mail/scan-internal.post.ts b/backend/server/api/mail/scan-internal.post.ts
index 2b48c6f..dc9b90a 100644
--- a/backend/server/api/mail/scan-internal.post.ts
+++ b/backend/server/api/mail/scan-internal.post.ts
@@ -7,6 +7,8 @@ import {
   upsertMailBlockedStat,
   updateMailConnectionScanStats,
   insertMailClassificationSample,
+  patchFolderScanState,
+  markFullSweepDone,
 } from "../../db/mail";
 import { getBlocklistedDomainsSet, getMailDisplayNamePatterns } from "../../db/domains";
 import { getProfile } from "../../db/profile";
@@ -94,6 +96,21 @@ export default defineEventHandler(async (event) => {
   let scanned = 0;
   let newlyBlocked = 0;

+  // ─── Quality full-sweep guard ────────────────────────────────────────────
+  // Once per day every folder is treated as lastUid=0 (full sweep) so that
+  // blocklist updates also reach older mails.
+  // Important: lastUid is only treated as 0 for this run — it is NOT persisted.
+  // After the sweep the real maxUid is stored and lastFullSweepAt is set to NOW().
+  const needsFullSweep =
+    !connection.lastFullSweepAt ||
+    Date.now() - new Date(connection.lastFullSweepAt).getTime() > 24 * 3_600_000;
+
+  // Current folder_scan_state from the DB (JSONB; Prisma returns a plain object)
+  const folderScanState = (connection.folderScanState ?? {}) as Record<
+    string,
+    { lastUid: number; uidvalidity: number }
+  >;
+
   try {
     await imap.connect();

@@ -111,7 +128,8 @@ export default defineEventHandler(async (event) => {
     const skippedSystemFolders = mailboxes.length - scannable.length;
     console.log(
       `[scan-internal] ${connection.email} scanning ${scannable.length} folders` +
-        (skippedSystemFolders > 0 ? ` (${skippedSystemFolders} system folders skipped)` : ""),
+        (skippedSystemFolders > 0 ? ` (${skippedSystemFolders} system folders skipped)` : "") +
+        (needsFullSweep ? " [full-sweep]" : " [incremental]"),
     );

     for (const mb of scannable) {
@@ -123,15 +141,85 @@ export default defineEventHandler(async (event) => {
       }
       try {
         const SCAN_LIMIT = 200;
-        const status = await imap.status(mb.path, { messages: true });
-        const msgCount = (status as any).messages ?? 0;
-        if (msgCount === 0) continue;
-
-        const fetchRange =
-          msgCount > SCAN_LIMIT ? `${msgCount - SCAN_LIMIT + 1}:*` : "1:*";
-        const allMessages = await imap.fetchAll(fetchRange, {
-          envelope: true,
+        // ─── UID scan strategy (Phase 2) ──────────────────────────────────
+        // 1. Fetch folder status: messages, uidNext, uidValidity
+        const status = await imap.status(mb.path, {
+          messages: true,
+          uidNext: true,
+          uidValidity: true,
         });
+        const msgCount = (status as any).messages ?? 0;
+        // ImapFlow documents uidValidity as BigInt. Normalise to number (UIDVALIDITY
+        // is a 32-bit value) so it JSON-serialises and compares equal to the stored
+        // number; a BigInt would never === the saved value.
+        const serverUidValidity = Number((status as any).uidValidity ?? 0);
+        const saved = folderScanState[mb.path];
+
+        if (msgCount === 0) {
+          // Empty folder: record its UIDVALIDITY, but skip the write when the stored
+          // state already matches — otherwise every tick issues a redundant UPDATE
+          // per empty folder.
+          const alreadyRecorded =
+            saved?.lastUid === 0 && saved?.uidvalidity === serverUidValidity;
+          if (serverUidValidity > 0 && !alreadyRecorded) {
+            await patchFolderScanState(connection.id, mb.path, {
+              lastUid: 0,
+              uidvalidity: serverUidValidity,
+            });
+          }
+          continue;
+        }
+
+        // 2. Apply stored state + UIDVALIDITY guard
+        let lastUid = saved?.lastUid ?? 0;
+
+        if (needsFullSweep) {
+          // Quality full sweep: treat lastUid as 0 for this run only (never persist 0);
+          // the real maxUid is stored after the sweep.
+          lastUid = 0;
+        } else if (saved && serverUidValidity > 0 && saved.uidvalidity !== serverUidValidity) {
+          // UIDVALIDITY changed: the folder was reset server-side, so every stored
+          // UID is invalid → full scan for this folder.
+          console.log(
+            `[scan-internal] ${connection.email} | ${mb.path} | UIDVALIDITY changed ` +
+              `(${saved.uidvalidity}→${serverUidValidity}) — forcing full sweep for this folder`,
+          );
+          lastUid = 0;
+        }
+
+        // 3. Fetch messages: incremental or full
+        let allMessages: any[];
+
+        if (lastUid > 0) {
+          // Incremental: request UIDs above lastUid. Without { uid: true } search()
+          // returns sequence numbers, and `n:*` always matches the newest message even
+          // when its UID is below n — so ask for UIDs and filter explicitly.
+          const found = await (imap as any).search(
+            { uid: `${lastUid + 1}:*` },
+            { uid: true },
+          );
+          const newUids: number[] = Array.isArray(found)
+            ? found.filter((uid: number) => uid > lastUid)
+            : [];
+          if (newUids.length === 0) {
+            // Nothing new → skip this folder, no fetchAll needed
+            continue;
+          }
+          allMessages = await imap.fetchAll(
+            newUids.join(","),
+            { uid: true, envelope: true },
+            { uid: true } as any,
+          );
+        } else {
+          // Full sweep (first scan, UIDVALIDITY reset, or quality full sweep):
+          // same logic as before — the last SCAN_LIMIT messages by sequence number.
+          const fetchRange =
+            msgCount > SCAN_LIMIT ? `${msgCount - SCAN_LIMIT + 1}:*` : "1:*";
+          allMessages = await imap.fetchAll(fetchRange, { uid: true, envelope: true });
+        }
+
+        if (!allMessages || allMessages.length === 0) continue;
+
         scanned += allMessages.length;
         totalScanned += allMessages.length;

@@ -271,11 +359,32 @@ export default defineEventHandler(async (event) => {
             count: toInsert.length,
           });
         }
+
+        // ─── Persist UID state ────────────────────────────────────────────
+        // Compute maxUid across all fetched messages (numeric IMAP UID).
+        // Persist only when at least one message carried a usable UID.
+        const maxUid = allMessages.reduce((max: number, m: any) => {
+          const u = typeof m.uid === "number" ? m.uid : parseInt(String(m.uid ?? "0"), 10);
+          return u > max ? u : max;
+        }, 0);
+
+        if (maxUid > 0 && serverUidValidity > 0) {
+          await patchFolderScanState(connection.id, mb.path, {
+            lastUid: maxUid,
+            uidvalidity: serverUidValidity,
+          });
+        }
       } finally {
         lock.release();
       }
     }

+    // ─── Finish full sweep ──────────────────────────────────────────────────
+    if (needsFullSweep) {
+      await markFullSweepDone(connection.id);
+      console.log(`[scan-internal] ${connection.email} | full-sweep complete, lastFullSweepAt updated`);
+    }
+
     await imap.logout();
   } catch {
     try {
diff --git a/backend/server/db/mail.ts b/backend/server/db/mail.ts
index cf85fd6..41120f8 100644
--- a/backend/server/db/mail.ts
+++ b/backend/server/db/mail.ts
@@ -42,15 +42,8 @@ export async function getAllMailConnections(userId: string) {
   });
 }

-export async function getAllActiveMailUserIds() {
-  const db = usePrisma();
-  const rows = await db.mailConnection.findMany({
-    where: { isActive: true, nextScanAt: { lte: new Date() } },
-    select: { userId: true },
-    distinct: ["userId"],
-  });
-  return rows.map((r) => r.userId);
-}
+// getAllActiveMailUserIds — removed (dead code after Phase-1 cron-deletion).
+// No callers remain. The cron that called it was deleted in Phase 1.

 export async function countMailConnections(userId: string) {
   const db = usePrisma();
@@ -141,6 +134,46 @@ export async function updateMailConnectionScanStats(
   });
 }

+/**
+ * Merges a single folder's UID-scan state into folder_scan_state in one UPDATE.
+ *
+ * Uses the PostgreSQL JSONB concatenation operator (||), a shallow top-level merge:
+ * only the given folder key is replaced, other folders are preserved. Each call
+ * touches one folder key, so updates for different folders do not clobber each other.
+ *
+ * Example JSONB result after call with (id, "INBOX", {lastUid:1234, uidvalidity:5678}):
+ *   { "INBOX": {"lastUid":1234,"uidvalidity":5678}, "Junk Email": {"lastUid":99,...} }
+ *
+ * @param connectionId MailConnection.id
+ * @param folderPath IMAP mailbox path (e.g. "INBOX", "Junk Email")
+ * @param state { lastUid: number, uidvalidity: number } — plain numbers (JSON-serialisable)
+ */
+export async function patchFolderScanState(
+  connectionId: string,
+  folderPath: string,
+  state: { lastUid: number; uidvalidity: number },
+): Promise<void> {
+  const db = usePrisma();
+  const patch = JSON.stringify({ [folderPath]: state });
+  await db.$executeRaw`
+    UPDATE "rebreak"."mail_connections"
+    SET "folder_scan_state" = "folder_scan_state" || ${patch}::jsonb
+    WHERE "id" = ${connectionId}::uuid
+  `;
+}
+
+/**
+ * Sets a connection's last_full_sweep_at to the current time.
+ * Called by the scan handler after a quality full-sweep run completes.
+ */
+export async function markFullSweepDone(connectionId: string): Promise<void> {
+  const db = usePrisma();
+  await db.mailConnection.update({
+    where: { id: connectionId },
+    data: { lastFullSweepAt: new Date() },
+  });
+}
+
 export async function getMailBlockedStats(userId: string) {
   const db = usePrisma();
   const since7d = new Date(Date.now() - 7 * 86_400_000);