import { usePrisma } from "../utils/prisma"; import { encrypt, decrypt } from "../utils/crypto"; import { refreshMicrosoftTokens } from "../utils/ms-oauth"; export async function getMailConnections(userId: string) { const db = usePrisma(); // isActive=true UND nicht pausiert (pausedAt=null) — pausierte werden vom Cron ausgelassen return db.mailConnection.findMany({ where: { userId, isActive: true, pausedAt: null }, orderBy: { createdAt: "asc" }, }); } /** Alle Verbindungen eines Users inkl. pausierten — für Status-Anzeige im Frontend. */ export async function getAllMailConnections(userId: string) { const db = usePrisma(); return db.mailConnection.findMany({ where: { userId }, orderBy: { createdAt: "asc" }, select: { id: true, email: true, title: true, provider: true, providerName: true, imapHost: true, authMethod: true, consentAt: true, isActive: true, pausedAt: true, pausedReason: true, scanInterval: true, lastScannedAt: true, nextScanAt: true, emailsBlocked: true, emailsScanned: true, lastConnectError: true, createdAt: true, }, }); } export async function getAllActiveMailUserIds() { const db = usePrisma(); const rows = await db.mailConnection.findMany({ where: { isActive: true, nextScanAt: { lte: new Date() } }, select: { userId: true }, distinct: ["userId"], }); return rows.map((r) => r.userId); } export async function countMailConnections(userId: string) { const db = usePrisma(); // Nur aktive + nicht-pausierte Verbindungen zählen gegen das Limit return db.mailConnection.count({ where: { userId, isActive: true, pausedAt: null } }); } export async function upsertMailConnection(data: { userId: string; email: string; provider: string; providerName: string; imapHost: string; imapPort: number; passwordEncrypted: string; rejectUnauthorized?: boolean; useStarttls?: boolean; }) { const db = usePrisma(); return db.mailConnection.upsert({ where: { userId_email: { userId: data.userId, email: data.email } }, create: { ...data, isActive: true, rejectUnauthorized: data.rejectUnauthorized ?? true, useStarttls: data.useStarttls ?? false, }, update: { providerName: data.providerName, imapHost: data.imapHost, imapPort: data.imapPort, passwordEncrypted: data.passwordEncrypted, rejectUnauthorized: data.rejectUnauthorized ?? true, useStarttls: data.useStarttls ?? false, isActive: true, // Bei Re-Connect (z.B. neues App-Passwort): alte Error-Spuren clearen, // damit UI sofort wieder "Live" zeigt — IDLE-daemon übernimmt. lastConnectError: null, lastConnectErrorAt: null, }, }); } export async function deleteMailConnection( userId: string, connectionId: string, ) { const db = usePrisma(); return db.mailConnection.deleteMany({ where: { id: connectionId, userId }, }); } export async function deleteAllMailConnections(userId: string) { const db = usePrisma(); return db.mailConnection.deleteMany({ where: { userId } }); } export async function updateMailConnectionInterval( userId: string, connectionId: string, interval: number, ) { const db = usePrisma(); return db.mailConnection.updateMany({ where: { id: connectionId, userId }, data: { scanInterval: interval }, }); } export async function updateMailConnectionScanStats( connectionId: string, scanned: number, blocked: number, currentBlocked: number, currentScanned: number, scanIntervalHours: number, ) { const db = usePrisma(); return db.mailConnection.update({ where: { id: connectionId }, data: { lastScannedAt: new Date(), emailsBlocked: currentBlocked + blocked, emailsScanned: currentScanned + scanned, nextScanAt: new Date(Date.now() + scanIntervalHours * 3_600_000), }, }); } export async function getMailBlockedStats(userId: string) { const db = usePrisma(); const since7d = new Date(Date.now() - 7 * 86_400_000); return db.mailBlocked.findMany({ where: { userId, createdAt: { gte: since7d } }, select: { createdAt: true }, }); } export async function isMailAlreadyBlocked( gmailMessageId: string, userId: string, ) { const db = usePrisma(); const existing = await db.mailBlocked.findFirst({ where: { gmailMessageId, userId }, select: { id: true }, }); return !!existing; } export async function getAlreadyBlockedUidSet( uids: string[], userId: string, ): Promise> { if (uids.length === 0) return new Set(); const db = usePrisma(); const existing = await db.mailBlocked.findMany({ where: { gmailMessageId: { in: uids }, userId }, select: { gmailMessageId: true }, }); return new Set(existing.map((e) => e.gmailMessageId)); } export async function insertMailBlocked( entries: { userId: string; connectionId: string; gmailMessageId: string; senderEmail: string; senderName: string | null; subject: string; receivedAt: Date; action: string; }[], ) { if (entries.length === 0) return; const db = usePrisma(); await db.mailBlocked.createMany({ data: entries, skipDuplicates: true }); } /** * Gibt alle MailConnections eines Users zurück bei denen consent_at noch NULL ist. * Wird vom pending-consent.get.ts Endpoint für den Re-Consent-Modal-Trigger genutzt. */ export async function getPendingConsentConnections( userId: string, ): Promise<{ id: string; email: string }[]> { const db = usePrisma(); return db.mailConnection.findMany({ where: { userId, consentAt: null }, select: { id: true, email: true }, orderBy: { createdAt: "asc" }, }); } export async function getImapProxyAccounts(userId: string) { const db = usePrisma(); return db.imapProxyAccount.findMany({ where: { userId } }); } export async function upsertImapProxyAccount(data: { userId: string; proxyUsername: string; proxyPassword: string; connectionId: string; }) { const db = usePrisma(); return db.imapProxyAccount.upsert({ where: { connectionId: data.connectionId }, create: data, update: { proxyPassword: data.proxyPassword }, }); } export async function deleteOldMailBlocked(userId: string) { const db = usePrisma(); const cutoff = new Date(Date.now() - 24 * 3_600_000); return db.mailBlocked.deleteMany({ where: { userId, createdAt: { lt: cutoff } }, }); } /** * UPSERT einen Aggregat-Zähler in mail_blocked_stats. * Wird direkt nach insertMailBlocked pro Connection aufgerufen. * date ist UTC-Mitternacht des aktuellen Tages. * Bei Conflict (selber User+Tag+Connection): count += 1. */ export async function upsertMailBlockedStat(entry: { userId: string; mailConnectionId: string; provider: string; providerLabel: string; count: number; }) { const db = usePrisma(); const today = new Date(); today.setUTCHours(0, 0, 0, 0); return db.mailBlockedStat.upsert({ where: { userId_date_mailConnectionId: { userId: entry.userId, date: today, mailConnectionId: entry.mailConnectionId, }, }, create: { userId: entry.userId, date: today, mailConnectionId: entry.mailConnectionId, provider: entry.provider, providerLabel: entry.providerLabel, count: entry.count, }, update: { // Neuester Label gewinnt (falls User IMAP-Host gewechselt hat) provider: entry.provider, providerLabel: entry.providerLabel, count: { increment: entry.count }, }, }); } export async function getMailBlockedPaginated( userId: string, page: number, limit = 20, providerFilter?: string[], ) { const db = usePrisma(); const offset = (page - 1) * limit; // Bei Provider-Filter: JOINen via connectionId → imapHost für Vergleich const whereBase = providerFilter && providerFilter.length > 0 ? { userId, connection: { imapHost: { in: providerFilter } } } : { userId }; const [results, total] = await Promise.all([ db.mailBlocked.findMany({ where: whereBase, orderBy: { createdAt: "desc" }, skip: offset, take: limit, include: { connection: { select: { id: true, email: true, title: true, providerName: true, imapHost: true }, }, }, }), db.mailBlocked.count({ where: whereBase }), ]); return { results, total, page, pages: Math.ceil(total / limit) }; } /** Title einer MailConnection setzen (nullable — reset auf NULL möglich). */ export async function updateMailConnectionTitle( userId: string, connectionId: string, title: string | null, ) { const db = usePrisma(); const updated = await db.mailConnection.updateMany({ where: { id: connectionId, userId }, data: { title }, }); if (updated.count === 0) return null; return db.mailConnection.findFirst({ where: { id: connectionId, userId }, select: { id: true, email: true, title: true }, }); } /** * Geblockte Mails pro Tag (UTC) für die letzten N Tage — für Bar-Chart. * Liest aus mail_blocked_stats (permanent, kein 24h-Cleanup). * Fehlende Tage werden mit count=0 aufgefüllt. * * connectionId (optional): filtert auf eine einzelne MailConnection. * Gehört die connectionId einem fremden User, liefert die WHERE-Klausel * schlicht 0 Rows → alle Buckets werden mit count=0 aufgefüllt (404-alike). */ export async function getBlockedMailsByDay( userId: string, days: number, connectionId?: string, ): Promise<{ date: string; count: number }[]> { const db = usePrisma(); const since = new Date(Date.now() - days * 86_400_000); since.setUTCHours(0, 0, 0, 0); // Aggregiere SUM(count) pro Tag aus der permanenten Stats-Tabelle const rows = connectionId ? await db.$queryRaw<{ date: string; count: bigint }[]>` SELECT TO_CHAR("date", 'YYYY-MM-DD') AS date, SUM("count")::bigint AS count FROM "rebreak"."mail_blocked_stats" WHERE "user_id" = ${userId}::uuid AND "date" >= ${since}::date AND "mail_connection_id" = ${connectionId}::uuid GROUP BY "date" ORDER BY "date" ASC ` : await db.$queryRaw<{ date: string; count: bigint }[]>` SELECT TO_CHAR("date", 'YYYY-MM-DD') AS date, SUM("count")::bigint AS count FROM "rebreak"."mail_blocked_stats" WHERE "user_id" = ${userId}::uuid AND "date" >= ${since}::date GROUP BY "date" ORDER BY "date" ASC `; const map: Record = {}; for (const row of rows) { map[row.date] = Number(row.count); } // Alle N Tage auffüllen (neueste zuletzt) return Array.from({ length: days }, (_, i) => { const d = new Date(Date.now() - (days - 1 - i) * 86_400_000); const key = d.toISOString().slice(0, 10); return { date: key, count: map[key] ?? 0 }; }); } /** * Anzahl blockierter Mails pro MailConnection — für Half-Donut-Chart. * Liest aus mail_blocked_stats (permanent). * Connections ohne blocked emails (stats=0) werden NICHT included. * Gibt imapHost zurück — resolveProviderMeta() wird im Endpoint aufgerufen. */ export async function getBlockedMailsByConnection(userId: string) { const db = usePrisma(); // SUM(count) pro Connection aus Stats-Tabelle const rows = await db.mailBlockedStat.groupBy({ by: ["mailConnectionId"], where: { userId }, _sum: { count: true }, orderBy: { _sum: { count: "desc" } }, }); if (rows.length === 0) return []; const connectionIds = rows.map((r) => r.mailConnectionId); const connections = await db.mailConnection.findMany({ where: { id: { in: connectionIds } }, select: { id: true, email: true, title: true, providerName: true, imapHost: true }, }); const connMap = new Map(connections.map((c) => [c.id, c])); return rows.map((r) => { const conn = connMap.get(r.mailConnectionId); return { connectionId: r.mailConnectionId, title: conn?.title ?? null, email: conn?.email ?? "", providerName: conn?.providerName ?? null, imapHost: conn?.imapHost ?? "", count: r._sum.count ?? 0, }; }); } // ─── OAuth Pending States ───────────────────────────────────────────────────── const OAUTH_STATE_TTL_MS = 10 * 60 * 1000; // 10 minutes /** * Creates a new OAuth pending state entry for PKCE flow. * Also garbage-collects expired states for this user (clean-on-write). */ export async function createOauthPendingState(params: { stateId: string; userId: string; codeVerifier: string; email: string | null; }) { const db = usePrisma(); // Garbage-collect stale states for this user const cutoff = new Date(Date.now() - OAUTH_STATE_TTL_MS); await db.oauthPendingState.deleteMany({ where: { userId: params.userId, createdAt: { lt: cutoff } }, }); return db.oauthPendingState.create({ data: { stateId: params.stateId, userId: params.userId, codeVerifier: params.codeVerifier, email: params.email, }, }); } /** * Consumes an OAuth pending state: validates it exists + not expired, then deletes it. * Returns null if not found or expired (caller should 401). * This is atomic enough for our use-case (state is single-use, mobile client is single-threaded). */ export async function consumeOauthPendingState(stateId: string): Promise<{ userId: string; codeVerifier: string; email: string | null; } | null> { const db = usePrisma(); const entry = await db.oauthPendingState.findUnique({ where: { stateId }, select: { id: true, userId: true, codeVerifier: true, email: true, createdAt: true }, }); if (!entry) return null; // Check TTL const age = Date.now() - entry.createdAt.getTime(); if (age > OAUTH_STATE_TTL_MS) { // Expired — clean up and reject await db.oauthPendingState.delete({ where: { id: entry.id } }).catch(() => {}); return null; } // Consume (delete) — single-use await db.oauthPendingState.delete({ where: { id: entry.id } }).catch(() => {}); return { userId: entry.userId, codeVerifier: entry.codeVerifier, email: entry.email, }; } // ─── OAuth MailConnection Upsert ────────────────────────────────────────────── /** * Creates or updates a MailConnection for Microsoft OAuth. * Uses userId+email as the unique key (same as password-based connections). * passwordEncrypted is set to "" (empty) — not used for oauth connections. * authMethod='oauth2_microsoft' is the discriminator. */ export async function upsertOauthMicrosoftConnection(params: { userId: string; email: string; encryptedAccessToken: string; encryptedRefreshToken: string; tokenExpiry: Date; scope: string; }) { const db = usePrisma(); return db.mailConnection.upsert({ where: { userId_email: { userId: params.userId, email: params.email } }, create: { userId: params.userId, email: params.email, provider: "imap", providerName: "Outlook", imapHost: "outlook.office365.com", imapPort: 993, passwordEncrypted: "", // not used for oauth rejectUnauthorized: true, useStarttls: false, isActive: true, authMethod: "oauth2_microsoft", oauthAccessToken: params.encryptedAccessToken, oauthRefreshToken: params.encryptedRefreshToken, oauthTokenExpiry: params.tokenExpiry, oauthScope: params.scope, }, update: { providerName: "Outlook", imapHost: "outlook.office365.com", imapPort: 993, authMethod: "oauth2_microsoft", oauthAccessToken: params.encryptedAccessToken, oauthRefreshToken: params.encryptedRefreshToken, oauthTokenExpiry: params.tokenExpiry, oauthScope: params.scope, isActive: true, // Clear error state from a previous failed connection attempt lastConnectError: null, lastConnectErrorAt: null, }, }); } // ─── Token Refresh with Race-Condition Protection ───────────────────────────── /** * Refreshes the Microsoft OAuth tokens for a given MailConnection and persists them. * * Race-Condition strategy (Optimistic Concurrency): * 1. Read current oauth_token_expiry from DB. * 2. POST to MS token endpoint to get fresh tokens. * 3. UPDATE with WHERE oauth_token_expiry = (optimistic lock). * 4. If affected_rows = 0: another process refreshed concurrently. * → Read the freshly stored access_token and return it WITHOUT re-refreshing. * This avoids a double-refresh loop that would invalidate the new refresh_token. * * Returns: decrypted (plaintext) access_token ready for IMAP XOAUTH2 use. * * Throws if: * - Connection not found or not oauth2_microsoft * - MS token refresh fails (invalid/revoked refresh_token) */ export async function refreshAndSaveTokens( connectionId: string, clientId: string, ): Promise { const db = usePrisma(); // Step 1: Read current token state const conn = await db.mailConnection.findFirst({ where: { id: connectionId, authMethod: "oauth2_microsoft" }, select: { oauthRefreshToken: true, oauthAccessToken: true, oauthTokenExpiry: true, }, }); if (!conn?.oauthRefreshToken) { throw new Error(`Connection ${connectionId} has no oauth refresh_token — cannot refresh`); } const currentExpiry = conn.oauthTokenExpiry; const decryptedRefreshToken = decrypt(conn.oauthRefreshToken); // Step 2: Refresh at MS const fresh = await refreshMicrosoftTokens({ clientId, refreshToken: decryptedRefreshToken, }); const newExpiry = new Date(Date.now() + fresh.expires_in * 1000); const encryptedNewAccess = encrypt(fresh.access_token); const encryptedNewRefresh = encrypt(fresh.refresh_token); // Step 3: Optimistic update — only update if expiry hasn't changed since we read // Using $executeRaw for the WHERE-with-timestamp comparison (Prisma updateMany // doesn't support "affected rows" count in a useful way here). const result = await db.$executeRaw` UPDATE "rebreak"."mail_connections" SET "oauth_access_token" = ${encryptedNewAccess}, "oauth_refresh_token" = ${encryptedNewRefresh}, "oauth_token_expiry" = ${newExpiry} WHERE "id" = ${connectionId}::uuid AND ( "oauth_token_expiry" IS NOT DISTINCT FROM ${currentExpiry}::timestamptz ) `; if (result === 0) { // Step 4: Another process refreshed concurrently — read the fresh token they stored // and return it. Do NOT refresh again (would invalidate their new refresh_token). const updated = await db.mailConnection.findFirst({ where: { id: connectionId }, select: { oauthAccessToken: true }, }); if (!updated?.oauthAccessToken) { throw new Error(`Concurrent refresh detected for ${connectionId} but no token found`); } return decrypt(updated.oauthAccessToken); } // Normal path: we won the race, return the token we just stored return fresh.access_token; } /** * Gets the decrypted refresh_token for a MailConnection. * Used by [id].delete.ts for the revoke flow. * Returns null if no refresh_token is stored. */ export async function getDecryptedRefreshToken( connectionId: string, userId: string, ): Promise { const db = usePrisma(); const conn = await db.mailConnection.findFirst({ where: { id: connectionId, userId, authMethod: "oauth2_microsoft" }, select: { oauthRefreshToken: true }, }); if (!conn?.oauthRefreshToken) return null; try { return decrypt(conn.oauthRefreshToken); } catch { return null; } }