chahinebrini bdd93668ae feat(mail): multi-layer classifier — Brand+Random, Relay-Decoder, Score, Groq + ML-Sampling
Layer 0–4 Klassifikations-Pipeline in mail-classifier.ts:
- Layer 2: Domain-Hard-Block + Relay-Decoder (=domain.tld aus SendGrid/Mailchimp-Bounces)
- Layer 2.5: Brand+Random-Token-Hard-Block (Gambling-Brand-Normalisierung + Random-Token-Detection)
  verhindert LLM-Call für bekannte Gambling-Relayer (Gamblezen, BetandPlay etc.)
- Layer 3: Score 0–100 (TS-Gewichte: Domain-Keywords, Subject-Keywords, Name-Match,
  Geld-Pattern, Urgency, All-Caps, Short-Random-Domain, Brand/Random-Ergänzungen)
- Layer 4: Groq Llama 3.3 70B Borderline-Klassifikation (Score 25–75)
  mit Local-Part-Redaction (DSGVO: nur behalten wenn local-part selbst Keyword enthält)
- Layer 5: MailClassificationSample-Insert nach jeder Klassifikation (ML-Phase 3)

Migrations:
- 20260514_add_mail_blocked_trigger_source: ADD COLUMN trigger_source auf mail_blocked
- 20260514_add_mail_classification_sample: CREATE TABLE mail_classification_samples

50 neue Tests (mail-classifier.test.ts): alle Layer, beide Screenshot-Beispiele (Gamblezen +
BetandPlay) bestätigt als Layer-2.5-Hard-Block ohne LLM-Call, Whitelist, Score, Redaction.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 22:05:35 +02:00

687 lines
21 KiB
TypeScript

import { usePrisma } from "../utils/prisma";
import { encrypt, decrypt } from "../utils/crypto";
import { refreshMicrosoftTokens } from "../utils/ms-oauth";
/**
 * Returns the mail connections of a user that are eligible for scanning.
 *
 * A connection qualifies only when it is active (`isActive = true`) and not
 * paused (`pausedAt = null`); paused connections are skipped by the cron.
 *
 * @param userId - Owner of the connections.
 * @returns The matching `mailConnection` rows, oldest (`createdAt`) first.
 */
export async function getMailConnections(userId: string) {
  const scannable = { userId, isActive: true, pausedAt: null };
  return usePrisma().mailConnection.findMany({
    where: scannable,
    orderBy: { createdAt: "asc" },
  });
}
/**
 * Returns every mail connection of a user, paused and inactive ones included,
 * for the status display in the frontend.
 *
 * Only the fields listed in `select` below are loaded.
 *
 * @param userId - Owner of the connections.
 * @returns The selected fields of each connection, oldest (`createdAt`) first.
 */
export async function getAllMailConnections(userId: string) {
  return usePrisma().mailConnection.findMany({
    select: {
      id: true,
      email: true,
      title: true,
      provider: true,
      providerName: true,
      imapHost: true,
      authMethod: true,
      consentAt: true,
      isActive: true,
      pausedAt: true,
      pausedReason: true,
      scanInterval: true,
      lastScannedAt: true,
      nextScanAt: true,
      emailsBlocked: true,
      emailsScanned: true,
      lastConnectError: true,
      createdAt: true,
    },
    where: { userId },
    orderBy: { createdAt: "asc" },
  });
}
/**
 * Returns the distinct ids of all users that own at least one connection
 * which is active, not paused, and due for a scan (`nextScanAt <= now`).
 *
 * Paused connections are excluded so this agrees with `getMailConnections`
 * and `countMailConnections`, which both define "active" as
 * `isActive = true AND pausedAt = null`. Without that filter a paused
 * connection whose `nextScanAt` lies in the past would keep reporting its
 * owner as due.
 * NOTE(review): presumably consumed by the scan cron — confirm against the caller.
 *
 * @returns One user id per user with at least one due connection.
 */
export async function getAllActiveMailUserIds() {
  const db = usePrisma();
  const rows = await db.mailConnection.findMany({
    where: { isActive: true, pausedAt: null, nextScanAt: { lte: new Date() } },
    select: { userId: true },
    distinct: ["userId"],
  });
  return rows.map((r) => r.userId);
}
/**
 * Counts the connections of a user that count against the connection limit:
 * only active (`isActive = true`) and non-paused (`pausedAt = null`) ones.
 *
 * @param userId - Owner of the connections.
 * @returns Number of matching connections.
 */
export async function countMailConnections(userId: string) {
  const countable = { userId, isActive: true, pausedAt: null };
  return usePrisma().mailConnection.count({ where: countable });
}
/**
 * Creates or updates the password-based mail connection identified by
 * `userId` + `email`.
 *
 * On create, all fields of `data` are stored. On update, the connection
 * settings and credentials are replaced, the connection is re-activated and
 * any stored connect error is cleared, so that after a re-connect (e.g. with
 * a new app password) the UI shows the connection as live again right away.
 *
 * @param data - Connection settings; `rejectUnauthorized` defaults to `true`
 *   and `useStarttls` to `false` when omitted.
 * @returns The created or updated `mailConnection` row.
 */
export async function upsertMailConnection(data: {
  userId: string;
  email: string;
  provider: string;
  providerName: string;
  imapHost: string;
  imapPort: number;
  passwordEncrypted: string;
  rejectUnauthorized?: boolean;
  useStarttls?: boolean;
}) {
  const { userId, email } = data;
  // TLS options with their defaults applied; shared by both upsert branches.
  const tls = {
    rejectUnauthorized: data.rejectUnauthorized ?? true,
    useStarttls: data.useStarttls ?? false,
  };
  return usePrisma().mailConnection.upsert({
    where: { userId_email: { userId, email } },
    create: { ...data, ...tls, isActive: true },
    update: {
      providerName: data.providerName,
      imapHost: data.imapHost,
      imapPort: data.imapPort,
      passwordEncrypted: data.passwordEncrypted,
      ...tls,
      isActive: true,
      // Wipe traces of earlier connection failures.
      lastConnectError: null,
      lastConnectErrorAt: null,
    },
  });
}
/**
 * Deletes a single mail connection, scoped to its owner: the filter contains
 * both the connection id and the user id, so a connection that belongs to a
 * different user is never matched.
 *
 * @param userId - Owner of the connection.
 * @param connectionId - Id of the connection to delete.
 * @returns The `deleteMany` result of the Prisma client.
 */
export async function deleteMailConnection(
  userId: string,
  connectionId: string,
) {
  const ownedConnection = { id: connectionId, userId };
  return usePrisma().mailConnection.deleteMany({ where: ownedConnection });
}
/**
 * Deletes every mail connection that belongs to the given user.
 *
 * @param userId - Owner whose connections are removed.
 * @returns The `deleteMany` result of the Prisma client.
 */
export async function deleteAllMailConnections(userId: string) {
  return usePrisma().mailConnection.deleteMany({ where: { userId } });
}
/**
 * Sets the scan interval of one connection, scoped to its owner (the filter
 * contains both the connection id and the user id).
 *
 * @param userId - Owner of the connection.
 * @param connectionId - Id of the connection to update.
 * @param interval - New value for `scanInterval`.
 * @returns The `updateMany` result of the Prisma client.
 */
export async function updateMailConnectionInterval(
  userId: string,
  connectionId: string,
  interval: number,
) {
  const ownedConnection = { id: connectionId, userId };
  return usePrisma().mailConnection.updateMany({
    where: ownedConnection,
    data: { scanInterval: interval },
  });
}
/**
 * Records the outcome of a finished scan on a connection: stamps
 * `lastScannedAt`, stores the new counter totals and schedules the next scan.
 *
 * NOTE(review): the new totals are computed from the caller-supplied
 * `currentBlocked` / `currentScanned` values rather than incremented in the
 * database, so two overlapping scans of the same connection could overwrite
 * each other's counts — confirm whether that can happen.
 *
 * @param connectionId - Connection that was scanned.
 * @param scanned - Mails scanned in this run.
 * @param blocked - Mails blocked in this run.
 * @param currentBlocked - Blocked total before this run.
 * @param currentScanned - Scanned total before this run.
 * @param scanIntervalHours - Hours from now until the next scan.
 * @returns The updated `mailConnection` row.
 */
export async function updateMailConnectionScanStats(
  connectionId: string,
  scanned: number,
  blocked: number,
  currentBlocked: number,
  currentScanned: number,
  scanIntervalHours: number,
) {
  const MS_PER_HOUR = 3_600_000;
  const stats = {
    lastScannedAt: new Date(),
    emailsBlocked: currentBlocked + blocked,
    emailsScanned: currentScanned + scanned,
    nextScanAt: new Date(Date.now() + scanIntervalHours * MS_PER_HOUR),
  };
  return usePrisma().mailConnection.update({
    where: { id: connectionId },
    data: stats,
  });
}
/**
 * Loads the creation timestamps of all `mailBlocked` rows of a user from the
 * last seven days (7 × 24 h counted back from now).
 *
 * @param userId - Owner of the blocked-mail entries.
 * @returns One `{ createdAt }` object per matching row.
 */
export async function getMailBlockedStats(userId: string) {
  const WEEK_MS = 7 * 86_400_000;
  const windowStart = new Date(Date.now() - WEEK_MS);
  return usePrisma().mailBlocked.findMany({
    where: { userId, createdAt: { gte: windowStart } },
    select: { createdAt: true },
  });
}
/**
 * Tells whether a `mailBlocked` entry already exists for this message id
 * and user.
 *
 * @param gmailMessageId - Message id to look up.
 * @param userId - Owner of the entry.
 * @returns `true` when a matching row exists, otherwise `false`.
 */
export async function isMailAlreadyBlocked(
  gmailMessageId: string,
  userId: string,
) {
  // Only the id is selected; the row is needed solely as an existence proof.
  const hit = await usePrisma().mailBlocked.findFirst({
    where: { gmailMessageId, userId },
    select: { id: true },
  });
  return Boolean(hit);
}
/**
 * Batch variant of the already-blocked check: returns the subset of `uids`
 * for which the user already has a `mailBlocked` entry (matched against the
 * `gmailMessageId` column).
 *
 * @param uids - Message ids to check; an empty list short-circuits without a query.
 * @param userId - Owner of the entries.
 * @returns Set of the ids that are already stored.
 */
export async function getAlreadyBlockedUidSet(
  uids: string[],
  userId: string,
): Promise<Set<string>> {
  const known = new Set<string>();
  if (uids.length === 0) return known;

  const hits = await usePrisma().mailBlocked.findMany({
    where: { gmailMessageId: { in: uids }, userId },
    select: { gmailMessageId: true },
  });
  for (const { gmailMessageId } of hits) {
    known.add(gmailMessageId);
  }
  return known;
}
/**
 * Bulk-inserts blocked-mail entries. Rows that collide with an existing
 * unique key are skipped (`skipDuplicates`), and an empty list results in no
 * database call at all.
 *
 * @param entries - Rows to insert into `mailBlocked`.
 */
export async function insertMailBlocked(
  entries: {
    userId: string;
    connectionId: string;
    gmailMessageId: string;
    senderEmail: string;
    senderName: string | null;
    subject: string;
    receivedAt: Date;
    action: string;
    triggerSource?: string | null;
  }[],
) {
  if (entries.length > 0) {
    await usePrisma().mailBlocked.createMany({ data: entries, skipDuplicates: true });
  }
}
// ─── MailClassificationSample ─────────────────────────────────────────────────
/**
 * Writes one classification sample row for ML phase 3.
 * Meant to be called after EVERY classification (except layer-0 /
 * already-blocked skips).
 *
 * GDPR: features only, no mail content (no body). Subject + sender are
 * short-lived detection signals, not narrative content. Rows are
 * cascade-deleted when the user is deleted (Art. 17).
 *
 * @param entry - Sample to store; `features` is round-tripped through JSON
 *   before it is handed to Prisma.
 */
export async function insertMailClassificationSample(entry: {
  userId: string;
  connectionId: string | null;
  senderName: string | null;
  senderDomain: string | null;
  relayDecodedDomain: string | null;
  subject: string | null;
  // `features` is a Prisma Json field — InputJsonValue does not accept a plain Record,
  // so it is serialised explicitly via JSON.parse(JSON.stringify(...)) to satisfy TS.
  features: Record<string, unknown>;
  finalAction: string;
  triggerSource: string;
  groqIsGambling?: boolean | null;
  groqConfidence?: number | null;
  groqReason?: string | null;
}) {
  const { features, ...scalarFields } = entry;
  // The JSON round-trip yields a "plain JSON value" that Prisma accepts.
  // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
  const plainFeatures = JSON.parse(JSON.stringify(features));
  await usePrisma().mailClassificationSample.create({
    // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
    data: { ...scalarFields, features: plainFeatures },
  });
}
/**
 * Lists the mail connections of a user whose `consentAt` is still NULL.
 * Used by the pending-consent.get.ts endpoint to trigger the re-consent modal.
 *
 * @param userId - Owner of the connections.
 * @returns `id` and `email` of each connection without consent, oldest first.
 */
export async function getPendingConsentConnections(
  userId: string,
): Promise<{ id: string; email: string }[]> {
  return usePrisma().mailConnection.findMany({
    select: { id: true, email: true },
    where: { userId, consentAt: null },
    orderBy: { createdAt: "asc" },
  });
}
/**
 * Returns all IMAP proxy accounts that belong to the given user.
 *
 * @param userId - Owner of the proxy accounts.
 * @returns The matching `imapProxyAccount` rows.
 */
export async function getImapProxyAccounts(userId: string) {
  return usePrisma().imapProxyAccount.findMany({ where: { userId } });
}
/**
 * Creates the IMAP proxy account for a connection, or — when one already
 * exists for that `connectionId` — replaces only its `proxyPassword`.
 *
 * @param data - Full account record; on update only `proxyPassword` is written.
 * @returns The created or updated `imapProxyAccount` row.
 */
export async function upsertImapProxyAccount(data: {
  userId: string;
  proxyUsername: string;
  proxyPassword: string;
  connectionId: string;
}) {
  const { connectionId, proxyPassword } = data;
  return usePrisma().imapProxyAccount.upsert({
    where: { connectionId },
    create: data,
    update: { proxyPassword },
  });
}
/**
 * Removes the `mailBlocked` rows of a user that were created more than
 * 24 hours ago.
 *
 * @param userId - Owner of the entries.
 * @returns The `deleteMany` result of the Prisma client.
 */
export async function deleteOldMailBlocked(userId: string) {
  const DAY_MS = 24 * 3_600_000;
  const olderThan = new Date(Date.now() - DAY_MS);
  return usePrisma().mailBlocked.deleteMany({
    where: { userId, createdAt: { lt: olderThan } },
  });
}
/**
 * Upserts the aggregate counter in mail_blocked_stats for today.
 * Meant to be called per connection right after `insertMailBlocked`.
 *
 * The row is keyed by user + date + connection, where the date is UTC
 * midnight of the current day. When the row already exists, its count is
 * increased by `entry.count` and the provider fields are overwritten.
 *
 * @param entry - Counter owner, provider labels and the number of newly
 *   blocked mails to add.
 * @returns The created or updated `mailBlockedStat` row.
 */
export async function upsertMailBlockedStat(entry: {
  userId: string;
  mailConnectionId: string;
  provider: string;
  providerLabel: string;
  count: number;
}) {
  const { userId, mailConnectionId, provider, providerLabel, count } = entry;
  const now = new Date();
  const utcMidnight = new Date(
    Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()),
  );
  const bucketKey = { userId, date: utcMidnight, mailConnectionId };

  return usePrisma().mailBlockedStat.upsert({
    where: { userId_date_mailConnectionId: bucketKey },
    create: { ...bucketKey, provider, providerLabel, count },
    update: {
      // Latest label wins (in case the user switched IMAP host).
      provider,
      providerLabel,
      count: { increment: count },
    },
  });
}
/**
 * Returns one page of a user's blocked mails, newest first, together with
 * paging metadata.
 *
 * `page` and `limit` are normalised before use: values that are not finite
 * or are smaller than 1 fall back to page 1 / limit 20, and fractions are
 * floored. This prevents a negative `skip`, a zero or negative `take` and an
 * `Infinity`/`NaN` page count.
 *
 * @param userId - Owner of the blocked-mail entries.
 * @param page - 1-based page number.
 * @param limit - Page size (default 20).
 * @param providerFilter - Optional list of IMAP hosts; when non-empty, only
 *   mails whose connection has one of these `imapHost` values are returned.
 * @returns `{ results, total, page, pages }` where `page` is the normalised
 *   page number and `pages` the total number of pages.
 */
export async function getMailBlockedPaginated(
  userId: string,
  page: number,
  limit = 20,
  providerFilter?: string[],
) {
  const db = usePrisma();
  const safeLimit = Number.isFinite(limit) && limit >= 1 ? Math.floor(limit) : 20;
  const safePage = Number.isFinite(page) && page >= 1 ? Math.floor(page) : 1;
  const offset = (safePage - 1) * safeLimit;
  // With a provider filter: match through the related connection's imapHost.
  const whereBase = providerFilter && providerFilter.length > 0
    ? { userId, connection: { imapHost: { in: providerFilter } } }
    : { userId };
  // Page rows and total count are independent, so fetch them in parallel.
  const [results, total] = await Promise.all([
    db.mailBlocked.findMany({
      where: whereBase,
      orderBy: { createdAt: "desc" },
      skip: offset,
      take: safeLimit,
      include: {
        connection: {
          select: { id: true, email: true, title: true, providerName: true, imapHost: true },
        },
      },
    }),
    db.mailBlocked.count({ where: whereBase }),
  ]);
  return { results, total, page: safePage, pages: Math.ceil(total / safeLimit) };
}
/**
 * Sets the title of a mail connection (nullable — passing `null` resets it).
 *
 * The update is scoped to the owner; when no row matches (unknown id or a
 * connection of another user) nothing is changed and `null` is returned.
 *
 * @param userId - Owner of the connection.
 * @param connectionId - Id of the connection to rename.
 * @param title - New title, or `null` to clear it.
 * @returns `{ id, email, title }` of the updated connection, or `null`.
 */
export async function updateMailConnectionTitle(
  userId: string,
  connectionId: string,
  title: string | null,
) {
  const db = usePrisma();
  const ownedConnection = { id: connectionId, userId };
  const { count } = await db.mailConnection.updateMany({
    where: ownedConnection,
    data: { title },
  });
  return count === 0
    ? null
    : db.mailConnection.findFirst({
        where: ownedConnection,
        select: { id: true, email: true, title: true },
      });
}
/**
 * Blocked mails per day (UTC) for the last N days — data for the bar chart.
 * Reads from mail_blocked_stats (permanent, not subject to the 24h cleanup).
 * Days without a stats row are filled in with count = 0.
 *
 * When `connectionId` is given, only that connection's rows are summed. If
 * the connection belongs to another user the WHERE clause simply matches no
 * rows, so every bucket comes back with count = 0 (404-alike).
 *
 * @param userId - Owner of the stats.
 * @param days - Number of day buckets to return, ending with today.
 * @param connectionId - Optional single-connection filter.
 * @returns One `{ date: "YYYY-MM-DD", count }` entry per day, oldest first.
 */
export async function getBlockedMailsByDay(
  userId: string,
  days: number,
  connectionId?: string,
): Promise<{ date: string; count: number }[]> {
  type DayRow = { date: string; count: bigint };
  const DAY_MS = 86_400_000;
  const db = usePrisma();

  // Lower bound of the query window, snapped to UTC midnight.
  const since = new Date(Date.now() - days * DAY_MS);
  since.setUTCHours(0, 0, 0, 0);

  // SUM(count) per day from the permanent stats table.
  // The SQL text is kept verbatim, hence the unindented template bodies.
  let rows: DayRow[];
  if (connectionId) {
    rows = await db.$queryRaw<DayRow[]>`
SELECT TO_CHAR("date", 'YYYY-MM-DD') AS date, SUM("count")::bigint AS count
FROM "rebreak"."mail_blocked_stats"
WHERE "user_id" = ${userId}::uuid
AND "date" >= ${since}::date
AND "mail_connection_id" = ${connectionId}::uuid
GROUP BY "date"
ORDER BY "date" ASC
`;
  } else {
    rows = await db.$queryRaw<DayRow[]>`
SELECT TO_CHAR("date", 'YYYY-MM-DD') AS date, SUM("count")::bigint AS count
FROM "rebreak"."mail_blocked_stats"
WHERE "user_id" = ${userId}::uuid
AND "date" >= ${since}::date
GROUP BY "date"
ORDER BY "date" ASC
`;
  }

  const totalsByDay = new Map<string, number>();
  for (const { date, count } of rows) {
    totalsByDay.set(date, Number(count));
  }

  // Emit all N buckets, newest last; the bucket key is the UTC calendar date.
  return Array.from({ length: days }, (_unused, idx) => {
    const daysAgo = days - 1 - idx;
    const date = new Date(Date.now() - daysAgo * DAY_MS).toISOString().slice(0, 10);
    return { date, count: totalsByDay.get(date) ?? 0 };
  });
}
/**
 * Number of blocked mails per mail connection — data for the half-donut chart.
 * Reads from mail_blocked_stats (permanent).
 * Connections without any stats row are NOT included.
 * `imapHost` is returned as-is — resolveProviderMeta() is called in the endpoint.
 *
 * @param userId - Owner of the stats.
 * @returns One entry per connection that has stats, highest count first.
 *   When the connection row cannot be found, `title`/`providerName` are
 *   `null` and `email`/`imapHost` are empty strings.
 */
export async function getBlockedMailsByConnection(userId: string) {
  const db = usePrisma();

  // SUM(count) per connection from the stats table.
  const totals = await db.mailBlockedStat.groupBy({
    by: ["mailConnectionId"],
    where: { userId },
    _sum: { count: true },
    orderBy: { _sum: { count: "desc" } },
  });
  if (totals.length === 0) return [];

  // Load display data for exactly the connections that appear in the stats.
  const connections = await db.mailConnection.findMany({
    where: { id: { in: totals.map((t) => t.mailConnectionId) } },
    select: { id: true, email: true, title: true, providerName: true, imapHost: true },
  });
  const byId = new Map<string, (typeof connections)[number]>();
  for (const connection of connections) {
    byId.set(connection.id, connection);
  }

  return totals.map(({ mailConnectionId, _sum }) => {
    const match = byId.get(mailConnectionId);
    return {
      connectionId: mailConnectionId,
      title: match?.title ?? null,
      email: match?.email ?? "",
      providerName: match?.providerName ?? null,
      imapHost: match?.imapHost ?? "",
      count: _sum.count ?? 0,
    };
  });
}
// ─── OAuth Pending States ─────────────────────────────────────────────────────
/** Lifetime of a pending OAuth state in milliseconds (10 minutes). */
const OAUTH_STATE_TTL_MS = 10 * 60 * 1000;
/**
 * Stores a new pending OAuth state for the PKCE flow.
 *
 * Before inserting, the user's states that are older than the TTL are
 * deleted (clean-on-write garbage collection).
 *
 * @param params - State id, owning user, PKCE code verifier and optional email.
 * @returns The created `oauthPendingState` row.
 */
export async function createOauthPendingState(params: {
  stateId: string;
  userId: string;
  codeVerifier: string;
  email: string | null;
}) {
  const db = usePrisma();
  const { stateId, userId, codeVerifier, email } = params;

  // Drop this user's expired states first.
  const expiredBefore = new Date(Date.now() - OAUTH_STATE_TTL_MS);
  await db.oauthPendingState.deleteMany({
    where: { userId, createdAt: { lt: expiredBefore } },
  });

  return db.oauthPendingState.create({
    data: { stateId, userId, codeVerifier, email },
  });
}
/**
 * Consumes an OAuth pending state: validates that it exists and has not
 * expired, then deletes it so it cannot be used again.
 *
 * Single-use is enforced through the delete itself: the state only counts as
 * consumed when this call removed the row (`deleteMany` reports `count = 1`).
 * If a concurrent or replayed request already deleted it, `count` is 0 and
 * `null` is returned, so the code verifier is handed out at most once.
 *
 * @param stateId - The `state` value that came back from the OAuth redirect.
 * @returns The stored user id, code verifier and email, or `null` when the
 *   state is unknown, expired or was already consumed (caller should 401).
 * @throws Whatever the Prisma client throws when the lookup or the consuming
 *   delete fails; the state is then not handed out.
 */
export async function consumeOauthPendingState(stateId: string): Promise<{
  userId: string;
  codeVerifier: string;
  email: string | null;
} | null> {
  const db = usePrisma();
  const entry = await db.oauthPendingState.findUnique({
    where: { stateId },
    select: { id: true, userId: true, codeVerifier: true, email: true, createdAt: true },
  });
  if (!entry) return null;

  // Check TTL
  const age = Date.now() - entry.createdAt.getTime();
  if (age > OAUTH_STATE_TTL_MS) {
    // Expired — best-effort cleanup; the state is rejected either way.
    await db.oauthPendingState.deleteMany({ where: { id: entry.id } }).catch(() => {});
    return null;
  }

  // Consume (delete) — only the caller whose delete removed the row wins.
  const { count } = await db.oauthPendingState.deleteMany({ where: { id: entry.id } });
  if (count !== 1) return null;

  return {
    userId: entry.userId,
    codeVerifier: entry.codeVerifier,
    email: entry.email,
  };
}
// ─── OAuth MailConnection Upsert ──────────────────────────────────────────────
/**
 * Creates or updates the MailConnection of a Microsoft OAuth login.
 *
 * The row is keyed by userId + email (same key as password-based
 * connections). `passwordEncrypted` is stored as an empty string on create
 * because it is not used for OAuth connections; `authMethod` is set to
 * 'oauth2_microsoft' as the discriminator. On update the connection is
 * re-activated and a previously stored connect error is cleared.
 *
 * @param params - Owner, mailbox address and the already-encrypted tokens
 *   with their expiry and scope.
 * @returns The created or updated `mailConnection` row.
 */
export async function upsertOauthMicrosoftConnection(params: {
  userId: string;
  email: string;
  encryptedAccessToken: string;
  encryptedRefreshToken: string;
  tokenExpiry: Date;
  scope: string;
}) {
  const { userId, email } = params;
  // Fixed IMAP endpoint written for every Outlook connection.
  const outlookEndpoint = {
    providerName: "Outlook",
    imapHost: "outlook.office365.com",
    imapPort: 993,
  };
  // OAuth credentials, identical for the create and the update branch.
  const oauthFields = {
    authMethod: "oauth2_microsoft" as const,
    oauthAccessToken: params.encryptedAccessToken,
    oauthRefreshToken: params.encryptedRefreshToken,
    oauthTokenExpiry: params.tokenExpiry,
    oauthScope: params.scope,
  };

  return usePrisma().mailConnection.upsert({
    where: { userId_email: { userId, email } },
    create: {
      userId,
      email,
      provider: "imap",
      ...outlookEndpoint,
      passwordEncrypted: "", // not used for oauth
      rejectUnauthorized: true,
      useStarttls: false,
      isActive: true,
      ...oauthFields,
    },
    update: {
      ...outlookEndpoint,
      ...oauthFields,
      isActive: true,
      // Clear error state from a previous failed connection attempt
      lastConnectError: null,
      lastConnectErrorAt: null,
    },
  });
}
// ─── Token Refresh with Race-Condition Protection ─────────────────────────────
/**
 * Refreshes the Microsoft OAuth tokens of a MailConnection and persists them.
 *
 * Race-condition strategy (optimistic concurrency):
 *   1. Read the current oauth_token_expiry from the DB.
 *   2. Ask the MS token endpoint for fresh tokens.
 *   3. UPDATE ... WHERE oauth_token_expiry = <value read in step 1>.
 *   4. If no row was affected, another process refreshed concurrently:
 *      return the access token that process stored and do NOT refresh again
 *      (a second refresh would invalidate its new refresh_token).
 *
 * @param connectionId - Connection whose tokens are refreshed.
 * @param clientId - OAuth client id passed on to the token refresh.
 * @returns The decrypted (plaintext) access token, ready for IMAP XOAUTH2.
 * @throws If the connection is not found, is not an oauth2_microsoft
 *   connection or has no refresh token; if the MS token refresh fails; or if
 *   a concurrent refresh was detected but no access token is stored.
 */
export async function refreshAndSaveTokens(
  connectionId: string,
  clientId: string,
): Promise<string> {
  const db = usePrisma();

  // Step 1: snapshot of the stored token state.
  const stored = await db.mailConnection.findFirst({
    where: { id: connectionId, authMethod: "oauth2_microsoft" },
    select: {
      oauthRefreshToken: true,
      oauthAccessToken: true,
      oauthTokenExpiry: true,
    },
  });
  if (!stored?.oauthRefreshToken) {
    throw new Error(`Connection ${connectionId} has no oauth refresh_token — cannot refresh`);
  }
  // The expiry doubles as the version marker for the optimistic lock below.
  const expiryAtRead = stored.oauthTokenExpiry;
  const plainRefreshToken = decrypt(stored.oauthRefreshToken);

  // Step 2: obtain fresh tokens from Microsoft.
  const fresh = await refreshMicrosoftTokens({
    clientId,
    refreshToken: plainRefreshToken,
  });
  const nextExpiry = new Date(Date.now() + fresh.expires_in * 1000);
  const accessCipher = encrypt(fresh.access_token);
  const refreshCipher = encrypt(fresh.refresh_token);

  // Step 3: write only if the expiry is still the one read in step 1.
  // IS NOT DISTINCT FROM also matches when the stored expiry is NULL.
  // The SQL text is kept verbatim, hence the unindented template body.
  const affectedRows = await db.$executeRaw`
UPDATE "rebreak"."mail_connections"
SET
"oauth_access_token" = ${accessCipher},
"oauth_refresh_token" = ${refreshCipher},
"oauth_token_expiry" = ${nextExpiry}
WHERE
"id" = ${connectionId}::uuid
AND (
"oauth_token_expiry" IS NOT DISTINCT FROM ${expiryAtRead}::timestamptz
)
`;

  // Normal path: this call won the race and its token is the stored one.
  if (affectedRows !== 0) {
    return fresh.access_token;
  }

  // Step 4: lost the race — hand back the token the other process stored.
  const winner = await db.mailConnection.findFirst({
    where: { id: connectionId },
    select: { oauthAccessToken: true },
  });
  if (!winner?.oauthAccessToken) {
    throw new Error(`Concurrent refresh detected for ${connectionId} but no token found`);
  }
  return decrypt(winner.oauthAccessToken);
}
/**
 * Returns the decrypted refresh token of a user's Microsoft OAuth connection.
 * Used by [id].delete.ts for the revoke flow.
 *
 * @param connectionId - Connection to read the token from.
 * @param userId - Owner of the connection; part of the lookup filter.
 * @returns The plaintext refresh token, or `null` when no matching
 *   oauth2_microsoft connection exists, no token is stored, or decryption fails.
 */
export async function getDecryptedRefreshToken(
  connectionId: string,
  userId: string,
): Promise<string | null> {
  const row = await usePrisma().mailConnection.findFirst({
    where: { id: connectionId, userId, authMethod: "oauth2_microsoft" },
    select: { oauthRefreshToken: true },
  });
  const cipher = row?.oauthRefreshToken;
  if (!cipher) return null;

  try {
    return decrypt(cipher);
  } catch {
    // An undecryptable token is treated the same as a missing one.
    return null;
  }
}