Initial commit: reorganized source tree

- backend/: 13 Maven modules (cw-elevator-application, cloudwalk-cloud, intelligent-cwoscomponent, ninca-crk, etc.)
- frontend/: 4 Vue projects (elevator-front, cwos-portal, alarm-front, front_acs) + decompiled + scripts
- scripts/: build, test-env, tools (Docker Compose, service templates, API parity)
- docs/: AGENTS.md, superpowers specs, architecture docs
- .gitignore: standard Java/Maven exclusions

Moved from legacy maven-*/ root layout to backend/ organized structure.
This commit is contained in:
hpd840321
2026-05-09 09:00:12 +08:00
commit 7b2bd307f1
7260 changed files with 612980 additions and 0 deletions
@@ -0,0 +1 @@
"""Parity: HTTP client, health probe, JSON compare for elevator JARs."""
@@ -0,0 +1,64 @@
"""
在基线请求体上叠加 boundary_patches,用于双端对拍的入参边界/范围扩展。
catalog 中可选字段(仅当开启 ELEVATOR_PARITY_BOUNDARY=1 时展开)::
"boundary_patches": [
{ "id": "rows_zero", "description": "可选说明", "patch": { "rowsOfPage": 0 } }
]
patch 与基线 JSON **深度合并**(字典递归;非 dict 叶子由 patch 覆盖)。
"""
from __future__ import annotations
import copy
import os
from typing import Any, Dict, Iterator, List, Tuple
from parity.catalog_loader import endpoint_body
def deep_merge(base: Any, patch: Any) -> Any:
"""深度合并:patch 覆盖 base;两 dict 则递归。"""
if not isinstance(base, dict) or not isinstance(patch, dict):
return copy.deepcopy(patch)
out = copy.deepcopy(base)
for k, v in patch.items():
if k in out and isinstance(out[k], dict) and isinstance(v, dict):
out[k] = deep_merge(out[k], v)
else:
out[k] = copy.deepcopy(v)
return out
def boundary_enabled() -> bool:
return os.environ.get("ELEVATOR_PARITY_BOUNDARY", "0").strip() == "1"
def iter_parity_request_cases(ep: Dict[str, Any]) -> Iterator[Tuple[str, Dict[str, Any]]]:
"""
生成 (case_id, body)。首条恒为 (\"default\", 基线体)。
boundary 开启且条目含 boundary_patches 时,追加合并后的变体。
"""
base = endpoint_body(ep)
yield "default", base
if not boundary_enabled():
return
patches: List[Dict[str, Any]] = ep.get("boundary_patches") or []
if not patches:
return
for bp in patches:
cid = str(bp.get("id") or bp.get("label") or "variant").strip()
if not cid:
cid = "variant"
patch = bp.get("patch")
if patch is None or not isinstance(patch, dict):
continue
merged = deep_merge(base, patch)
if not isinstance(merged, dict):
merged = {}
yield cid, merged
@@ -0,0 +1,42 @@
import json
from pathlib import Path
from typing import Any, Dict, Optional
_ROOT = Path(__file__).resolve().parent.parent
def load() -> dict:
return json.loads((_ROOT / "api_catalog.json").read_text(encoding="utf-8"))
def endpoint_body(ep: Dict[str, Any]) -> Dict[str, Any]:
"""Resolve request JSON: inline ``body`` wins, else load ``fixture`` file."""
if ep.get("body") is not None:
return dict(ep["body"])
fix = ep.get("fixture")
if fix:
p = _ROOT / "fixtures" / fix
return json.loads(p.read_text(encoding="utf-8"))
return {}
def iter_endpoints(catalog: dict, *, tag: Optional[str] = None) -> list[dict]:
"""If ``tag`` is set, only endpoints with ``tags`` containing it (or legacy entries with no tags = all)."""
out: list[dict] = []
for ep in catalog.get("endpoints", []):
tags = ep.get("tags") or []
if tag is None:
out.append(ep)
elif not tags:
out.append(ep)
elif tag in tags:
out.append(ep)
return out
def include_in_parity(ep: dict) -> bool:
return bool(ep.get("include_in_parity", True))
def include_in_smoke(ep: dict) -> bool:
return bool(ep.get("include_in_smoke", True))
@@ -0,0 +1,182 @@
from __future__ import annotations
import json
import os
import time
from dataclasses import dataclass
from typing import Any, Optional
import requests
from . import compare
from .health import probe_healthy
@dataclass
class ParityResult:
name: str
method: str
path: str
old_status: int
new_status: int
old_text: str
new_text: str
match: bool
message: str
def default_headers() -> dict[str, str]:
h: dict[str, str] = {
"Content-Type": "application/json",
}
m = {
"businessid": "ELEVATOR_HEADER_BUSINESSID",
"loginid": "ELEVATOR_HEADER_LOGINID",
"platformuserid": "ELEVATOR_HEADER_PLATFORMUSERID",
"authorization": "ELEVATOR_HEADER_AUTHORIZATION",
"applicationid": "ELEVATOR_HEADER_APPLICATIONID",
}
for hdr, evar in m.items():
v = os.environ.get(evar, "").strip()
if v:
h[hdr] = v
return h
def call_both(
name: str,
method: str,
path: str,
body: Any,
base_old: str,
base_new: str,
extra_headers: Optional[dict] = None,
compare_mode: str = "deep",
session: requests.Session | None = None,
) -> ParityResult:
s = session or requests.Session()
h = {**default_headers(), **(extra_headers or {})}
url_o = base_old.rstrip("/") + path
url_n = base_new.rstrip("/") + path
if body is None:
data = None
else:
data = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False)
m = method.upper()
if m == "GET":
r_old = s.get(url_o, headers=h, timeout=60)
r_new = s.get(url_n, headers=h, timeout=60)
else:
r_old = s.post(url_o, headers=h, data=data, timeout=120)
r_new = s.post(url_n, headers=h, data=data, timeout=120)
ot, nt = r_old.text, r_new.text
st_o, st_n = r_old.status_code, r_new.status_code
msg = ""
if compare_mode == "status_only":
m_ok = st_o == st_n
if not m_ok:
msg = f"http {st_o} vs {st_n}"
elif compare_mode == "code_only":
msg = ""
oj, nj = _safe_json(ot), _safe_json(nt)
c1, c2 = compare.business_code(oj), compare.business_code(nj)
m_ok = st_o == st_n
if m_ok and c1 is not None and c2 is not None:
m_ok = c1 == c2
elif m_ok and c1 is None and c2 is None and st_o == st_n and 400 <= st_o < 600:
m_ok = True
msg = "http status only (no parseable JSON body)"
elif m_ok and (c1 is None) != (c2 is None):
m_ok = False
if not m_ok and not msg:
msg = compare.describe_diff(ot, nt, "code_only")
elif m_ok and not msg:
msg = ""
else:
m_ok = st_o == st_n
oj, nj = _safe_json(ot), _safe_json(nt)
if oj is not None and nj is not None and m_ok:
m_ok = compare.normalize(oj) == compare.normalize(nj)
elif m_ok and (ot or "") != (nt or ""):
m_ok = (ot or "").strip() == (nt or "").strip()
if not m_ok:
msg = compare.describe_diff(ot, nt, "deep") or f"http {st_o}/{st_n} or body mismatch"
return ParityResult(
name=name,
method=m,
path=path,
old_status=st_o,
new_status=st_n,
old_text=ot,
new_text=nt,
match=m_ok,
message=msg,
)
def _safe_json(text: str) -> Any:
if not (text or "").strip():
return None
try:
return compare.parse_json_loose(text)
except Exception:
return None
def call_single(
name: str,
method: str,
path: str,
body: Any,
base_url: str,
session: requests.Session | None = None,
) -> dict[str, Any]:
"""One HTTP call for smoke coverage; returns a flat dict for reporting."""
s = session or requests.Session()
h = default_headers()
url = base_url.rstrip("/") + path
data = (
None
if body is None
else (body if isinstance(body, str) else json.dumps(body, ensure_ascii=False))
)
m = method.upper()
t0 = time.perf_counter()
if m == "GET":
r = s.get(url, headers=h, timeout=120)
else:
r = s.post(url, headers=h, data=data, timeout=120)
elapsed_ms = int((time.perf_counter() - t0) * 1000)
txt = r.text or ""
oj = _safe_json(txt)
bc = compare.business_code(oj) if oj is not None else None
head = txt[:400].replace("\n", " ")
return {
"name": name,
"method": m,
"path": path,
"http_status": r.status_code,
"elapsed_ms": elapsed_ms,
"business_code": bc,
"response_head": head,
"reachable": True,
}
def can_reach_one(base_url: str, s: requests.Session | None = None) -> tuple[bool, str]:
s = s or requests.Session()
_, ok, _ = probe_healthy(base_url, s)
return ok, base_url
def can_reach_both(
base_old: str, base_new: str, s: requests.Session | None = None
) -> tuple[bool, str]:
s = s or requests.Session()
p1, ok1, _ = probe_healthy(base_old, s)
p2, ok2, _ = probe_healthy(base_new, s)
if ok1 and ok2:
return True, f"ok old health path~{p1} new probed"
return False, f"health old ok={ok1} new ok={ok2} (paths like {p1})"
@@ -0,0 +1,94 @@
"""
Compare two JSON response bodies. CloudwalkResult: compare top-level `code` when
both parse as objects; also deep-compare with optional key stripping.
"""
from __future__ import annotations
import copy
import json
from typing import Any, List, Set
# JSONPath-like: tuple of keys in order for nesting
DEFAULT_STRIP_PATHS: Set[tuple] = {
# ("data", "ts"),
}
def _strip(obj: Any, rules: Set[tuple], prefix: tuple) -> Any:
if not isinstance(obj, (dict, list)):
return obj
if isinstance(obj, list):
return [_strip(x, rules, prefix + (i,)) for i, x in enumerate(obj)]
out = copy.deepcopy(obj)
for k, v in list(obj.items()) if isinstance(obj, dict) else []:
p = prefix + (k,)
if p in rules:
if k in out:
del out[k]
continue
if isinstance(v, (dict, list)):
out[k] = _strip(v, rules, p)
return out
def parse_json_loose(text: str) -> Any:
if not (text or "").strip():
return None
return json.loads(text)
def normalize(
data: Any,
strip_paths: Set[tuple] | None = None,
) -> str:
rules = strip_paths if strip_paths is not None else DEFAULT_STRIP_PATHS
s = _strip(data, rules, tuple())
return json.dumps(s, ensure_ascii=False, sort_keys=True, default=str)
def business_code(data: Any) -> str | None:
if isinstance(data, dict) and "code" in data:
c = data.get("code")
return str(c) if c is not None else None
return None
def same_shape(old_text: str, new_text: str) -> bool:
try:
a = parse_json_loose(old_text)
b = parse_json_loose(new_text)
except json.JSONDecodeError:
return False
if type(a) != type(b):
return False
if isinstance(a, dict) and isinstance(b, dict) and a.keys() == b.keys():
return all(type(a[k]) is type(b[k]) for k in a)
if isinstance(a, list) and isinstance(b, list):
if len(a) == len(b) and len(a) == 0:
return True
return False
def describe_diff(
old_text: str, new_text: str, compare_norm: str = "deep"
) -> str:
try:
o = parse_json_loose(old_text)
n = parse_json_loose(new_text)
except json.JSONDecodeError as e:
return f"json parse: {e}"
if compare_norm == "code_only":
c1, c2 = business_code(o), business_code(n)
if c1 == c2:
return ""
return f"business code: old={c1!r} new={c2!r}"
no = normalize(o)
nn = normalize(n)
if no == nn:
return ""
if business_code(o) is not None and business_code(n) is not None and business_code(o) != business_code(
n
):
return f"normalized JSON differs; first code: old={business_code(o)} new={business_code(n)}"
return "normalized JSON differs (see full bodies in test log)"
@@ -0,0 +1,29 @@
"""Probes that work for both Spring Boot 1.5/2.x with common actuator settings."""
import requests
_CANDIDATES = (
"/actuator/health",
"/actuator/health/", # noqa: PIE
"/health",
"/application/health", # very old
)
def probe_healthy(base_url: str, session: requests.Session, timeout: float = 2.0) -> tuple[str, bool, int]:
base = base_url.rstrip("/")
last_status = 0
for path in _CANDIDATES:
try:
r = session.get(base + path, timeout=timeout, allow_redirects=True)
last_status = r.status_code
if r.status_code == 200 and _body_up(r.text):
return path, True, r.status_code
except requests.RequestException:
last_status = -1
return _CANDIDATES[0], False, last_status
def _body_up(text: str) -> bool:
t = (text or "").lower()
return "up" in t or '"status":"ok"' in t or '"status": "ok"' in t