/** * rebreak-imap-idle — IMAP IDLE Daemon * * Hält pro aktivem MailConnection-Eintrag eine persistente IMAP-IDLE-Session. * Wenn der Server "EXISTS" meldet (neue Mail), feuert der Daemon sofort * POST /api/mail/scan-internal gegen das lokale Backend — ohne 30min-Warte. * * Auth-Methoden: * app_password — Gmail (App-Password), iCloud, GMX, etc. * oauth2_microsoft — Outlook / Hotmail / O365 via XOAUTH2 (ImapFlow-nativ) * oauth2_google — Gmail via Google OAuth2 XOAUTH2 (kein App-Password nötig) * * Env-Vars (via Infisical-Wrapper): * DATABASE_URL — Postgres-Connection-String * ADMIN_SECRET — Header-Secret für /api/mail/scan-internal * ENCRYPTION_KEY — AES-256 Key (gleicher wie im Backend, 32+ Zeichen) * BACKEND_URL — z.B. http://127.0.0.1:3016 (default: 3016) * MS_OAUTH_CLIENT_ID — Azure App Registration Client ID * GOOGLE_OAUTH_CLIENT_ID — Google Cloud OAuth Client ID (iOS Native App) * NODE_ENV — production / staging * * Starten: * node index.mjs * * Prozess-Signale: * SIGTERM / SIGINT → graceful shutdown (alle IMAP-Sessions schließen) */ import { ImapFlow } from "imapflow"; import pg from "pg"; import { createCipheriv, createDecipheriv, randomBytes } from "crypto"; // ─── Config ────────────────────────────────────────────────────────────────── const BACKEND_URL = process.env.BACKEND_URL || (process.env.NODE_ENV === "production" ? "http://127.0.0.1:3015" : "http://127.0.0.1:3016"); const ADMIN_SECRET = process.env.NUXT_ADMIN_SECRET || process.env.ADMIN_SECRET || ""; const MS_OAUTH_CLIENT_ID = process.env.MS_OAUTH_CLIENT_ID || ""; const GOOGLE_OAUTH_CLIENT_ID = process.env.GOOGLE_OAUTH_CLIENT_ID || ""; const DB_REFRESH_INTERVAL_MS = 5 * 60 * 1000; // 5 min — neue Connections entdecken // IDLE_RENEW von 25min → 10min: GMX dropped IDLE-connections silent vor 25min // → exists-events kommen nie an + ImapFlow.idle() hängt ohne reject. 10min // deckt alle bekannten Provider-Timeouts ab: // GMX ~10-15min (aggressivster Provider) // Gmail ~29min // iCloud ~29min // outlook.office365.com ~29min (Microsoft dokumentiert 29min IDLE-Timeout) // Trade-off: alle 10min full reconnect-cycle. Vertretbar. const IDLE_RENEW_INTERVAL_MS = 10 * 60 * 1000; // 10 min // NOOP-heartbeat alle 30s während IDLE: detect silent-drops (GMX-pattern) + // Junk-Folder-Sweep-Frequenz. Vertretbar seit Phase-2-Inkremental-Scan: // leere SEARCH-Response (keine neuen UIDs) → skip ohne IMAP-Fetch, günstig. // Server-Headroom (CPX42) deckt die höhere Trigger-Rate ab. // Kein Konflikt mit IDLE_RENEW_INTERVAL_MS (10min) — beide laufen unabhängig. const IDLE_NOOP_INTERVAL_MS = 30 * 1000; // 30 s // Token-Refresh-Schwelle: wenn Access-Token in weniger als 5min abläuft, vor // dem IMAP-Connect refreshen. Verhindert Mid-Session-Expiry. const TOKEN_EXPIRY_THRESHOLD_MS = 5 * 60 * 1000; // 5 min // Recovery-Sweep-Schwelle: wenn eine Session länger als diese Zeitspanne // unterbrochen war (Downtime durch ECONNRESET, Prozess-Crash, pm2-Restart), // wird beim nächsten erfolgreichen Connect ein forceFullSweep ausgelöst. // Rationale: Eine Downtime > 5min bedeutet dass Mails angekommen sein können // ohne dass der IDLE-Daemon sie via exists-Event sah. Ein Full-Sweep stellt // sicher dass global-blocklistete Domains (z.B. mpmgame.com) auch dann // abgefangen werden wenn sie während der Downtime ankamen. // Cold-Start (Prozess-Neustart): immer forceFullSweep, unabhängig dieser Schwelle. // Routinemäßiger IDLE-Renew (10min Close-Reopen innerhalb aktiver Session): // kein forceFullSweep — Session war nie wirklich down. const RECOVERY_SWEEP_THRESHOLD_MS = 5 * 60 * 1000; // 5 min // Bei AUTHENTICATIONFAILED: max. 3 Refresh-Versuche bevor Connection als // auth_broken markiert und der Daemon sie aufgibt. const MAX_AUTH_RETRIES = 3; const RECONNECT_DELAYS_MS = [1000, 5000, 30_000]; // exponential backoff const RECONNECT_LOOP_DELAY_MS = 60 * 1000; // ─── DB-Pool (direktes pg, kein Prisma — Daemon ist kein Nitro-Kontext) ───── const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL }); async function updateConnectionError(connId, errorText) { await pool.query( `UPDATE rebreak.mail_connections SET last_connect_error = $1, last_connect_error_at = NOW() WHERE id = $2`, [errorText, connId], ); } async function clearConnectionError(connId) { await pool.query( `UPDATE rebreak.mail_connections SET last_connect_error = NULL, last_connect_error_at = NULL WHERE id = $1`, [connId], ); } async function updateIdleHeartbeat(connId) { await pool.query( `UPDATE rebreak.mail_connections SET last_idle_heartbeat_at = NOW() WHERE id = $1`, [connId], ); } /** * Lädt alle aktiven MailConnections inkl. OAuth-Felder und consent_at. * Consent-Gate: connections ohne consent_at werden geladen aber vom * IDLE-Loop nicht gescannt (nur gehalten — kein Delete). */ async function loadActiveConnections() { const { rows } = await pool.query( `SELECT id, user_id AS "userId", email, imap_host AS "imapHost", imap_port AS "imapPort", password_encrypted AS "passwordEncrypted", reject_unauthorized AS "rejectUnauthorized", use_starttls AS "useStarttls", auth_method AS "authMethod", oauth_access_token AS "oauthAccessToken", oauth_refresh_token AS "oauthRefreshToken", oauth_token_expiry AS "oauthTokenExpiry", consent_at AS "consentAt" FROM rebreak.mail_connections WHERE is_active = true`, ); return rows; } // ─── Crypto (analog zu server/utils/crypto.ts) ─────────────────────────────── const AES_ALGO = "aes-256-gcm"; const KEY_LENGTH = 32; function getKey() { const raw = process.env.NUXT_ENCRYPTION_KEY || process.env.ENCRYPTION_KEY || ""; if (!raw || raw.length < KEY_LENGTH) { return Buffer.alloc( KEY_LENGTH, raw.padEnd(KEY_LENGTH, "0").slice(0, KEY_LENGTH), ); } return Buffer.from(raw.slice(0, KEY_LENGTH), "utf8"); } function decrypt(stored) { const parts = stored.split(":"); if (parts.length !== 3) throw new Error("Invalid encrypted format"); const [ivHex, tagHex, dataHex] = parts; const decipher = createDecipheriv( AES_ALGO, getKey(), Buffer.from(ivHex, "hex"), ); decipher.setAuthTag(Buffer.from(tagHex, "hex")); return decipher.update(Buffer.from(dataHex, "hex")) + decipher.final("utf8"); } function encrypt(plaintext) { const key = getKey(); const iv = randomBytes(12); // 96-bit IV für AES-256-GCM const cipher = createCipheriv(AES_ALGO, key, iv); const encrypted = Buffer.concat([ cipher.update(plaintext, "utf8"), cipher.final(), ]); const tag = cipher.getAuthTag(); return `${iv.toString("hex")}:${tag.toString("hex")}:${encrypted.toString("hex")}`; } // ─── Microsoft Token Refresh (inline, kein Prisma-Kontext) ─────────────────── const MS_TOKEN_ENDPOINT = "https://login.microsoftonline.com/common/oauth2/v2.0/token"; // MUSS identisch zu backend/server/utils/ms-oauth.ts MS_OAUTH_SCOPES sein. // Microsoft V2.0 erlaubt im /token-Exchange nur Scopes EINES Resource-Servers // — `User.Read` (graph.microsoft.com) mit `IMAP.AccessAsUser.All` // (outlook.office.com) wirft AADSTS70011. `email` ist ein OIDC-Standard-Scope // und damit cross-resource-kompatibel; liefert den email-Claim ins id_token. const MS_OAUTH_SCOPES = [ "https://outlook.office.com/IMAP.AccessAsUser.All", "offline_access", "openid", "email", ].join(" "); /** * Refresht Microsoft Access+Refresh-Token direkt via HTTP. * Race-Condition-Strategie (Optimistic Concurrency): * 1. Lese aktuelle oauth_token_expiry aus DB als "Fingerprint" des aktuellen Stands. * 2. POST an MS Token-Endpoint. * 3. UPDATE ... WHERE oauth_token_expiry = * → affected_rows = 0: anderer Prozess hat parallel refresht. * Dann: frischen token lesen + zurückgeben (kein doppelter Refresh!). * Doppelter Refresh würde MS-Refresh-Token-Rotation verletzen → * AADSTS70043 beim nächsten Versuch. * 4. Gibt plaintext Access-Token zurück, sofort für IMAP nutzbar. * * Wirft wenn: * - Connection nicht gefunden / kein Refresh-Token * - MS Token-Endpoint antwortet mit Fehler (revoked/expired Refresh-Token) */ async function refreshAndSaveTokensDaemon(connectionId, clientId) { // Step 1: Aktuellen Token-Stand lesen const { rows } = await pool.query( `SELECT oauth_refresh_token, oauth_access_token, oauth_token_expiry FROM rebreak.mail_connections WHERE id = $1 AND auth_method = 'oauth2_microsoft'`, [connectionId], ); const conn = rows[0]; if (!conn?.oauth_refresh_token) { throw new Error( `Connection ${connectionId} has no oauth_refresh_token — cannot refresh`, ); } const currentExpiry = conn.oauth_token_expiry; const decryptedRefreshToken = decrypt(conn.oauth_refresh_token); // Step 2: MS Token-Endpoint const body = new URLSearchParams({ grant_type: "refresh_token", client_id: clientId, refresh_token: decryptedRefreshToken, scope: MS_OAUTH_SCOPES, }); const res = await fetch(MS_TOKEN_ENDPOINT, { method: "POST", headers: { "Content-Type": "application/x-www-form-urlencoded" }, body: body.toString(), }); if (!res.ok) { const errText = await res.text().catch(() => "unknown"); throw new Error(`MS token refresh failed (${res.status}): ${errText}`); } const data = await res.json(); if (!data.access_token) { throw new Error("MS refresh response missing access_token"); } const newExpiry = new Date(Date.now() + data.expires_in * 1000); // MS rotiert Refresh-Tokens — wenn kein neuer kommt, den alten behalten const newRefreshToken = data.refresh_token ?? decryptedRefreshToken; const encryptedAccess = encrypt(data.access_token); const encryptedRefresh = encrypt(newRefreshToken); // Step 3: Optimistic update — nur wenn oauth_token_expiry noch gleich wie gelesen // IS NOT DISTINCT FROM deckt NULL=NULL korrekt ab (vs. = das NULL!=NULL behandelt) const result = await pool.query( `UPDATE rebreak.mail_connections SET oauth_access_token = $1, oauth_refresh_token = $2, oauth_token_expiry = $3 WHERE id = $4 AND oauth_token_expiry IS NOT DISTINCT FROM $5`, [encryptedAccess, encryptedRefresh, newExpiry, connectionId, currentExpiry], ); if (result.rowCount === 0) { // Step 4: Anderer Prozess hat parallel refresht — lese deren frischen Token const { rows: fresh } = await pool.query( `SELECT oauth_access_token FROM rebreak.mail_connections WHERE id = $1`, [connectionId], ); if (!fresh[0]?.oauth_access_token) { throw new Error( `Concurrent refresh for ${connectionId} detected but no token found`, ); } return decrypt(fresh[0].oauth_access_token); } return data.access_token; } /** * Refresht Google Access-Token direkt via HTTP. * Race-Condition-Strategie analog zu refreshAndSaveTokensDaemon (MS). * * Google-Spezifik: * - Google rotiert refresh_tokens NICHT bei normalem Einsatz. Das ausgestellte * refresh_token ist langlebig. Exceptions: User-Revocation, 6-Monate-Inaktivität. * - Response enthält KEIN neues refresh_token → immer das bestehende behalten. * - Kein scope-Parameter nötig beim Refresh (Google ignoriert ihn, MS braucht ihn). * - auth_method-Filter: WHERE auth_method = 'oauth2_google' für die DB-Abfrage. */ async function refreshGoogleTokensDaemon(connectionId, clientId) { // Step 1: Aktuellen Token-Stand lesen const { rows } = await pool.query( `SELECT oauth_refresh_token, oauth_access_token, oauth_token_expiry FROM rebreak.mail_connections WHERE id = $1 AND auth_method = 'oauth2_google'`, [connectionId], ); const conn = rows[0]; if (!conn?.oauth_refresh_token) { throw new Error( `Connection ${connectionId} has no oauth_refresh_token (google) — cannot refresh`, ); } const currentExpiry = conn.oauth_token_expiry; const decryptedRefreshToken = decrypt(conn.oauth_refresh_token); // Step 2: Google Token-Endpoint const body = new URLSearchParams({ grant_type: "refresh_token", client_id: clientId, refresh_token: decryptedRefreshToken, // kein scope-Parameter — Google ignoriert ihn beim Refresh }); const res = await fetch("https://oauth2.googleapis.com/token", { method: "POST", headers: { "Content-Type": "application/x-www-form-urlencoded" }, body: body.toString(), }); if (!res.ok) { const errText = await res.text().catch(() => "unknown"); throw new Error(`Google token refresh failed (${res.status}): ${errText}`); } const data = await res.json(); if (!data.access_token) { throw new Error("Google refresh response missing access_token"); } const newExpiry = new Date(Date.now() + data.expires_in * 1000); // Google rotiert NICHT — refresh_token bleibt unverändert const encryptedAccess = encrypt(data.access_token); // refresh_token bleibt derselbe → re-encrypt aus dem bestehenden decrypted Wert const encryptedRefresh = encrypt(decryptedRefreshToken); // Step 3: Optimistic update const result = await pool.query( `UPDATE rebreak.mail_connections SET oauth_access_token = $1, oauth_refresh_token = $2, oauth_token_expiry = $3 WHERE id = $4 AND oauth_token_expiry IS NOT DISTINCT FROM $5`, [encryptedAccess, encryptedRefresh, newExpiry, connectionId, currentExpiry], ); if (result.rowCount === 0) { // Concurrent refresh — lese frischen Token const { rows: fresh } = await pool.query( `SELECT oauth_access_token FROM rebreak.mail_connections WHERE id = $1`, [connectionId], ); if (!fresh[0]?.oauth_access_token) { throw new Error( `Concurrent Google refresh for ${connectionId} detected but no token found`, ); } return decrypt(fresh[0].oauth_access_token); } return data.access_token; } /** * Markiert eine Connection als "auth_broken" — kein weiterer Retry im Daemon. * User sieht den Error im Frontend (last_connect_error = 'auth_revoked'). */ async function markConnectionAuthBroken(connectionId) { await pool.query( `UPDATE rebreak.mail_connections SET last_connect_error = 'auth_revoked', last_connect_error_at = NOW() WHERE id = $1`, [connectionId], ); } // ─── Credentials-Resolution ────────────────────────────────────────────────── /** * Gibt die IMAP-Credentials für eine Connection zurück. * auth_method-aware: * * app_password → { type: 'password', user, pass } * oauth2_microsoft → { type: 'xoauth2', user, accessToken } (MS XOAUTH2) * oauth2_google → { type: 'xoauth2', user, accessToken } (Google XOAUTH2) * * Für OAuth: wenn Token in <5min abläuft, wird vor Connect proaktiv refresht. * * Der reaktive Refresh-Path (Cold-Path) liegt in runSession(): * AUTHENTICATIONFAILED während laufender Session → provider-spezifischer Refresh * → neuer ImapFlow-Connect mit frischem Token. * * FORCE_REFRESH_FOR_TEST: wenn env IDLE_FORCE_TOKEN_REFRESH=1, immer refreshen. */ async function getCredentialsForConnection(conn) { if (conn.authMethod === "oauth2_microsoft" || conn.authMethod === "oauth2_google") { const forceRefresh = process.env.IDLE_FORCE_TOKEN_REFRESH === "1"; const fiveMinFromNow = Date.now() + TOKEN_EXPIRY_THRESHOLD_MS; const isExpiring = !conn.oauthTokenExpiry || new Date(conn.oauthTokenExpiry).getTime() < fiveMinFromNow; if (forceRefresh || isExpiring) { const reason = forceRefresh ? "IDLE_FORCE_TOKEN_REFRESH=1" : "token expiring <5min"; console.log(`[idle/${conn.email}] proactive token refresh (${reason}, method=${conn.authMethod})`); let accessToken; if (conn.authMethod === "oauth2_google") { accessToken = await refreshGoogleTokensDaemon(conn.id, GOOGLE_OAUTH_CLIENT_ID); } else { accessToken = await refreshAndSaveTokensDaemon(conn.id, MS_OAUTH_CLIENT_ID); } return { type: "xoauth2", user: conn.email, accessToken }; } // Token noch gültig — direkt entschlüsseln if (!conn.oauthAccessToken) { throw new Error(`Connection ${conn.id} has no oauth_access_token (method=${conn.authMethod})`); } const accessToken = decrypt(conn.oauthAccessToken); return { type: "xoauth2", user: conn.email, accessToken }; } // App-Password-Pfad: iCloud, GMX, Yahoo, Custom-IMAP, Gmail-App-Password if (!conn.passwordEncrypted) { throw new Error(`Connection ${conn.id} has no password_encrypted`); } const pass = decrypt(conn.passwordEncrypted); return { type: "password", user: conn.email, pass }; } // ─── Logging ───────────────────────────────────────────────────────────────── function log(email, msg) { // NIEMALS password/accessToken/credentials loggen. email ist safe. console.log(`[idle/${email}] ${msg}`); } function logError(email, msg, err) { // responseText = IMAP-Serverantwort (z.B. "NO [AUTHENTICATIONFAILED] Invalid credentials") // Credentials tauchen NICHT in err.responseText auf — ImapFlow legt sie nicht rein. const errMsg = err?.responseText || err?.message || String(err); console.error(`[idle/${email}] ${msg}: ${errMsg}`); } // ─── Auth-Fehler Detection ─────────────────────────────────────────────────── function isAuthError(err) { const text = (err?.responseText || err?.message || "").toUpperCase(); return ( text.includes("AUTHENTICATIONFAILED") || text.includes("AUTHENTICATE") || text.includes("INVALID CREDENTIALS") || text.includes("AUTHENTICATION FAILED") || // MS-spezifische Fehlercodes: AADSTS = Azure AD Token Service Error text.includes("AADSTS") ); } // ─── Session-Registry ──────────────────────────────────────────────────────── // Map const sessions = new Map(); let shuttingDown = false; // ─── In-Flight-Guard ───────────────────────────────────────────────────────── // Verhindert gestapelte scan-internal-Aufrufe für dieselbe Connection. // // scanInFlight: Map — läuft gerade ein Scan? // coalescePending: Map — kam während des laufenden Scans // ein weiterer Trigger? Wenn ja, einmalig nachholen. // // Verhalten: // - Kein laufender Scan: sofort feuern, inFlight=true setzen // - Läuft Scan bereits: pending=true setzen (coalesce — nicht stapeln) // - Nach Scan-Ende: wenn pending=true → einen weiteren Scan starten, // dann pending=false. Maximal EIN gestapelter Trigger bleibt hängen. // // Gilt für NOOP-Tick UND exists-Event (beide rufen triggerScan auf). const scanInFlight = new Map(); // Map const coalescePending = new Map(); // Map /** * Feuert POST /api/mail/scan-internal für eine Connection. * * opts.forceFullSweep: wenn true, wird scan-internal angewiesen einen * Full-Sweep aller Ordner durchzuführen (ignoriert lastUid / lastFullSweepAt- * Schwelle). Wird bei Recovery nach Downtime gesetzt (Cold-Start oder * Reconnect nach > RECOVERY_SWEEP_THRESHOLD_MS). * * Beim Coalesce-Fall (Scan läuft, neuer Trigger kommt rein): wenn irgendein * Trigger mit forceFullSweep=true kam, wird das Flag für den nachgeholten * Scan beibehalten — ein inkrementeller Trigger darf einen pending-Full-Sweep * nicht degradieren. */ async function triggerScan(conn, opts = {}) { const connId = conn.id; const forceFullSweep = opts.forceFullSweep === true; if (scanInFlight.get(connId)) { // Scan läuft bereits — coalescer merkt sich den Wunsch, stapelt nicht. // forceFullSweep wird gemerkt: einmal gesetzt bleibt es gesetzt bis der // nachgeholte Scan läuft (downgrade von full→incremental verboten). if (!coalescePending.get(connId)) { log(conn.email, `scan-trigger coalesced (in-flight)${forceFullSweep ? " [force-full]" : ""}`); coalescePending.set(connId, forceFullSweep ? "full" : "incremental"); } else if (forceFullSweep && coalescePending.get(connId) !== "full") { // Upgrade: incremental→full wenn neuer Trigger full verlangt coalescePending.set(connId, "full"); } return; } scanInFlight.set(connId, true); try { const body = { userId: conn.userId }; if (forceFullSweep) body.forceFullSweep = true; const res = await fetch(`${BACKEND_URL}/api/mail/scan-internal`, { method: "POST", headers: { "Content-Type": "application/json", "x-admin-secret": ADMIN_SECRET, }, body: JSON.stringify(body), }); if (!res.ok) { log(conn.email, `scan-trigger HTTP ${res.status}${forceFullSweep ? " [force-full]" : ""}`); } else { const data = await res.json().catch(() => ({})); log( conn.email, `scan-triggered → scanned=${data.scanned ?? "?"} blocked=${data.blocked ?? "?"}${forceFullSweep ? " [force-full]" : ""}`, ); } } catch (err) { logError(conn.email, "scan-trigger failed", err); } finally { scanInFlight.set(connId, false); // Coalesced Trigger nachholen: einmalig, fire-and-forget. // forceFullSweep-Flag aus dem gemerkten Zustand übernehmen. const pendingState = coalescePending.get(connId); if (pendingState) { coalescePending.set(connId, false); const pendingForce = pendingState === "full"; log(conn.email, `scan-trigger coalesced fire (post-flight)${pendingForce ? " [force-full]" : ""}`); triggerScan(conn, { forceFullSweep: pendingForce }).catch(() => {}); } } } // ─── IDLE-Session ───────────────────────────────────────────────────────────── /** * Startet eine einzelne IDLE-Session für eine MailConnection. * Reconnect-Loop läuft intern — diese Funktion returned nie (bis shutdown). * * Auth-Retry-Loop (OAuth-spezifisch): * Bei AUTHENTICATIONFAILED → Token refreshen → neu verbinden. * Max MAX_AUTH_RETRIES (3) mal. Dann: markConnectionAuthBroken + exit. * authRetries wird nach jedem erfolgreichen Connect resettet. * * Consent-Gate: * conn.consentAt === null → IDLE-Verbindung wird gehalten (keep-alive), * aber triggerScan() wird NICHT aufgerufen. exists-Events werden still ignoriert. * Sobald der User consent erteilt (DB-Refresh-Cycle nach max. 5min), * wird die Connection neu gestartet mit consentAt gesetzt. */ async function runSession(conn) { let attempt = 0; let authRetries = 0; // Recovery-Sweep-Tracking: // isColdStart=true beim ersten Loop-Durchlauf (Prozess-Start oder Session-Erststart). // isRecovery wird im catch-Block gesetzt wenn die Session länger als // RECOVERY_SWEEP_THRESHOLD_MS unterbrochen war. // Beide Flags lösen beim nächsten erfolgreichen Connect forceFullSweep aus. // lastConnectedAt: Zeitpunkt des letzten erfolgreichen imap.connect() — // damit wissen wir im catch-Block wie lange die Downtime dauerte. let isColdStart = true; let isRecovery = false; let lastConnectedAt = 0; // ms-Timestamp, 0 = noch nie verbunden gewesen while (!shuttingDown) { // Credentials holen (proaktiver Token-Refresh wenn nötig) let creds; try { creds = await getCredentialsForConnection(conn); } catch (err) { logError(conn.email, "credential resolution failed — session aborted", err); // Kein retry: kaputte Credentials oder Token-Refresh fehlgeschlagen // (z.B. Refresh-Token revoked). Als auth_broken markieren wenn OAuth. if (conn.authMethod === "oauth2_microsoft") { await markConnectionAuthBroken(conn.id).catch(() => {}); } else { await updateConnectionError(conn.id, err?.message || String(err)).catch(() => {}); } return; } const useImplicitTls = !conn.useStarttls; const imap = new ImapFlow({ host: conn.imapHost, port: conn.imapPort, secure: useImplicitTls, ...(conn.useStarttls ? { requireTLS: true } : {}), // ImapFlow 1.2.18 unterstützt XOAUTH2 nativ: // { user, accessToken } → AUTHENTICATE XOAUTH2 // { user, pass } → LOGIN oder PLAIN (je nach Server-Capability) auth: creds.type === "xoauth2" ? { user: creds.user, accessToken: creds.accessToken } : { user: creds.user, pass: creds.pass }, logger: false, tls: { rejectUnauthorized: conn.rejectUnauthorized ?? true }, // outlook.office365.com: disableCompression verhindert edge-cases bei // partial reads nach Reconnect. Gilt für oauth2_microsoft Connections. disableCompression: conn.imapHost.includes("office365"), }); // ImapFlow kann socket-level Errors (ECONNRESET, TLS-disconnect, ETIMEDOUT) // als EventEmitter-'error'-Event feuern — ZUSÄTZLICH zu oder STATT eines // rejected Promise. Ohne diesen Handler eskaliert Node zu uncaughtException // → Prozess-Exit → alle 44+ Sessions fallen gleichzeitig aus. // Dieser Handler absorbiert das Event account-lokal; der bestehende catch-Block // weiter unten greift für den Promise-Rejection-Pfad. imap.on("error", (err) => { logError(conn.email, "imap socket error (absorbed, reconnect-loop handles it)", err); // imap.close() hier ist safe: idempotent, löst idlePromise-rejection aus // → IDLE-Loop verlässt await idlePromise → runSession-catch greift. try { imap.close(); } catch { /* ignore */ } }); // Referenz ablegen damit shutdown() darauf zugreifen kann const handle = sessions.get(conn.id); if (handle) handle.imap = imap; try { await imap.connect(); log(conn.email, `connected (${conn.imapHost}:${conn.imapPort}, auth=${creds.type})`); lastConnectedAt = Date.now(); attempt = 0; // Reset nach erfolgreicher Verbindung authRetries = 0; // Auth-Retry-Counter ebenfalls reset // Initial-Heartbeat: sofort nach erfolgreichem Connect schreiben damit // das Frontend "aktiv" zeigt statt bis zum ersten NOOP-Cycle zu warten // (NOOP-Cycle = alle 2min → worst-case 2min+, gemessene delay 2-9min). // Gilt für alle auth-Methoden (app_password + oauth2_microsoft) und // auch für den Re-Connect nach AUTHENTICATIONFAILED-Recovery, da beide // Paths durch diesen Block laufen. // last_connect_error wird gleichzeitig geclearet: ein zuvor failed-State // (z.B. abgelaufener Token) ist nach erfolgreichem Connect behoben. await Promise.all([ updateIdleHeartbeat(conn.id).catch(() => {}), clearConnectionError(conn.id).catch(() => {}), ]); // Initial-Sweep: einmalig nach erfolgreichem Connect scan-internal anstoßen. // Damit werden bestehende Gambling-Mails in allen Folders sofort gelöscht, // statt auf das erste exists-Event zu warten (das nur bei neuen Mails kommt). // // Recovery-Sweep-Logik: // isColdStart=true → Prozess frisch gestartet (pm2-Restart, Deploy) → forceFullSweep // isRecovery=true → Reconnect nach Downtime > RECOVERY_SWEEP_THRESHOLD_MS → forceFullSweep // sonst → routinemäßiger IDLE-Renew oder kurzer Reconnect → inkrementell // // Cold-Start + Recovery sind die einzigen Fälle wo Mails während Downtime // ankamen ohne von einem exists-Event erfasst zu werden. Full-Sweep stellt sicher // dass global-blocklistete Domains (Layer-2-Hard-Block) nie durchrutschen. // // scan-internal baut eine eigene IMAP-Connection auf → kein Lock-Konflikt. // Consent-Gate sitzt in scan-internal selbst → kein doppeltes Check hier. // fire-and-forget: Fehler werden intern geloggt, Session läuft weiter. const sweepReason = isColdStart ? "cold-start" : isRecovery ? "recovery" : null; const needsSweepForce = isColdStart || isRecovery; if (sweepReason) { log(conn.email, `initial sweep [force-full, reason=${sweepReason}]`); } triggerScan(conn, { forceFullSweep: needsSweepForce }).catch(() => {}); // Flags für diesen Connect-Durchlauf konsumieren isColdStart = false; isRecovery = false; // Outlook/XOAUTH2 hat den Edge-Case dass getMailboxLock lautlos hängt // wenn der Server in einen ungültigen Zustand kommt — die Session // bleibt offen ohne Fortschritt bis der Renew-Timer (10min) ein // imap.close() schickt. Timeout-wrap macht das Failure-Mode explizit // → Auth-Retry-Loop greift sauber. await Promise.race([ imap.getMailboxLock("INBOX"), new Promise((_, reject) => setTimeout(() => reject(new Error("getMailboxLock timeout (30s)")), 30_000), ), ]); // Consent-Gate-Log: einmalig beim Connect — nur wenn consentAt fehlt if (!conn.consentAt) { log( conn.email, "consent_at=NULL — IDLE session held but scan/delete suspended. " + "Re-consent required via app.", ); } // IDLE-Loop: alle IDLE_RENEW_INTERVAL_MS erneuern while (!shuttingDown && sessions.has(conn.id)) { const idlePromise = new Promise((resolve, reject) => { imap.idle().then(resolve).catch(reject); }); // exists-event → sofort scannen (nur wenn consent erteilt) const onExists = () => { if (!conn.consentAt) { // Consent fehlt: exists-event ignorieren, Mail bleibt in Inbox. // UI zeigt Re-Consent-Modal (via /api/mail/pending-consent Endpoint). log(conn.email, "exists-event received — skipped (no consent_at)"); return; } log(conn.email, "exists-event received (new mail)"); triggerScan(conn); // fire-and-forget }; imap.on("exists", onExists); // IDLE nach 10min erneuern // Gilt für: GMX (~10-15min), Gmail (~29min), iCloud (~29min), outlook.office365.com (~29min) const renewTimer = setTimeout(() => { log(conn.email, "idle renewing (10min threshold)"); imap.close(); }, IDLE_RENEW_INTERVAL_MS); // NOOP-heartbeat alle 2min: silent-drop early-detection (GMX-pattern). // Zusätzlich: fire-and-forget scan-internal bei jedem Tick (Junk-Folder-Fix). // Outlook/Hotmail liefert kein INBOX-exists-Event für Mails die direkt in // "Junk Email" landen — IDLE hört nur INBOX. Der 2min-Tick stellt sicher dass // auch Junk-Ordner-Mails innerhalb von max. 2min erfasst werden. // Consent-Gate sitzt in scan-internal → kein doppeltes Check hier nötig. const noopTimer = setInterval(async () => { try { await imap.noop(); await updateIdleHeartbeat(conn.id).catch(() => {}); // Junk-Folder-Sweep: scan-internal scannt alle Ordner inkl. Junk. // Nur auslösen wenn Consent erteilt (analog exists-Event-Guard). if (conn.consentAt) { triggerScan(conn).catch(() => {}); } } catch (err) { logError(conn.email, "noop failed — connection dead, force reconnect", err); imap.close(); } }, IDLE_NOOP_INTERVAL_MS); try { await idlePromise; } finally { clearInterval(noopTimer); clearTimeout(renewTimer); imap.removeListener("exists", onExists); } if (shuttingDown || !sessions.has(conn.id)) break; await sleep(500); } try { await imap.logout(); } catch { /* ignore */ } return; // Sauberer Exit (shutdown oder Connection entfernt) } catch (err) { logError(conn.email, "connection error", err); try { imap.close(); } catch { /* ignore */ } // ── Recovery-Sweep-Gating ───────────────────────────────────────────── // Jeder Fehler hier bedeutet: die Session war unterbrochen. Wenn sie // lange genug unterbrochen war (> RECOVERY_SWEEP_THRESHOLD_MS), setzen // wir isRecovery=true damit der nächste erfolgreiche Connect einen // forceFullSweep auslöst. // lastConnectedAt=0: Session hat noch nie erfolgreich verbunden (z.B. // Backend war beim Cold-Start noch nicht ready) → immer forceFullSweep // (isColdStart deckt diesen Fall, aber doppelte Absicherung schadet nicht). const downtimeMs = lastConnectedAt > 0 ? Date.now() - lastConnectedAt : Infinity; if (downtimeMs > RECOVERY_SWEEP_THRESHOLD_MS) { isRecovery = true; log( conn.email, `downtime ${Math.round(downtimeMs / 1000)}s > threshold — next connect will force full sweep`, ); } // ── AUTHENTICATIONFAILED-Recovery (OAuth-spezifisch) ────────────────── // Cold-Path: Token zwischen zwei IDLE-Renewals abgelaufen (>1h Session). // Oder: proaktiver Refresh ist fehlgeschlagen und wir landen hier. const isOauthConn = conn.authMethod === "oauth2_microsoft" || conn.authMethod === "oauth2_google"; if (isOauthConn && isAuthError(err)) { authRetries++; log( conn.email, `AUTHENTICATIONFAILED detected — refresh attempt ${authRetries}/${MAX_AUTH_RETRIES} (method=${conn.authMethod})`, ); if (authRetries > MAX_AUTH_RETRIES) { // Auth dauerhaft revoked (User hat App-Permission entzogen o.ä.) // Connection als auth_broken markieren — User muss re-connecten. log(conn.email, `auth_broken after ${MAX_AUTH_RETRIES} retries — session stopped`); await markConnectionAuthBroken(conn.id).catch(() => {}); sessions.delete(conn.id); return; } // Token refreshen — direkt hier im catch-Block. // Provider-spezifisch: Google via refreshGoogleTokensDaemon, MS via refreshAndSaveTokensDaemon. try { let freshToken; if (conn.authMethod === "oauth2_google") { freshToken = await refreshGoogleTokensDaemon(conn.id, GOOGLE_OAUTH_CLIENT_ID); } else { freshToken = await refreshAndSaveTokensDaemon(conn.id, MS_OAUTH_CLIENT_ID); } // conn inline patchen damit getCredentialsForConnection() beim nächsten // Loop-Durchlauf den frischen Token nutzt ohne erneuten DB-Read. conn.oauthAccessToken = encrypt(freshToken); conn.oauthTokenExpiry = new Date(Date.now() + 55 * 60 * 1000); // ~55min buffer log(conn.email, `token refreshed (${conn.authMethod}) — reconnecting immediately`); // Kein normaler Backoff nach Auth-Refresh — sofort neu verbinden. continue; } catch (refreshErr) { logError(conn.email, "token refresh failed after AUTHENTICATIONFAILED", refreshErr); // Refresh selbst gescheitert → normaler Backoff // authRetries bleibt erhöht — beim nächsten Auth-Fehler zählt es weiter. await updateConnectionError(conn.id, refreshErr?.message || String(refreshErr)).catch(() => {}); } } else { // Nicht-Auth-Fehler (Netz, TLS, etc.) — normal in DB schreiben const errText = err?.responseText || err?.message || String(err); await updateConnectionError(conn.id, errText).catch(() => {}); } } if (shuttingDown || !sessions.has(conn.id)) return; // Exponential backoff const delay = attempt < RECONNECT_DELAYS_MS.length ? RECONNECT_DELAYS_MS[attempt] : RECONNECT_LOOP_DELAY_MS; attempt++; log(conn.email, `reconnecting in ${delay / 1000}s (attempt ${attempt})`); await sleep(delay); } } // ─── Session-Management ─────────────────────────────────────────────────────── function startSession(conn) { if (sessions.has(conn.id)) return; // bereits aktiv log(conn.email, "session starting"); const handle = { conn, imap: null, promise: null }; sessions.set(conn.id, handle); handle.promise = runSession(conn).catch((err) => { logError(conn.email, "session crashed (unhandled)", err); sessions.delete(conn.id); }); } async function stopSession(connectionId, email) { const handle = sessions.get(connectionId); if (!handle) return; log(email ?? connectionId, "session stopping"); sessions.delete(connectionId); if (handle.imap) { try { handle.imap.close(); } catch { /* ignore */ } } await handle.promise.catch(() => {}); } // ─── DB-Refresh-Loop ────────────────────────────────────────────────────────── async function refreshConnections() { let rows; try { rows = await loadActiveConnections(); } catch (err) { console.error("[idle/db] loadActiveConnections failed:", err.message); return; } const activeIds = new Set(rows.map((r) => r.id)); // Neue Connections starten for (const row of rows) { if (!sessions.has(row.id)) { startSession(row); } } // Entfernte Connections stoppen for (const [id, handle] of sessions.entries()) { if (!activeIds.has(id)) { await stopSession(id, handle.conn?.email); } } // Consent-Status für laufende Sessions aktualisieren. // Wenn ein User consent erteilt hat seit dem letzten DB-Refresh: // Die laufende Session bekommt conn.consentAt gesetzt — next exists-event // löst dann sofort triggerScan() aus. Kein Restart nötig. for (const row of rows) { const handle = sessions.get(row.id); if (handle && handle.conn.consentAt !== row.consentAt) { if (!handle.conn.consentAt && row.consentAt) { log(row.email, "consent_at received — scan/delete now active"); } handle.conn.consentAt = row.consentAt; } } const consentPending = rows.filter((r) => !r.consentAt).length; console.log( `[idle/db] refreshed — ${activeIds.size} active connections, ${sessions.size} sessions` + (consentPending > 0 ? `, ${consentPending} pending consent` : ""), ); } // ─── Graceful Shutdown ──────────────────────────────────────────────────────── async function shutdown(signal) { console.log(`[idle] received ${signal} — shutting down gracefully`); shuttingDown = true; const stopPromises = []; for (const [id, handle] of sessions.entries()) { stopPromises.push(stopSession(id, handle.conn?.email)); } await Promise.allSettled(stopPromises); await pool.end().catch(() => {}); console.log("[idle] shutdown complete"); process.exit(0); } process.on("SIGTERM", () => shutdown("SIGTERM")); process.on("SIGINT", () => shutdown("SIGINT")); // ─── Letztes Netz: bekannte IMAP-Socket-Errors als uncaughtException ────────── // Obwohl per-Account imap.on('error') und try/catch die primären Abfanggräben // sind, kann ein Timing-Fenster (z.B. Error-Event feuert zwischen close() und // neuem imap-Objekt) doch auf Prozessebene ankommen. // Dieser Guard schluckt NUR bekannte IMAP/Socket-Fehler (ECONNRESET, ETIMEDOUT, // ECONNREFUSED, EPIPE, EHOSTUNREACH, TLS-Fehler). Alles andere (Bug im Daemon- // Code selbst) wird NICHT geschluckt — normaler uncaughtException-Exit bleibt. const IMAP_SOCKET_ERROR_CODES = new Set([ "ECONNRESET", "ETIMEDOUT", "ECONNREFUSED", "EPIPE", "EHOSTUNREACH", "ENOTFOUND", "ENETUNREACH", ]); process.on("uncaughtException", (err, origin) => { const code = err?.code; const msg = err?.message ?? ""; const isKnownImapError = (code && IMAP_SOCKET_ERROR_CODES.has(code)) || msg.includes("Client network socket disconnected") || msg.includes("before secure TLS connection") || msg.includes("ECONNRESET") || msg.includes("socket hang up"); if (isKnownImapError) { console.error( `[idle] uncaughtException absorbed (known IMAP/socket error) — origin=${origin} code=${code ?? "?"}: ${msg}`, ); // Daemon läuft weiter — Reconnect-Loop der betroffenen Session übernimmt return; } // Unbekannter Fehler: normal crashen (kein stummes Schlucken von Bugs) console.error(`[idle] uncaughtException (non-IMAP, crashing) — origin=${origin}:`, err); process.exit(1); }); process.on("unhandledRejection", (reason, promise) => { const msg = (reason instanceof Error ? reason.message : String(reason)) ?? ""; const code = (reason instanceof Error ? /** @type {any} */ (reason).code : undefined); const isKnownImapError = (code && IMAP_SOCKET_ERROR_CODES.has(code)) || msg.includes("Client network socket disconnected") || msg.includes("before secure TLS connection") || msg.includes("ECONNRESET") || msg.includes("socket hang up"); if (isKnownImapError) { console.error( `[idle] unhandledRejection absorbed (known IMAP/socket error): ${msg}`, ); return; } console.error("[idle] unhandledRejection (non-IMAP):", reason); }); // ─── Startup ────────────────────────────────────────────────────────────────── function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); } function assertEnv() { const missing = []; if (!process.env.DATABASE_URL) missing.push("DATABASE_URL"); if (!ADMIN_SECRET) missing.push("ADMIN_SECRET / NUXT_ADMIN_SECRET"); if (!MS_OAUTH_CLIENT_ID) missing.push("MS_OAUTH_CLIENT_ID"); if (!GOOGLE_OAUTH_CLIENT_ID) missing.push("GOOGLE_OAUTH_CLIENT_ID"); if (missing.length > 0) { console.error( `[idle] FEHLER: fehlende Env-Vars: ${missing.join(", ")}. Daemon startet nicht.`, ); process.exit(1); } } async function main() { assertEnv(); console.log( `[idle] starting — backend=${BACKEND_URL} env=${process.env.NODE_ENV ?? "unknown"}`, ); await refreshConnections(); setInterval(() => { if (!shuttingDown) refreshConnections(); }, DB_REFRESH_INTERVAL_MS); } main().catch((err) => { console.error("[idle] fatal startup error:", err); process.exit(1); });