fix: relocate cwos-portal decompiled output to correct path; remove nested directory

Former-commit-id: dc30d42a8c55ed8b2382a41dc2434233fbed9930
This commit is contained in:
反编译工作区
2026-04-29 12:09:48 +08:00
parent ea8e492076
commit e8672a3c7b
1759 changed files with 547735 additions and 280 deletions
@@ -1,7 +1,7 @@
from __future__ import annotations
import pytest
from parity.catalog_loader import endpoint_body as cb_body
from parity.body_variants import iter_parity_request_cases
from parity.catalog_loader import include_in_parity as cb_parity
from parity.catalog_loader import load as load_catalog
from parity.client import call_both
@@ -15,37 +15,44 @@ def test_parity_from_catalog(
base_new,
session_http,
):
"""按 api_catalog 双端对拍(仅 ``include_in_parity`` 为 true 的条目)。compare_mode: deep | code_only | status_only"""
"""按 api_catalog 双端对拍(仅 ``include_in_parity`` 为 true 的条目)。
基线体 + 可选 ``boundary_patches``(需 ``ELEVATOR_PARITY_BOUNDARY=1``)对入参做边界/范围扩展。
compare_mode: deep | code_only | status_only
"""
cat = load_catalog()["endpoints"] # type: ignore[operator]
for ep in cat:
if not cb_parity(ep):
continue
name = ep["id"]
body = cb_body(ep)
pr = call_both(
name=ep.get("name", name),
method=ep.get("method", "POST"),
path=ep["path"],
body=body,
base_old=base_old,
base_new=base_new,
session=session_http,
compare_mode=ep.get("compare_mode", "code_only"),
)
row = {
"id": name,
"name": ep.get("name", name),
"method": pr.method,
"path": pr.path,
"old_status": pr.old_status,
"new_status": pr.new_status,
"match": pr.match,
"message": pr.message,
}
request.config._parity_rows.append(row) # type: ignore
# 在 code_only 下通常允许两边同一业务 code;若都非 200 也记录
assert pr.match, (
f"{name} mismatch: {pr.message}\n"
f"old_http={pr.old_status} new_http={pr.new_status}\n"
f"old_head={pr.old_text[:500]!r}\nnew_head={pr.new_text[:500]!r}"
)
display = ep.get("name", name)
for case_id, body in iter_parity_request_cases(ep):
case_label = f"{name}:{case_id}" if case_id != "default" else name
call_name = f"{display} [{case_id}]" if case_id != "default" else display
pr = call_both(
name=call_name,
method=ep.get("method", "POST"),
path=ep["path"],
body=body,
base_old=base_old,
base_new=base_new,
session=session_http,
compare_mode=ep.get("compare_mode", "code_only"),
)
row = {
"id": case_label,
"name": call_name,
"case_id": case_id,
"method": pr.method,
"path": pr.path,
"old_status": pr.old_status,
"new_status": pr.new_status,
"match": pr.match,
"message": pr.message,
}
request.config._parity_rows.append(row) # type: ignore
assert pr.match, (
f"{case_label} mismatch: {pr.message}\n"
f"old_http={pr.old_status} new_http={pr.new_status}\n"
f"old_head={pr.old_text[:500]!r}\nnew_head={pr.new_text[:500]!r}"
)
@@ -0,0 +1,58 @@
from __future__ import annotations
"""全 catalog 双端对拍(仅 include_in_parity=true 的条目,与 test_parity_from_catalog 范围一致,但每接口独立用例)。"""
import pytest
from parity.catalog_loader import endpoint_body as cb_body
from parity.catalog_loader import include_in_parity as cb_parity
from parity.catalog_loader import load as load_catalog
from parity.client import call_both
def pytest_generate_tests(metafunc):
if "catalog_ep" not in metafunc.fixturenames:
return
cat_all = list((load_catalog() or {}).get("endpoints") or [])
# 与 test_parity_from_catalog 对齐:全 catalog 用例同样尊重 include_in_parity
cat = [ep for ep in cat_all if cb_parity(ep)]
ids = [str(ep.get("id", i)) for i, ep in enumerate(cat)]
metafunc.parametrize("catalog_ep", cat, ids=ids)
@pytest.mark.usefixtures("two_instances_ready")
@pytest.mark.live
def test_parity_single_endpoint(
request,
base_old,
base_new,
session_http,
catalog_ep: dict,
):
name = catalog_ep["id"]
body = cb_body(catalog_ep)
pr = call_both(
name=catalog_ep.get("name", name),
method=catalog_ep.get("method", "POST"),
path=catalog_ep["path"],
body=body,
base_old=base_old,
base_new=base_new,
session=session_http,
compare_mode=catalog_ep.get("compare_mode", "code_only"),
)
row = {
"id": name,
"name": catalog_ep.get("name", name),
"method": pr.method,
"path": pr.path,
"old_status": pr.old_status,
"new_status": pr.new_status,
"match": pr.match,
"message": pr.message,
}
request.config._parity_full_rows.append(row) # type: ignore
assert pr.match, (
f"{name} mismatch: {pr.message}\n"
f"old_http={pr.old_status} new_http={pr.new_status}\n"
f"old_head={pr.old_text[:500]!r}\nnew_head={pr.new_text[:500]!r}"
)
@@ -3,6 +3,7 @@
from __future__ import annotations
from parity import compare as C
from parity import body_variants as BV
def test_normalize_equal():
@@ -15,3 +16,26 @@ def test_business_code_from_cloudwalk_result():
j = '{"code":"0","data":{}}'
p = C.parse_json_loose(j)
assert C.business_code(p) == "0"
def test_deep_merge_nested():
base = {"a": 1, "nested": {"x": 1, "y": 2}}
patch = {"nested": {"y": 99}, "b": 2}
m = BV.deep_merge(base, patch)
assert m == {"a": 1, "b": 2, "nested": {"x": 1, "y": 99}}
def test_iter_parity_request_cases_respects_env(monkeypatch):
monkeypatch.delenv("ELEVATOR_PARITY_BOUNDARY", raising=False)
ep = {
"id": "x",
"body": {"p": 1},
"boundary_patches": [{"id": "zero", "patch": {"p": 0}}],
}
cases = list(BV.iter_parity_request_cases(ep))
assert cases == [("default", {"p": 1})]
monkeypatch.setenv("ELEVATOR_PARITY_BOUNDARY", "1")
cases2 = list(BV.iter_parity_request_cases(ep))
assert cases2[0] == ("default", {"p": 1})
assert cases2[1] == ("zero", {"p": 0})