chahinebrini de701677b2 feat(mail): IDLE-daemon for real-time Legend mail-protection
Standalone ESM-daemon that:
- Connects via ImapFlow IDLE to all active Legend mailboxes
- Triggers /api/mail/scan-internal on new-mail events (real-time)
- Auto-renew IDLE every 25min (RFC 3501 limit), exponential-backoff reconnect
- DB-refresh every 5min for new/removed connections

Plus deploy-pipeline:
- GH-Actions artifact-upload + scp to /srv/rebreak/backend/imap-idle/
- npm install --production on server (imapflow + pg)
- pm2 startOrReload via ecosystem.config.js
- start-idle-staging.sh wrapper for Infisical secret-injection

Replaces 30min-cron polling for Legend tier -- Casino-mails now blocked
within seconds, fulfilling Legend tier marketing promise.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 20:48:44 +02:00

354 lines
12 KiB
JavaScript

/**
* rebreak-imap-idle — IMAP IDLE Daemon
*
* Hält pro aktivem MailConnection-Eintrag eine persistente IMAP-IDLE-Session.
* Wenn der Server "EXISTS" meldet (neue Mail), feuert der Daemon sofort
* POST /api/mail/scan-internal gegen das lokale Backend — ohne 30min-Warte.
*
* Env-Vars (via Infisical-Wrapper):
* DATABASE_URL — Postgres-Connection-String
* ADMIN_SECRET — Header-Secret für /api/mail/scan-internal
* ENCRYPTION_KEY — AES-256 Key (gleicher wie im Backend)
* BACKEND_URL — z.B. http://127.0.0.1:3016 (default: 3016)
* NODE_ENV — production / staging
*
* Starten:
* node index.mjs
*
* Prozess-Signale:
* SIGTERM / SIGINT → graceful shutdown (alle IMAP-Sessions schließen)
*/
import { ImapFlow } from "imapflow";
import pg from "pg";
import { createDecipheriv } from "crypto";
// ─── Config ─────────────────────────────────────────────────────────────────
const BACKEND_URL =
process.env.BACKEND_URL ||
(process.env.NODE_ENV === "production"
? "http://127.0.0.1:3015"
: "http://127.0.0.1:3016");
const ADMIN_SECRET =
process.env.NUXT_ADMIN_SECRET || process.env.ADMIN_SECRET || "";
const DB_REFRESH_INTERVAL_MS = 5 * 60 * 1000; // 5 min — neue Connections entdecken
const IDLE_RENEW_INTERVAL_MS = 25 * 60 * 1000; // 25 min — RFC 3501 max = 29min
const RECONNECT_DELAYS_MS = [1000, 5000, 30_000]; // exponential backoff, danach 60s loop
const RECONNECT_LOOP_DELAY_MS = 60 * 1000;
// ─── DB-Pool (direktes pg, kein Prisma — Daemon ist kein Nitro-Kontext) ────
const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
async function loadActiveConnections() {
const { rows } = await pool.query(
`SELECT id, "userId", email, "imapHost", "imapPort",
"passwordEncrypted", "rejectUnauthorized", "useStarttls"
FROM rebreak."MailConnection"
WHERE "isActive" = true`,
);
return rows;
}
// ─── Crypto (analog zu server/utils/crypto.ts) ──────────────────────────────
const AES_ALGO = "aes-256-gcm";
const KEY_LENGTH = 32;
function getKey() {
const raw =
process.env.NUXT_ENCRYPTION_KEY || process.env.ENCRYPTION_KEY || "";
if (!raw || raw.length < KEY_LENGTH) {
return Buffer.alloc(
KEY_LENGTH,
raw.padEnd(KEY_LENGTH, "0").slice(0, KEY_LENGTH),
);
}
return Buffer.from(raw.slice(0, KEY_LENGTH), "utf8");
}
function decrypt(stored) {
const parts = stored.split(":");
if (parts.length !== 3) throw new Error("Invalid encrypted format");
const [ivHex, tagHex, dataHex] = parts;
const decipher = createDecipheriv(
AES_ALGO,
getKey(),
Buffer.from(ivHex, "hex"),
);
decipher.setAuthTag(Buffer.from(tagHex, "hex"));
return decipher.update(Buffer.from(dataHex, "hex")) + decipher.final("utf8");
}
// ─── Logging ────────────────────────────────────────────────────────────────
function log(email, msg) {
// NIEMALS password/credentials loggen. email ist safe (kein secret).
console.log(`[idle/${email}] ${msg}`);
}
function logError(email, msg, err) {
const errMsg = err?.message ?? String(err);
// Credentials tauchen nie in err.message auf (ImapFlow maskiert sie nicht,
// aber wir liefern pass nur an ImapFlow — nicht in eigenen log-calls).
console.error(`[idle/${email}] ${msg}: ${errMsg}`);
}
// ─── Session-Registry ────────────────────────────────────────────────────────
// Map<connectionId, SessionHandle>
const sessions = new Map();
let shuttingDown = false;
// ─── Scan-Trigger ────────────────────────────────────────────────────────────
async function triggerScan(conn) {
try {
const res = await fetch(`${BACKEND_URL}/api/mail/scan-internal`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-admin-secret": ADMIN_SECRET,
},
body: JSON.stringify({ userId: conn.userId }),
});
if (!res.ok) {
log(conn.email, `scan-trigger HTTP ${res.status}`);
} else {
const data = await res.json().catch(() => ({}));
log(
conn.email,
`scan-triggered → scanned=${data.scanned ?? "?"} blocked=${data.blocked ?? "?"}`,
);
}
} catch (err) {
logError(conn.email, "scan-trigger failed", err);
}
}
// ─── IDLE-Session ────────────────────────────────────────────────────────────
/**
* Startet eine einzelne IDLE-Session für eine MailConnection.
* Reconnect-Loop läuft intern — diese Funktion returned nie (bis shutdown).
*/
async function runSession(conn) {
let attempt = 0;
while (!shuttingDown) {
let password;
try {
password = decrypt(conn.passwordEncrypted);
} catch (err) {
logError(conn.email, "decrypt failed — session aborted", err);
return; // Kein retry bei Decrypt-Fehler (kaputtes Credential)
}
const useImplicitTls = !conn.useStarttls;
const imap = new ImapFlow({
host: conn.imapHost,
port: conn.imapPort,
secure: useImplicitTls,
...(conn.useStarttls ? { requireTLS: true } : {}),
auth: { user: conn.email, pass: password },
logger: false,
tls: { rejectUnauthorized: conn.rejectUnauthorized ?? true },
// Outlook schließt idle-connections aggressiv — disableCompression
// verhindert edge-cases bei partial reads nach reconnect
disableCompression: conn.imapHost.includes("office365"),
});
// Referenz ablegen damit shutdown() darauf zugreifen kann
const handle = sessions.get(conn.id);
if (handle) handle.imap = imap;
try {
await imap.connect();
log(conn.email, `connected (${conn.imapHost}:${conn.imapPort})`);
attempt = 0; // Reset nach erfolgreicher Verbindung
await imap.getMailboxLock("INBOX");
// IDLE-Loop: alle IDLE_RENEW_INTERVAL_MS erneuern
while (!shuttingDown && sessions.has(conn.id)) {
let idleAbort;
const idlePromise = new Promise((resolve, reject) => {
imap
.idle()
.then(resolve)
.catch(reject);
idleAbort = () => {
// ImapFlow.idle() bricht ab wenn die Connection getrennt wird
imap.close();
resolve();
};
});
// exists-event → sofort scannen
const onExists = () => {
log(conn.email, "exists-event received (new mail)");
triggerScan(conn); // fire-and-forget
};
imap.on("exists", onExists);
// IDLE nach 25min erneuern (RFC 3501: Server darf nach 29min droppen)
const renewTimer = setTimeout(() => {
log(conn.email, "idle renewing (25min threshold)");
imap.close(); // Unterbricht idle() → Loop iteriert → reconnect
}, IDLE_RENEW_INTERVAL_MS);
try {
await idlePromise;
} finally {
clearTimeout(renewTimer);
imap.removeListener("exists", onExists);
}
if (shuttingDown || !sessions.has(conn.id)) break;
// Kurze Pause vor Reconnect (idle() returned auch bei normalem timeout)
await sleep(500);
}
try { await imap.logout(); } catch { /* ignore */ }
return; // Sauberer Exit (shutdown oder Connection entfernt)
} catch (err) {
logError(conn.email, "connection error", err);
try { imap.close(); } catch { /* ignore */ }
}
if (shuttingDown || !sessions.has(conn.id)) return;
// Exponential backoff
const delay =
attempt < RECONNECT_DELAYS_MS.length
? RECONNECT_DELAYS_MS[attempt]
: RECONNECT_LOOP_DELAY_MS;
attempt++;
log(conn.email, `reconnecting in ${delay / 1000}s (attempt ${attempt})`);
await sleep(delay);
}
}
// ─── Session-Management ──────────────────────────────────────────────────────
function startSession(conn) {
if (sessions.has(conn.id)) return; // bereits aktiv
log(conn.email, "session starting");
const handle = { conn, imap: null, promise: null };
sessions.set(conn.id, handle);
handle.promise = runSession(conn).catch((err) => {
logError(conn.email, "session crashed (unhandled)", err);
sessions.delete(conn.id);
});
}
async function stopSession(connectionId, email) {
const handle = sessions.get(connectionId);
if (!handle) return;
log(email ?? connectionId, "session stopping");
sessions.delete(connectionId);
if (handle.imap) {
try { handle.imap.close(); } catch { /* ignore */ }
}
await handle.promise.catch(() => {});
}
// ─── DB-Refresh-Loop ─────────────────────────────────────────────────────────
async function refreshConnections() {
let rows;
try {
rows = await loadActiveConnections();
} catch (err) {
console.error("[idle/db] loadActiveConnections failed:", err.message);
return;
}
const activeIds = new Set(rows.map((r) => r.id));
// Neue Connections starten
for (const row of rows) {
if (!sessions.has(row.id)) {
startSession(row);
}
}
// Entfernte Connections stoppen
for (const [id, handle] of sessions.entries()) {
if (!activeIds.has(id)) {
await stopSession(id, handle.conn?.email);
}
}
console.log(
`[idle/db] refreshed — ${activeIds.size} active connections, ${sessions.size} sessions`,
);
}
// ─── Graceful Shutdown ───────────────────────────────────────────────────────
async function shutdown(signal) {
console.log(`[idle] received ${signal} — shutting down gracefully`);
shuttingDown = true;
const stopPromises = [];
for (const [id, handle] of sessions.entries()) {
stopPromises.push(stopSession(id, handle.conn?.email));
}
await Promise.allSettled(stopPromises);
await pool.end().catch(() => {});
console.log("[idle] shutdown complete");
process.exit(0);
}
process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));
// ─── Startup ─────────────────────────────────────────────────────────────────
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function assertEnv() {
const missing = [];
if (!process.env.DATABASE_URL) missing.push("DATABASE_URL");
if (!ADMIN_SECRET) missing.push("ADMIN_SECRET / NUXT_ADMIN_SECRET");
if (missing.length > 0) {
console.error(
`[idle] FEHLER: fehlende Env-Vars: ${missing.join(", ")}. Daemon startet nicht.`,
);
process.exit(1);
}
}
async function main() {
assertEnv();
console.log(
`[idle] starting — backend=${BACKEND_URL} env=${process.env.NODE_ENV ?? "unknown"}`,
);
// Initialer Load
await refreshConnections();
// Periodischer DB-Refresh
setInterval(() => {
if (!shuttingDown) refreshConnections();
}, DB_REFRESH_INTERVAL_MS);
}
main().catch((err) => {
console.error("[idle] fatal startup error:", err);
process.exit(1);
});