from __future__ import annotations import json import os from dataclasses import dataclass from typing import Any, Optional import requests from . import compare from .health import probe_healthy @dataclass class ParityResult: name: str method: str path: str old_status: int new_status: int old_text: str new_text: str match: bool message: str def default_headers() -> dict[str, str]: h: dict[str, str] = { "Content-Type": "application/json", } m = { "businessid": "ELEVATOR_HEADER_BUSINESSID", "loginid": "ELEVATOR_HEADER_LOGINID", "platformuserid": "ELEVATOR_HEADER_PLATFORMUSERID", "authorization": "ELEVATOR_HEADER_AUTHORIZATION", "applicationid": "ELEVATOR_HEADER_APPLICATIONID", } for hdr, evar in m.items(): v = os.environ.get(evar, "").strip() if v: h[hdr] = v return h def call_both( name: str, method: str, path: str, body: Any, base_old: str, base_new: str, extra_headers: Optional[dict] = None, compare_mode: str = "deep", session: requests.Session | None = None, ) -> ParityResult: s = session or requests.Session() h = {**default_headers(), **(extra_headers or {})} url_o = base_old.rstrip("/") + path url_n = base_new.rstrip("/") + path if body is None: data = None else: data = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False) m = method.upper() if m == "GET": r_old = s.get(url_o, headers=h, timeout=60) r_new = s.get(url_n, headers=h, timeout=60) else: r_old = s.post(url_o, headers=h, data=data, timeout=120) r_new = s.post(url_n, headers=h, data=data, timeout=120) ot, nt = r_old.text, r_new.text st_o, st_n = r_old.status_code, r_new.status_code msg = "" if compare_mode == "status_only": m_ok = st_o == st_n if not m_ok: msg = f"http {st_o} vs {st_n}" elif compare_mode == "code_only": msg = "" oj, nj = _safe_json(ot), _safe_json(nt) c1, c2 = compare.business_code(oj), compare.business_code(nj) m_ok = st_o == st_n if m_ok and c1 is not None and c2 is not None: m_ok = c1 == c2 elif m_ok and c1 is None and c2 is None and st_o == st_n and 400 <= st_o < 600: m_ok = True msg = "http status only (no parseable JSON body)" elif m_ok and (c1 is None) != (c2 is None): m_ok = False if not m_ok and not msg: msg = compare.describe_diff(ot, nt, "code_only") elif m_ok and not msg: msg = "" else: m_ok = st_o == st_n oj, nj = _safe_json(ot), _safe_json(nt) if oj is not None and nj is not None and m_ok: m_ok = compare.normalize(oj) == compare.normalize(nj) elif m_ok and (ot or "") != (nt or ""): m_ok = (ot or "").strip() == (nt or "").strip() if not m_ok: msg = compare.describe_diff(ot, nt, "deep") or f"http {st_o}/{st_n} or body mismatch" return ParityResult( name=name, method=m, path=path, old_status=st_o, new_status=st_n, old_text=ot, new_text=nt, match=m_ok, message=msg, ) def _safe_json(text: str) -> Any: if not (text or "").strip(): return None try: return compare.parse_json_loose(text) except Exception: return None def can_reach_both( base_old: str, base_new: str, s: requests.Session | None = None ) -> tuple[bool, str]: s = s or requests.Session() p1, ok1, _ = probe_healthy(base_old, s) p2, ok2, _ = probe_healthy(base_new, s) if ok1 and ok2: return True, f"ok old health path~{p1} new probed" return False, f"health old ok={ok1} new ok={ok2} (paths like {p1})"