feat(elevator): 对齐 V1 lib 的 Davinci/扫描/事件与部署配置

- davinci-manager-storage:FilePart 路径与基址按 V1 JAR(/portal/file、/part/*、GET /download)
- 启动类:扫描 cn.cloudwalk.serial 与 cn.cloudwalk.cwos.client.resource,补 UUIDSerial 与 ApplicationService
- deploy:v1/v2 application 中 cloudwalk.serial.enabled、Kafka 指向 192.168.3.12:9092;deploy/.gitignore 忽略日志
- cloudwalk-common-serial:补充 META-INF/spring.factories(Boot 自动配置)
- 电梯:Session 配置、Davinci Bean、Feign 包、MQTT/Visitor/Zone Feign;部署脚本与 API parity 工具更新
- 文档与根脚本若干;未纳入大体积 jar/zip 与 v1 CFR 对比目录

Made-with: Cursor

Former-commit-id: b76d142d13ebb5c0898de2d9d11bc583876829c2
This commit is contained in:
反编译工作区
2026-04-28 01:02:31 +08:00
parent be7a8e9d89
commit 418c7db202
61 changed files with 2967 additions and 461 deletions
@@ -1,5 +1,6 @@
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import List
@@ -40,3 +41,16 @@ def write_file(
f"\n## 汇总\n- 通过: {ok},不一致: {bad}\n- **上线前请人工与联调/业务确认**。\n"
)
out_path.write_text("".join(lines), encoding="utf-8")
payload = {
"meta": {
"generated_at": datetime.now().isoformat(),
"base_old": base_old,
"base_new": base_new,
"markdown": str(out_path.resolve()),
},
"rows": rows,
"summary": {"match_ok": ok, "match_bad": bad},
}
json_path = out_path.with_suffix(".json")
json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
@@ -0,0 +1,53 @@
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import List
def write_file(
out_path: Path,
base_url: str,
label: str,
rows: List[dict],
) -> None:
lines = [
f"# 电梯应用 API 单机冒烟报告 — {label}\n",
f"- **时间**: {datetime.now().isoformat()}\n",
f"- **Base URL**: {base_url}\n",
"\n## 接口探测\n\n",
"| 用例 | 方法+路径 | HTTP | ms | 业务code | 响应摘要 |\n",
"| ---- | -------- | ---- | -- | -------- | -------- |\n",
]
ok_http = 0
for r in rows:
name = r.get("name", "")
pth = f"{r.get('method', '')} {r.get('path', '')}"
hs = r.get("http_status", "")
ms = r.get("elapsed_ms", "")
bc = r.get("business_code", "") or ""
head = (r.get("response_head", "") or "").replace("|", "\\|")[:180]
if isinstance(hs, int) and 200 <= hs < 300:
ok_http += 1
lines.append(f"| {name} | `{pth}` | {hs} | {ms} | {bc} | {head} |\n")
lines.append(
f"\n## 汇总\n"
f"- 用例数: {len(rows)}HTTP 2xx 数量: {ok_http}\n"
f"- 业务失败(非 0 code)仍可能为**预期**(缺数据/缺 token);本报告仅证明路由可达且返回 Cloudwalk 风格 JSON。\n"
)
out_path.write_text("".join(lines), encoding="utf-8")
payload = {
"meta": {
"generated_at": datetime.now().isoformat(),
"base_url": base_url,
"label": label,
"markdown": str(out_path.resolve()),
},
"rows": rows,
"summary": {"total": len(rows), "http_2xx": ok_http},
}
out_path.with_suffix(".json").write_text(
json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8"
)
@@ -0,0 +1,232 @@
#!/usr/bin/env python3
"""合并电梯 API 测试套件总览:全量清单(catalog)+ 测试结果矩阵(JSON)+ 可选附录(子报告 MD)。"""
from __future__ import annotations
import argparse
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
def _tool_root() -> Path:
return Path(__file__).resolve().parent.parent
def _load_catalog(path: Path) -> List[dict]:
data = json.loads(path.read_text(encoding="utf-8"))
return list(data.get("endpoints") or [])
def _safe_json_load(p: Optional[Path]) -> Optional[dict]:
if not p or not p.is_file():
return None
try:
return json.loads(p.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
def _rows_by_id(rows: Any) -> Dict[str, dict]:
out: Dict[str, dict] = {}
if not isinstance(rows, list):
return out
for r in rows:
if not isinstance(r, dict):
continue
eid = r.get("id")
if not eid:
# 对拍行可能在 id 字段,否则用 path 不可靠
continue
out[str(eid)] = r
return out
def _parity_by_id(rows: Any) -> Dict[str, dict]:
out: Dict[str, dict] = {}
if not isinstance(rows, list):
return out
for r in rows:
if isinstance(r, dict) and r.get("id"):
out[str(r["id"])] = r
return out
def _resolve_json_arg(md_or_json: Optional[str]) -> Optional[Path]:
if not md_or_json:
return None
p = Path(md_or_json)
if p.suffix.lower() == ".json" and p.is_file():
return p
if p.suffix.lower() == ".md":
cand = p.with_suffix(".json")
if cand.is_file():
return cand
return None
def _markdown_table_catalog(endpoints: List[dict]) -> str:
lines = [
"| ID | 名称 | 方法 | Path | 冒烟 | 横向对拍 |\n",
"| ---- | ---- | ---- | ---- | ---- | -------- |\n",
]
for ep in endpoints:
eid = ep.get("id", "")
name = (ep.get("name") or "").replace("|", "\\|")
method = ep.get("method") or "POST"
path = (ep.get("path") or "").replace("|", "\\|")
sm = "" if ep.get("include_in_smoke", True) else ""
py = "" if ep.get("include_in_parity", True) else ""
lines.append(f"| `{eid}` | {name} | {method} | `{path}` | {sm} | {py} |\n")
return "".join(lines)
def _markdown_matrix(
endpoints: List[dict],
smoke_v1: Optional[dict],
smoke_v2: Optional[dict],
parity: Optional[dict],
) -> str:
m1 = _rows_by_id((smoke_v1 or {}).get("rows"))
m2 = _rows_by_id((smoke_v2 or {}).get("rows"))
mp = _parity_by_id((parity or {}).get("rows"))
meta_v1 = (smoke_v1 or {}).get("meta") or {}
meta_v2 = (smoke_v2 or {}).get("meta") or {}
meta_pr = (parity or {}).get("meta") or {}
head = ""
head += (
f"- **V1 冒烟数据来源**: {meta_v1.get('markdown') or meta_v1.get('base_url') or '(无 JSON,未执行或已跳过)'}\n"
)
head += (
f"- **V2 冒烟数据来源**: {meta_v2.get('markdown') or meta_v2.get('base_url') or '(无)'}\n"
)
head += f"- **对拍数据来源**: {meta_pr.get('markdown') or '(无)'}\n\n"
lines = [
head,
"### 测试结果矩阵(按 catalog `id` 对齐)\n\n",
"| catalog id | V1 HTTP | V1 code | V2 HTTP | V2 code | 对拍一致(仅参与对拍的条目) | 备注 |\n",
"| ---------- | ------- | ------- | ------- | ------- | ---------------------------- | ---- |\n",
]
for ep in endpoints:
eid = str(ep.get("id", ""))
include_p = ep.get("include_in_parity", True)
v1 = m1.get(eid)
v2 = m2.get(eid)
pr_row = mp.get(eid) if include_p else None
def cell_smoke(row: Optional[dict]) -> tuple[str, str]:
if not row:
return "", ""
return str(row.get("http_status", "")), str(row.get("business_code") or "")
h1, c1 = cell_smoke(v1)
h2, c2 = cell_smoke(v2)
if not include_p:
par_c = "(不参与)"
elif not parity:
par_c = "—(未执行)"
elif pr_row:
par_c = "**Y**" if pr_row.get("match") else "**N**"
if not pr_row.get("match"):
par_c += " " + (pr_row.get("message") or "")[:60].replace("|", "\\|")
else:
par_c = "—(本次对拍清单无此项)"
remark = ""
if v1 is None and v2 is None:
remark = "冒烟未执行或无该 id 结果"
lines.append(
f"| `{eid}` | {h1} | {c1} | {h2} | {c2} | {par_c} | {remark} |\n"
)
lines.append(
"\n**说明**`code` 为 CloudwalkResult 顶层业务码;HTTP 为传输层状态。"
"对拍列为 **Y** 表示旧/新 HTTP 状态一致且业务 code 一致(`code_only` 模式)。\n"
)
return "".join(lines)
def main() -> None:
ap = argparse.ArgumentParser(description="Generate SUITE markdown with catalog + matrix")
ap.add_argument("--out", required=True, help="Output SUITE-*.md path")
ap.add_argument("--catalog", help="api_catalog.json path")
ap.add_argument("--smoke-v1", help="smoke-v1_*.md 或同名 .json")
ap.add_argument("--smoke-v2", help="smoke-v2_*.md 或同名 .json")
ap.add_argument("--parity", help="parity-*.md 或同名 .json")
ap.add_argument(
"--embed-full",
action="store_true",
help="附录中嵌入子报告 Markdown 全文(较长)",
)
args = ap.parse_args()
root = _tool_root()
catalog_path = Path(args.catalog or (root / "api_catalog.json"))
endpoints = _load_catalog(catalog_path)
js_v1 = _resolve_json_arg(args.smoke_v1)
js_v2 = _resolve_json_arg(args.smoke_v2)
js_pr = _resolve_json_arg(args.parity)
doc_v1 = _safe_json_load(js_v1)
doc_v2 = _safe_json_load(js_v2)
doc_pr = _safe_json_load(js_pr)
out = Path(args.out)
out.parent.mkdir(parents=True, exist_ok=True)
lines: List[str] = [
"# 电梯应用 API 测试套件总览\n\n",
f"- **生成时间**: {datetime.now().isoformat()}\n",
f"- **清单来源**: `{catalog_path.resolve()}`(共 **{len(endpoints)}** 条接口定义)\n\n",
"## 1. 说明\n\n",
"- **全量清单**:第二节,来自 `api_catalog.json`,含是否参与冒烟/对拍。\n",
"- **测试结果矩阵**:第三节,与本次运行生成的 **JSON** 侧车文件对齐(与 `.md` 同名的 `.json`)。"
"若应用未启动导致 pytest **skip**,则无 JSON,矩阵中表现为 **—**。\n",
"- **横向对拍**:仅 `include_in_parity=true` 的条目会写入对拍 JSON 并参与对比。\n\n",
"## 2. 全量接口测试清单(catalog)\n\n",
_markdown_table_catalog(endpoints),
"\n",
"## 3. 测试结果矩阵\n\n",
_markdown_matrix(endpoints, doc_v1, doc_v2, doc_pr),
]
sec = 4
if args.embed_full:
for title, path_s in (
("V1 单机冒烟 Markdown", args.smoke_v1),
("V2 单机冒烟 Markdown", args.smoke_v2),
("横向对拍 Markdown", args.parity),
):
if not path_s:
continue
pp = Path(path_s)
if pp.suffix.lower() != ".md":
pp = pp.with_suffix(".md") if pp.with_suffix(".md").is_file() else pp
if not pp.is_file():
continue
lines.append(f"## {sec}. {title}\n\n")
lines.append(f"源文件: `{pp.resolve()}`\n\n---\n\n")
lines.append(pp.read_text(encoding="utf-8"))
lines.append("\n\n")
sec += 1
lines.append(
f"\n## {sec}. 原始报告路径(便于回放)\n\n"
f"- V1 冒烟: `{args.smoke_v1 or '(未生成)'}`\n"
f"- V2 冒烟: `{args.smoke_v2 or '(未生成)'}`\n"
f"- 对拍: `{args.parity or '(未生成)'}`\n"
f"- 同名 **`.json`** 与 `.md` 一并生成时可自动填充第三节矩阵。\n"
)
out.write_text("".join(lines), encoding="utf-8")
print(out.resolve())
if __name__ == "__main__":
main()