#!/usr/bin/env python3
"""
adguard-handshake-watcher
=========================

Tails AdGuard Home's querylog.json (rotated) and fires a POST to the rebreak
backend's /api/devices/protected/handshake endpoint whenever a DNS query
contains a non-empty ClientID (= dnsToken of a protected device).

Environment variables (required / optional):
  HANDSHAKE_SECRET  — shared secret, sent as x-handshake-secret header
                      (required; provisioned via Infisical, see RUNBOOK)
  BACKEND_URL       — base URL of the backend, no trailing slash
                      default: https://staging.rebreak.org
  QUERYLOG_PATH     — path to AdGuard's querylog.json
                      default: /opt/adguardhome/data/querylog.json

Per-token in-memory cooldown: 60 seconds.
  Only one POST is fired per token per minute even if the browser hammers
  DoH at 10+ req/s. This keeps backend write-pressure negligible.

Log-rotation safety:
  AdGuard rotates querylog.json by renaming and creating a new file.
  The watcher detects EOF + inode change and re-opens the new file.

Polling interval: 1 second.
"""

import json
import logging
import os
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, TextIO

# ── Logging ──────────────────────────────────────────────────────────────────

# The date format below ends in a literal "Z", so timestamps must really be
# UTC. The logging default converter is local time; switch it to gmtime.
logging.Formatter.converter = time.gmtime
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)
log = logging.getLogger("handshake-watcher")

# ── Config ───────────────────────────────────────────────────────────────────

HANDSHAKE_SECRET: str = os.environ.get("HANDSHAKE_SECRET", "")
BACKEND_URL: str = os.environ.get("BACKEND_URL", "https://staging.rebreak.org").rstrip("/")
QUERYLOG_PATH: str = os.environ.get("QUERYLOG_PATH", "/opt/adguardhome/data/querylog.json")

COOLDOWN_SECONDS: int = 60  # minimum gap between two POSTs for the same token
POLL_INTERVAL: float = 1.0  # seconds between file-tail polls

# ── State ────────────────────────────────────────────────────────────────────

# token -> time.monotonic() reading taken when the token was last marked as
# fired (after a successful POST, and also after a 401/404 to avoid spamming).
_last_fired: dict[str, float] = {}

# Tokens come from the URL path of arbitrary DoH clients, so the key space is
# unbounded. Once the map reaches this size, expired entries are pruned.
_MAX_TRACKED_TOKENS: int = 10_000


def _cooldown_ok(token: str) -> bool:
    """Return True if this token was not marked as fired within the cooldown window."""
    last = _last_fired.get(token)
    if last is None:
        return True
    return (time.monotonic() - last) >= COOLDOWN_SECONDS


def _mark_fired(token: str) -> None:
    """Record that *token* was just handled, starting its cooldown window.

    When the map has grown to _MAX_TRACKED_TOKENS, entries whose cooldown has
    already elapsed are dropped first. Removing them is behavior-neutral,
    because _cooldown_ok() treats an expired entry and a missing entry alike.
    """
    now = time.monotonic()
    if len(_last_fired) >= _MAX_TRACKED_TOKENS:
        expired = [
            known
            for known, fired_at in _last_fired.items()
            if now - fired_at >= COOLDOWN_SECONDS
        ]
        for known in expired:
            del _last_fired[known]
    _last_fired[token] = now


# ── HTTP ─────────────────────────────────────────────────────────────────────


def _handle_http_error(token: str, exc: urllib.error.HTTPError) -> None:
    """Log a non-2xx backend response and decide whether to start the cooldown."""
    body_bytes = exc.read() if exc.fp else b""
    body_str = body_bytes.decode("utf-8", errors="replace")

    if exc.code == 401:
        log.error(
            "token=%s: 401 UNAUTHORIZED — HANDSHAKE_SECRET mismatch. "
            "Check Infisical secret. body=%s",
            token,
            body_str,
        )
        # Still mark fired — no point spamming a broken secret on every query.
        _mark_fired(token)
    elif exc.code == 404:
        log.warning(
            "token=%s: 404 TOKEN_NOT_FOUND — token not in DB yet (pending provisioning?). "
            "Will retry after cooldown. body=%s",
            token,
            body_str,
        )
        # Mark fired so unknown tokens are retried once per cooldown window
        # instead of on every query line.
        _mark_fired(token)
    else:
        # 5xx or other — log but do NOT mark fired so we retry sooner.
        log.error(
            "token=%s: HTTP %d from backend. Will retry. body=%s",
            token,
            exc.code,
            body_str,
        )


def post_handshake(token: str) -> None:
    """
    POST /api/devices/protected/handshake

    Body:   { "token": "<32hex>" }
    Header: x-handshake-secret: value of HANDSHAKE_SECRET

    Never raises for backend/network problems; outcomes are logged instead:
      2xx — JSON object body: log the outcome and mark the token as fired
      401 — secret wrong (log error, mark fired to avoid spam)
      404 — token unknown (log warning, mark fired to avoid spam)
      5xx / other status — log error, do NOT mark fired so the next query
            for this token retries
      network error, non-JSON body, or JSON body that is not an object —
            log error, do NOT mark fired
    """
    url = f"{BACKEND_URL}/api/devices/protected/handshake"
    payload = json.dumps({"token": token}).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "x-handshake-secret": HANDSHAKE_SECRET,
        },
    )

    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            body = resp.read().decode("utf-8", errors="replace")
        data = json.loads(body) if body else {}
    except urllib.error.HTTPError as exc:
        # Must precede URLError: HTTPError is a subclass of it.
        _handle_http_error(token, exc)
        return
    except (urllib.error.URLError, OSError, json.JSONDecodeError) as exc:
        log.error("token=%s: request failed (%s). Will retry.", token, exc)
        return

    if not isinstance(data, dict):
        # Valid JSON but not an object (e.g. a list or a bare string): calling
        # .get() on it would raise AttributeError and take the watcher down.
        log.error(
            "token=%s: unexpected response body (not a JSON object). Will retry.",
            token,
        )
        return

    if data.get("ignored"):
        log.debug("token=%s: backend ignored (revoked/inactive)", token)
    else:
        status_changed = data.get("statusChanged", False)
        status = data.get("status", "?")
        if status_changed:
            log.info("token=%s: status → %s (changed)", token, status)
        else:
            log.debug("token=%s: lastDnsQueryAt updated, status=%s", token, status)
    _mark_fired(token)


# ── QueryLog parsing ─────────────────────────────────────────────────────────
#
# AdGuard Home writes querylog.json as a sequence of newline-delimited JSON
# objects (NDJSON), one per line. Each object looks like:
#
#   {
#     "T":   "2026-05-15T12:34:56.789Z",  // timestamp (ISO8601)
#     "QH":  "example.com",               // queried hostname
#     "QT":  "A",                         // query type
#     "QC":  "IN",                        // query class
#     "CID": "abc123def456",              // ClientID (the dnsToken, if path-based CID)
#     "CP":  "doh",                       // client protocol
#     "Result": { ... },
#     "Elapsed": 123456,
#     "IP": "127.0.0.1",
#     ...
#   }
#
# The ClientID is set by AdGuard when it is embedded in the DNS-over-HTTPS
# URL path:
#   /dns-query/{ClientID}
#
# References:
#   https://adguard-dns.io/kb/general/dns-filtering-syntax/
#   AdGuard Home source: querylog/querylog_file.go, dnsforward/client_id.go
#
# NOTE(review): in upstream AdGuard Home (v0.107.x) the ClientID is serialized
# as "CID", while "CP" carries the client protocol ("doh", "dot", "doq",
# "dnscrypt"). Earlier revisions of this watcher read the token from "CP";
# that key is still accepted as a fallback when it does not hold a protocol
# name. If tokens are not picked up, confirm the field names by tailing the
# actual querylog after a test query:
#   docker exec adguardhome tail -f /opt/adguardhome/data/querylog.json
# and doing:
#   curl -s -H "accept: application/dns-json" \
#        "https://dns.rebreak.org/dns-query/TESTTOKEN?name=example.com&type=A"

# Values of "CP" that name a transport protocol and therefore are never tokens.
_CLIENT_PROTOCOLS = frozenset({"doh", "doq", "dot", "dnscrypt"})


def extract_client_id(line: str) -> Optional[str]:
    """
    Parse one NDJSON line from querylog.json.

    Returns the stripped ClientID string if one is present and non-empty,
    else None. Blank lines, invalid JSON, and JSON values that are not
    objects all yield None rather than raising.
    """
    line = line.strip()
    if not line:
        return None
    try:
        entry = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(entry, dict):
        # A bare list/number/string is valid JSON but not a querylog entry.
        return None

    cid = entry.get("CID")
    if isinstance(cid, str) and cid.strip():
        return cid.strip()

    # Backward-compatible fallback: honor "CP" unless it is a protocol name.
    legacy = entry.get("CP")
    if isinstance(legacy, str):
        legacy = legacy.strip()
        if legacy and legacy.lower() not in _CLIENT_PROTOCOLS:
            return legacy
    return None


# ── File tailing with rotation detection ─────────────────────────────────────


class RotationSafeTailer:
    """
    Tails a file line-by-line. Detects log rotation by monitoring inode.

    On rotation the new file is re-opened immediately and read from offset 0;
    readline() reports "no data" for that call, so a caller that sleeps on
    None naturally waits one poll cycle before reading the new file.

    Only complete lines (terminated by a newline) are returned. A fragment
    that the writer has not finished yet is buffered until the rest arrives.
    """

    def __init__(self, path: str) -> None:
        self.path = Path(path)
        self._file: Optional[TextIO] = None
        self._inode: Optional[int] = None
        self._partial: str = ""  # unterminated tail of the line being written
        self._open()

    def _open(self) -> None:
        """(Re)open self.path from offset 0, closing any previous handle.

        Leaves the tailer in the "no file" state if the path does not exist.
        """
        if self._file:
            try:
                self._file.close()
            except OSError:
                pass
        self._file = None
        self._inode = None
        self._partial = ""
        try:
            handle = open(self.path, "r", encoding="utf-8", errors="replace")
        except FileNotFoundError:
            log.warning("querylog not found: %s — will retry", self.path)
            return
        self._file = handle
        # Take the inode from the open descriptor rather than from the path:
        # a second path lookup could already resolve to a newer, rotated file.
        self._inode = os.fstat(handle.fileno()).st_ino
        log.info("Opened querylog: %s (inode=%d)", self.path, self._inode)

    def _seek_to_end_on_first_open(self) -> None:
        """Call once after initial _open() to skip historical entries."""
        if self._file:
            self._file.seek(0, 2)  # SEEK_END

    def readline(self) -> Optional[str]:
        """
        Returns the next complete line, or None if no new data is available.
        Handles rotation transparently.
        """
        if self._file is None:
            self._open()
            return None

        chunk = self._file.readline()
        if chunk:
            self._partial += chunk
            if self._partial.endswith("\n"):
                line, self._partial = self._partial, ""
                return line
            # Writer is mid-line: keep the fragment and wait for the rest.
            return None

        # EOF — check for rotation
        try:
            current_inode = self.path.stat().st_ino
        except FileNotFoundError:
            log.warning("querylog disappeared (rotation in progress?) — will re-open")
            leftover = self._partial
            self._open()
            return leftover or None

        if current_inode != self._inode:
            log.info(
                "querylog rotation detected (inode %d -> %d) — re-opening",
                self._inode,
                current_inode,
            )
            # The old file will not grow any further, so hand out whatever
            # unterminated fragment is still buffered before switching files.
            leftover = self._partial
            self._open()
            # Don't seek to end on rotation — read from beginning to catch
            # any entries written right after rotation.
            return leftover or None

        return None


# ── Main loop ─────────────────────────────────────────────────────────────────


def main() -> None:
    """Run the watcher loop forever; exits with status 1 if the secret is missing."""
    # Checked here rather than at import time so the module can be imported
    # (e.g. by tests) without terminating the interpreter.
    if not HANDSHAKE_SECRET:
        log.error("HANDSHAKE_SECRET env var is not set — cannot authenticate to backend. Exiting.")
        sys.exit(1)

    log.info(
        "Starting handshake-watcher | backend=%s | querylog=%s | cooldown=%ds",
        BACKEND_URL,
        QUERYLOG_PATH,
        COOLDOWN_SECONDS,
    )

    tailer = RotationSafeTailer(QUERYLOG_PATH)
    tailer._seek_to_end_on_first_open()

    while True:
        line = tailer.readline()
        if line:
            token = extract_client_id(line)
            if token and _cooldown_ok(token):
                log.info("Firing handshake for token=%s", token)
                post_handshake(token)
        else:
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()