chahinebrini 5b57bea9c0 perf(mail): kill redundant 30min scan-cron + in-flight scan guard
Backend-Lag-Fix Phase 1 — entlastet die CPU-Dauerschleife im Mail-Stack:
- delete mail-scan-cron.ts: der 30-Min-Nitro-Cron scannte alle User parallel
  (Promise.allSettled) und war redundant zum IMAP-IDLE-Daemon (Single Source
  of Truth). Reine Dauerlast ohne Mehrwert.
- imap-idle: In-Flight-Guard (scanInFlight + coalescePending). triggerScan ist
  jetzt re-entry-safe — pro Connection max. 1 aktiver + 1 pending Scan statt
  bis zu 8 gestapelt pro 2-Min-NOOP-Tick. Gilt für NOOP + exists-Event.
- plan-features: Pro mailAgents 3->2 (+ Math.min-Hack in coach/message aufgeräumt).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 10:38:06 +02:00

914 lines
35 KiB
JavaScript

/**
* rebreak-imap-idle — IMAP IDLE Daemon
*
* Hält pro aktivem MailConnection-Eintrag eine persistente IMAP-IDLE-Session.
* Wenn der Server "EXISTS" meldet (neue Mail), feuert der Daemon sofort
* POST /api/mail/scan-internal gegen das lokale Backend — ohne 30min-Warte.
*
* Auth-Methoden:
* app_password — Gmail (App-Password), iCloud, GMX, etc.
* oauth2_microsoft — Outlook / Hotmail / O365 via XOAUTH2 (ImapFlow-nativ)
* oauth2_google — Gmail via Google OAuth2 XOAUTH2 (kein App-Password nötig)
*
* Env-Vars (via Infisical-Wrapper):
* DATABASE_URL — Postgres-Connection-String
* ADMIN_SECRET — Header-Secret für /api/mail/scan-internal
* ENCRYPTION_KEY — AES-256 Key (gleicher wie im Backend, 32+ Zeichen)
* BACKEND_URL — z.B. http://127.0.0.1:3016 (default: 3016)
* MS_OAUTH_CLIENT_ID — Azure App Registration Client ID
* GOOGLE_OAUTH_CLIENT_ID — Google Cloud OAuth Client ID (iOS Native App)
* NODE_ENV — production / staging
*
* Starten:
* node index.mjs
*
* Prozess-Signale:
* SIGTERM / SIGINT → graceful shutdown (alle IMAP-Sessions schließen)
*/
import { ImapFlow } from "imapflow";
import pg from "pg";
import { createCipheriv, createDecipheriv, randomBytes } from "crypto";
// ─── Config ──────────────────────────────────────────────────────────────────
const BACKEND_URL =
process.env.BACKEND_URL ||
(process.env.NODE_ENV === "production"
? "http://127.0.0.1:3015"
: "http://127.0.0.1:3016");
const ADMIN_SECRET =
process.env.NUXT_ADMIN_SECRET || process.env.ADMIN_SECRET || "";
const MS_OAUTH_CLIENT_ID =
process.env.MS_OAUTH_CLIENT_ID || "";
const GOOGLE_OAUTH_CLIENT_ID =
process.env.GOOGLE_OAUTH_CLIENT_ID || "";
const DB_REFRESH_INTERVAL_MS = 5 * 60 * 1000; // 5 min — neue Connections entdecken
// IDLE_RENEW von 25min → 10min: GMX dropped IDLE-connections silent vor 25min
// → exists-events kommen nie an + ImapFlow.idle() hängt ohne reject. 10min
// deckt alle bekannten Provider-Timeouts ab:
// GMX ~10-15min (aggressivster Provider)
// Gmail ~29min
// iCloud ~29min
// outlook.office365.com ~29min (Microsoft dokumentiert 29min IDLE-Timeout)
// Trade-off: alle 10min full reconnect-cycle. Vertretbar.
const IDLE_RENEW_INTERVAL_MS = 10 * 60 * 1000; // 10 min
// NOOP-heartbeat alle 2min während IDLE: detect silent-drops (GMX-pattern).
// Wenn NOOP fehlschlägt → close → loop iteriert → reconnect.
const IDLE_NOOP_INTERVAL_MS = 2 * 60 * 1000; // 2 min
// Token-Refresh-Schwelle: wenn Access-Token in weniger als 5min abläuft, vor
// dem IMAP-Connect refreshen. Verhindert Mid-Session-Expiry.
const TOKEN_EXPIRY_THRESHOLD_MS = 5 * 60 * 1000; // 5 min
// Bei AUTHENTICATIONFAILED: max. 3 Refresh-Versuche bevor Connection als
// auth_broken markiert und der Daemon sie aufgibt.
const MAX_AUTH_RETRIES = 3;
const RECONNECT_DELAYS_MS = [1000, 5000, 30_000]; // exponential backoff
const RECONNECT_LOOP_DELAY_MS = 60 * 1000;
// ─── DB-Pool (direktes pg, kein Prisma — Daemon ist kein Nitro-Kontext) ─────
const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
async function updateConnectionError(connId, errorText) {
await pool.query(
`UPDATE rebreak.mail_connections
SET last_connect_error = $1, last_connect_error_at = NOW()
WHERE id = $2`,
[errorText, connId],
);
}
async function clearConnectionError(connId) {
await pool.query(
`UPDATE rebreak.mail_connections
SET last_connect_error = NULL, last_connect_error_at = NULL
WHERE id = $1`,
[connId],
);
}
async function updateIdleHeartbeat(connId) {
await pool.query(
`UPDATE rebreak.mail_connections
SET last_idle_heartbeat_at = NOW()
WHERE id = $1`,
[connId],
);
}
/**
* Lädt alle aktiven MailConnections inkl. OAuth-Felder und consent_at.
* Consent-Gate: connections ohne consent_at werden geladen aber vom
* IDLE-Loop nicht gescannt (nur gehalten — kein Delete).
*/
async function loadActiveConnections() {
const { rows } = await pool.query(
`SELECT id,
user_id AS "userId",
email,
imap_host AS "imapHost",
imap_port AS "imapPort",
password_encrypted AS "passwordEncrypted",
reject_unauthorized AS "rejectUnauthorized",
use_starttls AS "useStarttls",
auth_method AS "authMethod",
oauth_access_token AS "oauthAccessToken",
oauth_refresh_token AS "oauthRefreshToken",
oauth_token_expiry AS "oauthTokenExpiry",
consent_at AS "consentAt"
FROM rebreak.mail_connections
WHERE is_active = true`,
);
return rows;
}
// ─── Crypto (analog zu server/utils/crypto.ts) ───────────────────────────────
const AES_ALGO = "aes-256-gcm";
const KEY_LENGTH = 32;
function getKey() {
const raw =
process.env.NUXT_ENCRYPTION_KEY || process.env.ENCRYPTION_KEY || "";
if (!raw || raw.length < KEY_LENGTH) {
return Buffer.alloc(
KEY_LENGTH,
raw.padEnd(KEY_LENGTH, "0").slice(0, KEY_LENGTH),
);
}
return Buffer.from(raw.slice(0, KEY_LENGTH), "utf8");
}
function decrypt(stored) {
const parts = stored.split(":");
if (parts.length !== 3) throw new Error("Invalid encrypted format");
const [ivHex, tagHex, dataHex] = parts;
const decipher = createDecipheriv(
AES_ALGO,
getKey(),
Buffer.from(ivHex, "hex"),
);
decipher.setAuthTag(Buffer.from(tagHex, "hex"));
return decipher.update(Buffer.from(dataHex, "hex")) + decipher.final("utf8");
}
function encrypt(plaintext) {
const key = getKey();
const iv = randomBytes(12); // 96-bit IV für AES-256-GCM
const cipher = createCipheriv(AES_ALGO, key, iv);
const encrypted = Buffer.concat([
cipher.update(plaintext, "utf8"),
cipher.final(),
]);
const tag = cipher.getAuthTag();
return `${iv.toString("hex")}:${tag.toString("hex")}:${encrypted.toString("hex")}`;
}
// ─── Microsoft Token Refresh (inline, kein Prisma-Kontext) ───────────────────
const MS_TOKEN_ENDPOINT =
"https://login.microsoftonline.com/common/oauth2/v2.0/token";
// MUSS identisch zu backend/server/utils/ms-oauth.ts MS_OAUTH_SCOPES sein.
// Microsoft V2.0 erlaubt im /token-Exchange nur Scopes EINES Resource-Servers
// — `User.Read` (graph.microsoft.com) mit `IMAP.AccessAsUser.All`
// (outlook.office.com) wirft AADSTS70011. `email` ist ein OIDC-Standard-Scope
// und damit cross-resource-kompatibel; liefert den email-Claim ins id_token.
const MS_OAUTH_SCOPES = [
"https://outlook.office.com/IMAP.AccessAsUser.All",
"offline_access",
"openid",
"email",
].join(" ");
/**
* Refresht Microsoft Access+Refresh-Token direkt via HTTP.
* Race-Condition-Strategie (Optimistic Concurrency):
* 1. Lese aktuelle oauth_token_expiry aus DB als "Fingerprint" des aktuellen Stands.
* 2. POST an MS Token-Endpoint.
* 3. UPDATE ... WHERE oauth_token_expiry = <gelesener Wert>
* → affected_rows = 0: anderer Prozess hat parallel refresht.
* Dann: frischen token lesen + zurückgeben (kein doppelter Refresh!).
* Doppelter Refresh würde MS-Refresh-Token-Rotation verletzen →
* AADSTS70043 beim nächsten Versuch.
* 4. Gibt plaintext Access-Token zurück, sofort für IMAP nutzbar.
*
* Wirft wenn:
* - Connection nicht gefunden / kein Refresh-Token
* - MS Token-Endpoint antwortet mit Fehler (revoked/expired Refresh-Token)
*/
async function refreshAndSaveTokensDaemon(connectionId, clientId) {
// Step 1: Aktuellen Token-Stand lesen
const { rows } = await pool.query(
`SELECT oauth_refresh_token, oauth_access_token, oauth_token_expiry
FROM rebreak.mail_connections
WHERE id = $1 AND auth_method = 'oauth2_microsoft'`,
[connectionId],
);
const conn = rows[0];
if (!conn?.oauth_refresh_token) {
throw new Error(
`Connection ${connectionId} has no oauth_refresh_token — cannot refresh`,
);
}
const currentExpiry = conn.oauth_token_expiry;
const decryptedRefreshToken = decrypt(conn.oauth_refresh_token);
// Step 2: MS Token-Endpoint
const body = new URLSearchParams({
grant_type: "refresh_token",
client_id: clientId,
refresh_token: decryptedRefreshToken,
scope: MS_OAUTH_SCOPES,
});
const res = await fetch(MS_TOKEN_ENDPOINT, {
method: "POST",
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: body.toString(),
});
if (!res.ok) {
const errText = await res.text().catch(() => "unknown");
throw new Error(`MS token refresh failed (${res.status}): ${errText}`);
}
const data = await res.json();
if (!data.access_token) {
throw new Error("MS refresh response missing access_token");
}
const newExpiry = new Date(Date.now() + data.expires_in * 1000);
// MS rotiert Refresh-Tokens — wenn kein neuer kommt, den alten behalten
const newRefreshToken = data.refresh_token ?? decryptedRefreshToken;
const encryptedAccess = encrypt(data.access_token);
const encryptedRefresh = encrypt(newRefreshToken);
// Step 3: Optimistic update — nur wenn oauth_token_expiry noch gleich wie gelesen
// IS NOT DISTINCT FROM deckt NULL=NULL korrekt ab (vs. = das NULL!=NULL behandelt)
const result = await pool.query(
`UPDATE rebreak.mail_connections
SET oauth_access_token = $1,
oauth_refresh_token = $2,
oauth_token_expiry = $3
WHERE id = $4
AND oauth_token_expiry IS NOT DISTINCT FROM $5`,
[encryptedAccess, encryptedRefresh, newExpiry, connectionId, currentExpiry],
);
if (result.rowCount === 0) {
// Step 4: Anderer Prozess hat parallel refresht — lese deren frischen Token
const { rows: fresh } = await pool.query(
`SELECT oauth_access_token FROM rebreak.mail_connections WHERE id = $1`,
[connectionId],
);
if (!fresh[0]?.oauth_access_token) {
throw new Error(
`Concurrent refresh for ${connectionId} detected but no token found`,
);
}
return decrypt(fresh[0].oauth_access_token);
}
return data.access_token;
}
/**
* Refresht Google Access-Token direkt via HTTP.
* Race-Condition-Strategie analog zu refreshAndSaveTokensDaemon (MS).
*
* Google-Spezifik:
* - Google rotiert refresh_tokens NICHT bei normalem Einsatz. Das ausgestellte
* refresh_token ist langlebig. Exceptions: User-Revocation, 6-Monate-Inaktivität.
* - Response enthält KEIN neues refresh_token → immer das bestehende behalten.
* - Kein scope-Parameter nötig beim Refresh (Google ignoriert ihn, MS braucht ihn).
* - auth_method-Filter: WHERE auth_method = 'oauth2_google' für die DB-Abfrage.
*/
async function refreshGoogleTokensDaemon(connectionId, clientId) {
// Step 1: Aktuellen Token-Stand lesen
const { rows } = await pool.query(
`SELECT oauth_refresh_token, oauth_access_token, oauth_token_expiry
FROM rebreak.mail_connections
WHERE id = $1 AND auth_method = 'oauth2_google'`,
[connectionId],
);
const conn = rows[0];
if (!conn?.oauth_refresh_token) {
throw new Error(
`Connection ${connectionId} has no oauth_refresh_token (google) — cannot refresh`,
);
}
const currentExpiry = conn.oauth_token_expiry;
const decryptedRefreshToken = decrypt(conn.oauth_refresh_token);
// Step 2: Google Token-Endpoint
const body = new URLSearchParams({
grant_type: "refresh_token",
client_id: clientId,
refresh_token: decryptedRefreshToken,
// kein scope-Parameter — Google ignoriert ihn beim Refresh
});
const res = await fetch("https://oauth2.googleapis.com/token", {
method: "POST",
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: body.toString(),
});
if (!res.ok) {
const errText = await res.text().catch(() => "unknown");
throw new Error(`Google token refresh failed (${res.status}): ${errText}`);
}
const data = await res.json();
if (!data.access_token) {
throw new Error("Google refresh response missing access_token");
}
const newExpiry = new Date(Date.now() + data.expires_in * 1000);
// Google rotiert NICHT — refresh_token bleibt unverändert
const encryptedAccess = encrypt(data.access_token);
// refresh_token bleibt derselbe → re-encrypt aus dem bestehenden decrypted Wert
const encryptedRefresh = encrypt(decryptedRefreshToken);
// Step 3: Optimistic update
const result = await pool.query(
`UPDATE rebreak.mail_connections
SET oauth_access_token = $1,
oauth_refresh_token = $2,
oauth_token_expiry = $3
WHERE id = $4
AND oauth_token_expiry IS NOT DISTINCT FROM $5`,
[encryptedAccess, encryptedRefresh, newExpiry, connectionId, currentExpiry],
);
if (result.rowCount === 0) {
// Concurrent refresh — lese frischen Token
const { rows: fresh } = await pool.query(
`SELECT oauth_access_token FROM rebreak.mail_connections WHERE id = $1`,
[connectionId],
);
if (!fresh[0]?.oauth_access_token) {
throw new Error(
`Concurrent Google refresh for ${connectionId} detected but no token found`,
);
}
return decrypt(fresh[0].oauth_access_token);
}
return data.access_token;
}
/**
* Markiert eine Connection als "auth_broken" — kein weiterer Retry im Daemon.
* User sieht den Error im Frontend (last_connect_error = 'auth_revoked').
*/
async function markConnectionAuthBroken(connectionId) {
await pool.query(
`UPDATE rebreak.mail_connections
SET last_connect_error = 'auth_revoked',
last_connect_error_at = NOW()
WHERE id = $1`,
[connectionId],
);
}
// ─── Credentials-Resolution ──────────────────────────────────────────────────
/**
* Gibt die IMAP-Credentials für eine Connection zurück.
* auth_method-aware:
*
* app_password → { type: 'password', user, pass }
* oauth2_microsoft → { type: 'xoauth2', user, accessToken } (MS XOAUTH2)
* oauth2_google → { type: 'xoauth2', user, accessToken } (Google XOAUTH2)
*
* Für OAuth: wenn Token in <5min abläuft, wird vor Connect proaktiv refresht.
*
* Der reaktive Refresh-Path (Cold-Path) liegt in runSession():
* AUTHENTICATIONFAILED während laufender Session → provider-spezifischer Refresh
* → neuer ImapFlow-Connect mit frischem Token.
*
* FORCE_REFRESH_FOR_TEST: wenn env IDLE_FORCE_TOKEN_REFRESH=1, immer refreshen.
*/
async function getCredentialsForConnection(conn) {
if (conn.authMethod === "oauth2_microsoft" || conn.authMethod === "oauth2_google") {
const forceRefresh = process.env.IDLE_FORCE_TOKEN_REFRESH === "1";
const fiveMinFromNow = Date.now() + TOKEN_EXPIRY_THRESHOLD_MS;
const isExpiring =
!conn.oauthTokenExpiry ||
new Date(conn.oauthTokenExpiry).getTime() < fiveMinFromNow;
if (forceRefresh || isExpiring) {
const reason = forceRefresh ? "IDLE_FORCE_TOKEN_REFRESH=1" : "token expiring <5min";
console.log(`[idle/${conn.email}] proactive token refresh (${reason}, method=${conn.authMethod})`);
let accessToken;
if (conn.authMethod === "oauth2_google") {
accessToken = await refreshGoogleTokensDaemon(conn.id, GOOGLE_OAUTH_CLIENT_ID);
} else {
accessToken = await refreshAndSaveTokensDaemon(conn.id, MS_OAUTH_CLIENT_ID);
}
return { type: "xoauth2", user: conn.email, accessToken };
}
// Token noch gültig — direkt entschlüsseln
if (!conn.oauthAccessToken) {
throw new Error(`Connection ${conn.id} has no oauth_access_token (method=${conn.authMethod})`);
}
const accessToken = decrypt(conn.oauthAccessToken);
return { type: "xoauth2", user: conn.email, accessToken };
}
// App-Password-Pfad: iCloud, GMX, Yahoo, Custom-IMAP, Gmail-App-Password
if (!conn.passwordEncrypted) {
throw new Error(`Connection ${conn.id} has no password_encrypted`);
}
const pass = decrypt(conn.passwordEncrypted);
return { type: "password", user: conn.email, pass };
}
// ─── Logging ─────────────────────────────────────────────────────────────────
function log(email, msg) {
// NIEMALS password/accessToken/credentials loggen. email ist safe.
console.log(`[idle/${email}] ${msg}`);
}
function logError(email, msg, err) {
// responseText = IMAP-Serverantwort (z.B. "NO [AUTHENTICATIONFAILED] Invalid credentials")
// Credentials tauchen NICHT in err.responseText auf — ImapFlow legt sie nicht rein.
const errMsg = err?.responseText || err?.message || String(err);
console.error(`[idle/${email}] ${msg}: ${errMsg}`);
}
// ─── Auth-Fehler Detection ───────────────────────────────────────────────────
function isAuthError(err) {
const text = (err?.responseText || err?.message || "").toUpperCase();
return (
text.includes("AUTHENTICATIONFAILED") ||
text.includes("AUTHENTICATE") ||
text.includes("INVALID CREDENTIALS") ||
text.includes("AUTHENTICATION FAILED") ||
// MS-spezifische Fehlercodes: AADSTS = Azure AD Token Service Error
text.includes("AADSTS")
);
}
// ─── Session-Registry ────────────────────────────────────────────────────────
// Map<connectionId, SessionHandle>
const sessions = new Map();
let shuttingDown = false;
// ─── In-Flight-Guard ─────────────────────────────────────────────────────────
// Verhindert gestapelte scan-internal-Aufrufe für dieselbe Connection.
//
// scanInFlight: Map<connId, boolean> — läuft gerade ein Scan?
// coalescePending: Map<connId, boolean> — kam während des laufenden Scans
// ein weiterer Trigger? Wenn ja, einmalig nachholen.
//
// Verhalten:
// - Kein laufender Scan: sofort feuern, inFlight=true setzen
// - Läuft Scan bereits: pending=true setzen (coalesce — nicht stapeln)
// - Nach Scan-Ende: wenn pending=true → einen weiteren Scan starten,
// dann pending=false. Maximal EIN gestapelter Trigger bleibt hängen.
//
// Gilt für NOOP-Tick UND exists-Event (beide rufen triggerScan auf).
const scanInFlight = new Map(); // Map<connId, boolean>
const coalescePending = new Map(); // Map<connId, boolean>
async function triggerScan(conn) {
const connId = conn.id;
if (scanInFlight.get(connId)) {
// Scan läuft bereits — coalescer merkt sich den Wunsch, stapelt nicht
if (!coalescePending.get(connId)) {
log(conn.email, "scan-trigger coalesced (in-flight)");
coalescePending.set(connId, true);
}
return;
}
scanInFlight.set(connId, true);
try {
const res = await fetch(`${BACKEND_URL}/api/mail/scan-internal`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-admin-secret": ADMIN_SECRET,
},
body: JSON.stringify({ userId: conn.userId }),
});
if (!res.ok) {
log(conn.email, `scan-trigger HTTP ${res.status}`);
} else {
const data = await res.json().catch(() => ({}));
log(
conn.email,
`scan-triggered → scanned=${data.scanned ?? "?"} blocked=${data.blocked ?? "?"}`,
);
}
} catch (err) {
logError(conn.email, "scan-trigger failed", err);
} finally {
scanInFlight.set(connId, false);
// Coalesced Trigger nachholen: einmalig, fire-and-forget
if (coalescePending.get(connId)) {
coalescePending.set(connId, false);
log(conn.email, "scan-trigger coalesced fire (post-flight)");
triggerScan(conn).catch(() => {});
}
}
}
// ─── IDLE-Session ─────────────────────────────────────────────────────────────
/**
* Startet eine einzelne IDLE-Session für eine MailConnection.
* Reconnect-Loop läuft intern — diese Funktion returned nie (bis shutdown).
*
* Auth-Retry-Loop (OAuth-spezifisch):
* Bei AUTHENTICATIONFAILED → Token refreshen → neu verbinden.
* Max MAX_AUTH_RETRIES (3) mal. Dann: markConnectionAuthBroken + exit.
* authRetries wird nach jedem erfolgreichen Connect resettet.
*
* Consent-Gate:
* conn.consentAt === null → IDLE-Verbindung wird gehalten (keep-alive),
* aber triggerScan() wird NICHT aufgerufen. exists-Events werden still ignoriert.
* Sobald der User consent erteilt (DB-Refresh-Cycle nach max. 5min),
* wird die Connection neu gestartet mit consentAt gesetzt.
*/
async function runSession(conn) {
let attempt = 0;
let authRetries = 0;
while (!shuttingDown) {
// Credentials holen (proaktiver Token-Refresh wenn nötig)
let creds;
try {
creds = await getCredentialsForConnection(conn);
} catch (err) {
logError(conn.email, "credential resolution failed — session aborted", err);
// Kein retry: kaputte Credentials oder Token-Refresh fehlgeschlagen
// (z.B. Refresh-Token revoked). Als auth_broken markieren wenn OAuth.
if (conn.authMethod === "oauth2_microsoft") {
await markConnectionAuthBroken(conn.id).catch(() => {});
} else {
await updateConnectionError(conn.id, err?.message || String(err)).catch(() => {});
}
return;
}
const useImplicitTls = !conn.useStarttls;
const imap = new ImapFlow({
host: conn.imapHost,
port: conn.imapPort,
secure: useImplicitTls,
...(conn.useStarttls ? { requireTLS: true } : {}),
// ImapFlow 1.2.18 unterstützt XOAUTH2 nativ:
// { user, accessToken } → AUTHENTICATE XOAUTH2 <base64-token>
// { user, pass } → LOGIN oder PLAIN (je nach Server-Capability)
auth: creds.type === "xoauth2"
? { user: creds.user, accessToken: creds.accessToken }
: { user: creds.user, pass: creds.pass },
logger: false,
tls: { rejectUnauthorized: conn.rejectUnauthorized ?? true },
// outlook.office365.com: disableCompression verhindert edge-cases bei
// partial reads nach Reconnect. Gilt für oauth2_microsoft Connections.
disableCompression: conn.imapHost.includes("office365"),
});
// Referenz ablegen damit shutdown() darauf zugreifen kann
const handle = sessions.get(conn.id);
if (handle) handle.imap = imap;
try {
await imap.connect();
log(conn.email, `connected (${conn.imapHost}:${conn.imapPort}, auth=${creds.type})`);
attempt = 0; // Reset nach erfolgreicher Verbindung
authRetries = 0; // Auth-Retry-Counter ebenfalls reset
// Initial-Heartbeat: sofort nach erfolgreichem Connect schreiben damit
// das Frontend "aktiv" zeigt statt bis zum ersten NOOP-Cycle zu warten
// (NOOP-Cycle = alle 2min → worst-case 2min+, gemessene delay 2-9min).
// Gilt für alle auth-Methoden (app_password + oauth2_microsoft) und
// auch für den Re-Connect nach AUTHENTICATIONFAILED-Recovery, da beide
// Paths durch diesen Block laufen.
// last_connect_error wird gleichzeitig geclearet: ein zuvor failed-State
// (z.B. abgelaufener Token) ist nach erfolgreichem Connect behoben.
await Promise.all([
updateIdleHeartbeat(conn.id).catch(() => {}),
clearConnectionError(conn.id).catch(() => {}),
]);
// Initial-Sweep: einmalig nach erfolgreichem Connect scan-internal anstoßen.
// Damit werden bestehende Gambling-Mails in allen Folders sofort gelöscht,
// statt auf das erste exists-Event zu warten (das nur bei neuen Mails kommt).
// scan-internal baut eine eigene IMAP-Connection auf → kein Lock-Konflikt.
// Consent-Gate sitzt in scan-internal selbst → kein doppeltes Check hier.
// fire-and-forget: Fehler werden intern geloggt, Session läuft weiter.
triggerScan(conn).catch(() => {});
// Outlook/XOAUTH2 hat den Edge-Case dass getMailboxLock lautlos hängt
// wenn der Server in einen ungültigen Zustand kommt — die Session
// bleibt offen ohne Fortschritt bis der Renew-Timer (10min) ein
// imap.close() schickt. Timeout-wrap macht das Failure-Mode explizit
// → Auth-Retry-Loop greift sauber.
await Promise.race([
imap.getMailboxLock("INBOX"),
new Promise((_, reject) =>
setTimeout(() => reject(new Error("getMailboxLock timeout (30s)")), 30_000),
),
]);
// Consent-Gate-Log: einmalig beim Connect — nur wenn consentAt fehlt
if (!conn.consentAt) {
log(
conn.email,
"consent_at=NULL — IDLE session held but scan/delete suspended. " +
"Re-consent required via app.",
);
}
// IDLE-Loop: alle IDLE_RENEW_INTERVAL_MS erneuern
while (!shuttingDown && sessions.has(conn.id)) {
const idlePromise = new Promise((resolve, reject) => {
imap.idle().then(resolve).catch(reject);
});
// exists-event → sofort scannen (nur wenn consent erteilt)
const onExists = () => {
if (!conn.consentAt) {
// Consent fehlt: exists-event ignorieren, Mail bleibt in Inbox.
// UI zeigt Re-Consent-Modal (via /api/mail/pending-consent Endpoint).
log(conn.email, "exists-event received — skipped (no consent_at)");
return;
}
log(conn.email, "exists-event received (new mail)");
triggerScan(conn); // fire-and-forget
};
imap.on("exists", onExists);
// IDLE nach 10min erneuern
// Gilt für: GMX (~10-15min), Gmail (~29min), iCloud (~29min), outlook.office365.com (~29min)
const renewTimer = setTimeout(() => {
log(conn.email, "idle renewing (10min threshold)");
imap.close();
}, IDLE_RENEW_INTERVAL_MS);
// NOOP-heartbeat alle 2min: silent-drop early-detection (GMX-pattern).
// Zusätzlich: fire-and-forget scan-internal bei jedem Tick (Junk-Folder-Fix).
// Outlook/Hotmail liefert kein INBOX-exists-Event für Mails die direkt in
// "Junk Email" landen — IDLE hört nur INBOX. Der 2min-Tick stellt sicher dass
// auch Junk-Ordner-Mails innerhalb von max. 2min erfasst werden.
// Consent-Gate sitzt in scan-internal → kein doppeltes Check hier nötig.
const noopTimer = setInterval(async () => {
try {
await imap.noop();
await updateIdleHeartbeat(conn.id).catch(() => {});
// Junk-Folder-Sweep: scan-internal scannt alle Ordner inkl. Junk.
// Nur auslösen wenn Consent erteilt (analog exists-Event-Guard).
if (conn.consentAt) {
triggerScan(conn).catch(() => {});
}
} catch (err) {
logError(conn.email, "noop failed — connection dead, force reconnect", err);
imap.close();
}
}, IDLE_NOOP_INTERVAL_MS);
try {
await idlePromise;
} finally {
clearInterval(noopTimer);
clearTimeout(renewTimer);
imap.removeListener("exists", onExists);
}
if (shuttingDown || !sessions.has(conn.id)) break;
await sleep(500);
}
try { await imap.logout(); } catch { /* ignore */ }
return; // Sauberer Exit (shutdown oder Connection entfernt)
} catch (err) {
logError(conn.email, "connection error", err);
try { imap.close(); } catch { /* ignore */ }
// ── AUTHENTICATIONFAILED-Recovery (OAuth-spezifisch) ──────────────────
// Cold-Path: Token zwischen zwei IDLE-Renewals abgelaufen (>1h Session).
// Oder: proaktiver Refresh ist fehlgeschlagen und wir landen hier.
const isOauthConn =
conn.authMethod === "oauth2_microsoft" ||
conn.authMethod === "oauth2_google";
if (isOauthConn && isAuthError(err)) {
authRetries++;
log(
conn.email,
`AUTHENTICATIONFAILED detected — refresh attempt ${authRetries}/${MAX_AUTH_RETRIES} (method=${conn.authMethod})`,
);
if (authRetries > MAX_AUTH_RETRIES) {
// Auth dauerhaft revoked (User hat App-Permission entzogen o.ä.)
// Connection als auth_broken markieren — User muss re-connecten.
log(conn.email, `auth_broken after ${MAX_AUTH_RETRIES} retries — session stopped`);
await markConnectionAuthBroken(conn.id).catch(() => {});
sessions.delete(conn.id);
return;
}
// Token refreshen — direkt hier im catch-Block.
// Provider-spezifisch: Google via refreshGoogleTokensDaemon, MS via refreshAndSaveTokensDaemon.
try {
let freshToken;
if (conn.authMethod === "oauth2_google") {
freshToken = await refreshGoogleTokensDaemon(conn.id, GOOGLE_OAUTH_CLIENT_ID);
} else {
freshToken = await refreshAndSaveTokensDaemon(conn.id, MS_OAUTH_CLIENT_ID);
}
// conn inline patchen damit getCredentialsForConnection() beim nächsten
// Loop-Durchlauf den frischen Token nutzt ohne erneuten DB-Read.
conn.oauthAccessToken = encrypt(freshToken);
conn.oauthTokenExpiry = new Date(Date.now() + 55 * 60 * 1000); // ~55min buffer
log(conn.email, `token refreshed (${conn.authMethod}) — reconnecting immediately`);
// Kein normaler Backoff nach Auth-Refresh — sofort neu verbinden.
continue;
} catch (refreshErr) {
logError(conn.email, "token refresh failed after AUTHENTICATIONFAILED", refreshErr);
// Refresh selbst gescheitert → normaler Backoff
// authRetries bleibt erhöht — beim nächsten Auth-Fehler zählt es weiter.
await updateConnectionError(conn.id, refreshErr?.message || String(refreshErr)).catch(() => {});
}
} else {
// Nicht-Auth-Fehler (Netz, TLS, etc.) — normal in DB schreiben
const errText = err?.responseText || err?.message || String(err);
await updateConnectionError(conn.id, errText).catch(() => {});
}
}
if (shuttingDown || !sessions.has(conn.id)) return;
// Exponential backoff
const delay =
attempt < RECONNECT_DELAYS_MS.length
? RECONNECT_DELAYS_MS[attempt]
: RECONNECT_LOOP_DELAY_MS;
attempt++;
log(conn.email, `reconnecting in ${delay / 1000}s (attempt ${attempt})`);
await sleep(delay);
}
}
// ─── Session-Management ───────────────────────────────────────────────────────
function startSession(conn) {
if (sessions.has(conn.id)) return; // bereits aktiv
log(conn.email, "session starting");
const handle = { conn, imap: null, promise: null };
sessions.set(conn.id, handle);
handle.promise = runSession(conn).catch((err) => {
logError(conn.email, "session crashed (unhandled)", err);
sessions.delete(conn.id);
});
}
async function stopSession(connectionId, email) {
const handle = sessions.get(connectionId);
if (!handle) return;
log(email ?? connectionId, "session stopping");
sessions.delete(connectionId);
if (handle.imap) {
try { handle.imap.close(); } catch { /* ignore */ }
}
await handle.promise.catch(() => {});
}
// ─── DB-Refresh-Loop ──────────────────────────────────────────────────────────
async function refreshConnections() {
let rows;
try {
rows = await loadActiveConnections();
} catch (err) {
console.error("[idle/db] loadActiveConnections failed:", err.message);
return;
}
const activeIds = new Set(rows.map((r) => r.id));
// Neue Connections starten
for (const row of rows) {
if (!sessions.has(row.id)) {
startSession(row);
}
}
// Entfernte Connections stoppen
for (const [id, handle] of sessions.entries()) {
if (!activeIds.has(id)) {
await stopSession(id, handle.conn?.email);
}
}
// Consent-Status für laufende Sessions aktualisieren.
// Wenn ein User consent erteilt hat seit dem letzten DB-Refresh:
// Die laufende Session bekommt conn.consentAt gesetzt — next exists-event
// löst dann sofort triggerScan() aus. Kein Restart nötig.
for (const row of rows) {
const handle = sessions.get(row.id);
if (handle && handle.conn.consentAt !== row.consentAt) {
if (!handle.conn.consentAt && row.consentAt) {
log(row.email, "consent_at received — scan/delete now active");
}
handle.conn.consentAt = row.consentAt;
}
}
const consentPending = rows.filter((r) => !r.consentAt).length;
console.log(
`[idle/db] refreshed — ${activeIds.size} active connections, ${sessions.size} sessions` +
(consentPending > 0 ? `, ${consentPending} pending consent` : ""),
);
}
// ─── Graceful Shutdown ────────────────────────────────────────────────────────
async function shutdown(signal) {
console.log(`[idle] received ${signal} — shutting down gracefully`);
shuttingDown = true;
const stopPromises = [];
for (const [id, handle] of sessions.entries()) {
stopPromises.push(stopSession(id, handle.conn?.email));
}
await Promise.allSettled(stopPromises);
await pool.end().catch(() => {});
console.log("[idle] shutdown complete");
process.exit(0);
}
process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));
// ─── Startup ──────────────────────────────────────────────────────────────────
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function assertEnv() {
const missing = [];
if (!process.env.DATABASE_URL) missing.push("DATABASE_URL");
if (!ADMIN_SECRET) missing.push("ADMIN_SECRET / NUXT_ADMIN_SECRET");
if (!MS_OAUTH_CLIENT_ID) missing.push("MS_OAUTH_CLIENT_ID");
if (!GOOGLE_OAUTH_CLIENT_ID) missing.push("GOOGLE_OAUTH_CLIENT_ID");
if (missing.length > 0) {
console.error(
`[idle] FEHLER: fehlende Env-Vars: ${missing.join(", ")}. Daemon startet nicht.`,
);
process.exit(1);
}
}
async function main() {
assertEnv();
console.log(
`[idle] starting — backend=${BACKEND_URL} env=${process.env.NODE_ENV ?? "unknown"}`,
);
await refreshConnections();
setInterval(() => {
if (!shuttingDown) refreshConnections();
}, DB_REFRESH_INTERVAL_MS);
}
main().catch((err) => {
console.error("[idle] fatal startup error:", err);
process.exit(1);
});