- backend/: 13 Maven modules (cw-elevator-application, cloudwalk-cloud, intelligent-cwoscomponent, ninca-crk, etc.) - frontend/: 4 Vue projects (elevator-front, cwos-portal, alarm-front, front_acs) + decompiled + scripts - scripts/: build, test-env, tools (Docker Compose, service templates, API parity) - docs/: AGENTS.md, superpowers specs, architecture docs - .gitignore: standard Java/Maven exclusions Moved from legacy maven-*/ root layout to backend/ organized structure.
17 KiB
org_id 策略修复验证脚本 — 实施计划
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: 编写 Python 无鉴权验证脚本 verify_org_policy_fix.py,自动准备测试数据、执行 7 个用例、清理数据、输出 JSON 报告。
Architecture: 单脚本 4 个 Phase:Phase0 健康检查 → Phase1 MySQL 准备数据 → Phase2 HTTP 逐用例调用 add/visitor + passRule/image → Phase3 MySQL 清理 → Phase4 输出 JSON。复用现有 quick_verify_visitor_floor_policy.py 的 noauth 调用模式。
Tech Stack: Python 3.8+, requests, pymysql, JSON
Spec: docs/superpowers/specs/2026-05-01-org-policy-verify-design.md
Task 1: 脚本骨架与配置
Files:
-
Create:
maven-cw-elevator-application/tools/visitor_floor_verification/scripts/verify_org_policy_fix.py -
Step 1: 写入脚本骨架
#!/usr/bin/env python3
"""org_id 策略修复 — 无鉴权验证脚本"""
import argparse
import json
import sys
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import pymysql
import requests
# ===== 配置常量 =====
DB_CONFIG = {
"host": "192.168.3.12",
"port": 3307,
"user": "root",
"password": "123456",
"db_org": "component-organization",
"db_elevator": "cw-elevator-application",
}
BUSINESS_ID = "2524639890ba4f2cba9ba1a4eeaa4015"
# 测试用组织节点
ORG_1403 = "72fb65ec5de94201b909a98b8bae1892"
ORG_1405 = "2095de3d541f44eba686c78fda68336f"
ORG_GUANGFA = "488b8ad049bb43408a6fbcc50bcb89ac"
# 被访人
HOST_CHEN = "1060601019894960128" # 陈国辉 (1403+星中心)
HOST_WANG = "1090779433129840640" # 王姣 (1405)
HOST_QIN = "1072908835884208128" # 秦夏 (广发基金)
# 访客(测试专用号段)
VISITOR_IDS = [
"9199000100000000001", "9199000100000000002", "9199000100000000003",
"9199000100000000004", "9199000100000000005", "9199000100000000006",
"9199000100000000007",
]
ZONE_28F = "605560545117995008"
ZONE_99F = "605560540000000000" # 不存在,用于 T3
OK_CODES = {"0", "200"}
TEST_CASES = [
{
"id": "T1", "name": "有策略→allow替换floorList",
"host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[0],
"policy_id": "policy_t1_1403", "expected_pass": True,
"expected_floors": [ZONE_28F],
},
{
"id": "T2", "name": "无策略→floorList",
"host_id": HOST_WANG, "visitor_id": VISITOR_IDS[1],
"policy_id": None, "expected_pass": True,
"expected_floors": None, # 不做楼层精确比对,只验证成功
},
{
"id": "T3", "name": "allow含无效zone→拒绝",
"host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[2],
"policy_id": "policy_t3_invalid", "expected_pass": False,
"expected_code": "76260533",
},
{
"id": "T4", "name": "多组织命中第一个策略",
"host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[3],
"policy_id": "policy_t1_1403", "expected_pass": True,
"expected_floors": [ZONE_28F],
},
{
"id": "T5", "name": "enabled=0等同无策略",
"host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[4],
"policy_id": "policy_t5_disabled", "expected_pass": True,
"expected_floors": None,
},
{
"id": "T6", "name": "UC-02策略优先",
"host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[5],
"policy_id": "policy_t1_1403", "expected_pass": True,
"expected_floors": [ZONE_28F],
"floor_ids_override": ["605560541473144832"], # 传6F,策略应覆盖为28F
},
{
"id": "T7", "name": "广发基金迁移验证",
"host_id": HOST_QIN, "visitor_id": VISITOR_IDS[6],
"policy_id": "gf_vstr_policy_guangfa_fund_001x", "expected_pass": True,
"expected_floors": [ZONE_28F],
},
]
def parse_args():
p = argparse.ArgumentParser(description="org_id 策略修复验证")
p.add_argument("--elevator-base-url", default="http://127.0.0.1:18081")
p.add_argument("--skip-db", action="store_true", help="跳过数据库准备/清理")
return p.parse_args()
if __name__ == "__main__":
args = parse_args()
print(f"elevator: {args.elevator_base_url}")
print(f"skip-db: {args.skip_db}")
print(f"cases: {len(TEST_CASES)}")
- Step 2: 验证导入可用
cd maven-cw-elevator-application/tools/visitor_floor_verification
python3 -c "import requests; import pymysql; print('OK')"
期望: OK
- Step 3: 提交
git add maven-cw-elevator-application/tools/visitor_floor_verification/scripts/verify_org_policy_fix.py
git commit -m "test: scaffold verify_org_policy_fix.py with config and test cases"
Task 2: Phase 0 — 健康检查 + Phase 1 — 数据准备/清理
Files:
-
Modify:
verify_org_policy_fix.py(追加函数) -
Step 1: 添加健康检查函数
def health_check(base_url: str) -> bool:
"""GET /actuator/health"""
try:
r = requests.get(f"{base_url}/actuator/health", timeout=10)
ok = r.status_code == 200
print(f"[HEALTH] {base_url} -> {r.status_code} {'OK' if ok else 'FAIL'}")
return ok
except Exception as e:
print(f"[HEALTH] {base_url} -> ERROR: {e}")
return False
- Step 2: 添加数据库连接函数
def get_db_conn():
return pymysql.connect(
host=DB_CONFIG["host"], port=DB_CONFIG["port"],
user=DB_CONFIG["user"], password=DB_CONFIG["password"],
database=DB_CONFIG["db_elevator"],
charset="utf8mb4", autocommit=True,
)
def execute_sql(sql: str, params=None):
conn = get_db_conn()
try:
with conn.cursor() as cur:
cur.execute(sql, params)
finally:
conn.close()
- Step 3: 添加数据准备函数
def prepare_test_data():
"""INSERT 测试策略 + UPDATE 广发基金 org_id"""
policies = [
("policy_t1_1403", ORG_1403, f'["{ZONE_28F}"]', 1),
("policy_t3_invalid", ORG_1403, f'["{ZONE_28F}","{ZONE_99F}"]', 1),
("policy_t5_disabled", ORG_1403, f'["{ZONE_28F}"]', 0),
]
for pid, oid, zones_json, enabled in policies:
execute_sql("DELETE FROM tenant_visitor_floor_policy WHERE id=%s", (pid,))
execute_sql(
"INSERT INTO tenant_visitor_floor_policy "
"(id, org_id, business_id, policy_type, allow_zone_ids, building_id, enabled, policy_version, created_at, updated_at) "
"VALUES (%s, %s, NULL, 'INTERSECT_ALLOWLIST', %s, NULL, %s, 1, UNIX_TIMESTAMP(NOW())*1000, UNIX_TIMESTAMP(NOW())*1000)",
(pid, oid, zones_json, enabled),
)
print(f" INSERT policy {pid} org={oid} enabled={enabled}")
# 广发基金迁移
execute_sql(
"UPDATE tenant_visitor_floor_policy SET org_id=%s WHERE id='gf_vstr_policy_guangfa_fund_001x'",
(ORG_GUANGFA,),
)
print(f" UPDATE 广发基金 org_id={ORG_GUANGFA}")
- Step 4: 添加数据清理函数
def cleanup_test_data():
"""DELETE 测试策略 + 回滚广发基金 org_id"""
for pid in ["policy_t1_1403", "policy_t3_invalid", "policy_t5_disabled"]:
execute_sql("DELETE FROM tenant_visitor_floor_policy WHERE id=%s", (pid,))
print(f" DELETE {pid}")
execute_sql(
"UPDATE tenant_visitor_floor_policy SET org_id=NULL WHERE id='gf_vstr_policy_guangfa_fund_001x'"
)
print(" UPDATE 广发基金 org_id=NULL (回滚)")
- Step 5: 测试 DB 函数
python3 -c "
import verify_org_policy_fix as v
v.prepare_test_data()
print('prepared')
v.cleanup_test_data()
print('cleaned')
"
期望: 看到 INSERT/DELETE 输出,无异常。
- Step 6: 提交
git add verify_org_policy_fix.py
git commit -m "test: add Phase 0-1: health check + DB prepare/cleanup"
Task 3: Phase 2 — noauth HTTP 调用 + 回读验证
Files:
-
Modify:
verify_org_policy_fix.py(追加函数) -
Step 1: 添加 noauth 请求头构建
def build_noauth_headers() -> Dict[str, str]:
return {
"Content-Type": "application/json",
"businessid": BUSINESS_ID,
}
def now_ms() -> int:
return int(time.time() * 1000)
def tomorrow_ms() -> int:
return int((time.time() + 86400) * 1000)
- Step 2: 添加 add_visitor 调用函数
def call_add_visitor(base_url: str, person_id: str, visitor_id: str,
floor_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""POST /elevator/person/add/visitor"""
body = {
"personId": person_id,
"visitorId": visitor_id,
"floorIds": floor_ids if floor_ids is not None else [],
"begVisitorTime": now_ms(),
"endVisitorTime": tomorrow_ms(),
}
try:
r = requests.post(
f"{base_url}/elevator/person/add/visitor",
json=body, headers=build_noauth_headers(), timeout=30,
)
return {
"http_status": r.status_code,
"body": r.json() if r.headers.get("content-type", "").startswith("application/json") else r.text,
}
except Exception as e:
return {"http_status": 0, "error": str(e)}
- Step 3: 添加 passRule/image 回读函数
def call_passrule_image(base_url: str, visitor_id: str) -> Dict[str, Any]:
"""POST /elevator/passRule/image"""
body = {"personId": visitor_id}
try:
r = requests.post(
f"{base_url}/elevator/passRule/image",
json=body, headers=build_noauth_headers(), timeout=30,
)
return {
"http_status": r.status_code,
"body": r.json() if r.headers.get("content-type", "").startswith("application/json") else r.text,
}
except Exception as e:
return {"http_status": 0, "error": str(e)}
def extract_zone_ids(passrule_response: Dict) -> List[str]:
"""从 passRule/image 响应中提取 zoneId 列表"""
try:
datas = passrule_response["body"]["data"]["datas"]
return [d["zoneId"] for d in datas if "zoneId" in d]
except (KeyError, TypeError):
return []
- Step 4: 测试 HTTP 调用(需 V2 运行中)
python3 -c "
import verify_org_policy_fix as v
r = v.call_add_visitor('http://127.0.0.1:18081', '1060601019894960128', '9199000100000000001')
print(json.dumps(r, indent=2, ensure_ascii=False))
"
期望: HTTP 200,响应中包含 success 字段。
- Step 5: 提交
git add verify_org_policy_fix.py
git commit -m "test: add Phase 2: noauth HTTP calls + passRule/image extraction"
Task 4: Phase 2 — 用例执行器 + Phase 3-4 — 报告
Files:
-
Modify:
verify_org_policy_fix.py(追加函数 + main) -
Step 1: 添加用例执行函数
def run_case(base_url: str, case: Dict[str, Any]) -> Dict[str, Any]:
"""执行单个用例,返回结果 dict"""
cid = case["id"]
print(f"\n[{cid}] {case['name']}")
floor_ids = case.get("floor_ids_override")
# Step A: 确保正确的策略行生效
pid = case.get("policy_id")
if pid:
# T3: 需要切换到 policy_t3_invalid(先停用 policy_t1_1403)
if cid == "T3":
execute_sql("DELETE FROM tenant_visitor_floor_policy WHERE id='policy_t1_1403'")
print(f" [DB] 临时删除 policy_t1_1403 以启用 T3 策略")
result = {"id": cid, "name": case["name"]}
# Step B: add/visitor
r = call_add_visitor(base_url, case["host_id"], case["visitor_id"], floor_ids)
result["add_visitor"] = {
"http_status": r.get("http_status"),
"success": r.get("body", {}).get("success") if isinstance(r.get("body"), dict) else None,
"code": r.get("body", {}).get("code") if isinstance(r.get("body"), dict) else None,
"message": r.get("body", {}).get("message") if isinstance(r.get("body"), dict) else None,
"error": r.get("error"),
}
av = result["add_visitor"]
business_ok = av["http_status"] == 200 and str(av.get("code", "")) in OK_CODES
# Step C: 判定
if case["expected_pass"]:
if business_ok:
# 回读楼层
pr = call_passrule_image(base_url, case["visitor_id"])
actual_zones = extract_zone_ids(pr)
result["passrule_image"] = {"zones": actual_zones}
expected = case.get("expected_floors")
if expected is not None:
match = set(actual_zones) == set(expected)
result["floor_match"] = match
result["passed"] = match
print(f" add/visitor OK, floors: actual={actual_zones} expected={expected} match={match}")
else:
result["passed"] = True
print(f" add/visitor OK, floors={actual_zones} (no strict check)")
else:
result["passed"] = False
print(f" expected success but got code={av.get('code')} msg={av.get('message')}")
else:
# 期望失败
expected_code = case.get("expected_code")
actual_code = str(av.get("code", ""))
result["passed"] = (not business_ok) and (actual_code == expected_code)
print(f" expected fail code={expected_code} actual={actual_code} passed={result['passed']}")
# Step D: 恢复策略(T3 执行后)
if cid == "T3":
execute_sql(
"INSERT INTO tenant_visitor_floor_policy "
"(id, org_id, business_id, policy_type, allow_zone_ids, building_id, enabled, policy_version, created_at, updated_at) "
"VALUES ('policy_t1_1403', %s, NULL, 'INTERSECT_ALLOWLIST', %s, NULL, 1, 1, UNIX_TIMESTAMP(NOW())*1000, UNIX_TIMESTAMP(NOW())*1000)",
(ORG_1403, f'["{ZONE_28F}"]'),
)
print(f" [DB] 恢复 policy_t1_1403")
return result
- Step 2: 添加报告生成函数
def generate_report(results: List[Dict], base_url: str) -> Dict:
passed = sum(1 for r in results if r.get("passed"))
failed = len(results) - passed
return {
"test": "org_id policy fix verification",
"timestamp": datetime.now().isoformat(),
"elevator_url": base_url,
"mode": "noauth-probe",
"business_id": BUSINESS_ID,
"summary": {"total": len(results), "passed": passed, "failed": failed},
"results": results,
}
- Step 3: 完善 main 函数
if __name__ == "__main__":
args = parse_args()
base = args.elevator_base_url.rstrip("/")
# Phase 0
if not health_check(base):
print("FATAL: elevator service not reachable")
sys.exit(1)
# Phase 1
if not args.skip_db:
print("\n=== Phase 1: prepare test data ===")
prepare_test_data()
# Phase 2
print(f"\n=== Phase 2: run {len(TEST_CASES)} cases ===")
results = []
for case in TEST_CASES:
r = run_case(base, case)
results.append(r)
# Phase 3
if not args.skip_db:
print("\n=== Phase 3: cleanup ===")
cleanup_test_data()
# Phase 4
report = generate_report(results, base)
report_path = f"report/org-policy-fix-verify-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"
import os
os.makedirs("report", exist_ok=True)
with open(report_path, "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"\n=== Report: {report_path} ===")
print(f"Passed: {report['summary']['passed']}/{report['summary']['total']}")
for r in results:
status = "✅" if r.get("passed") else "❌"
print(f" {status} [{r['id']}] {r['name']}")
sys.exit(0 if report["summary"]["failed"] == 0 else 1)
- Step 4: 提交
git add verify_org_policy_fix.py
git commit -m "test: add Phase 2-4: case runner + report generation + main entry"
Task 5: 端到端运行验证
- Step 1: 确保 V2 运行中
curl -s http://127.0.0.1:18081/actuator/health
期望: {"status":"UP"}
- Step 2: 执行全量验证
cd maven-cw-elevator-application/tools/visitor_floor_verification
python3 scripts/verify_org_policy_fix.py --elevator-base-url http://127.0.0.1:18081
- Step 3: 检查报告
ls -la report/org-policy-fix-verify-*.json | tail -1
python3 -c "import json; r=json.load(open('$(ls -t report/org-policy-fix-verify-*.json | head -1)')); print(f\"{r['summary']['passed']}/{r['summary']['total']} passed\")"
期望: 7/7 passed
- Step 4: 提交报告(可选,不提交 JSON 到 git)
git status
完成检查清单
verify_org_policy_fix.py存在且可导入- Phase 0:
health_check()返回 True - Phase 1:
prepare_test_data()无异常 - Phase 2: 7 个用例全部执行
- Phase 3:
cleanup_test_data()无异常 - Phase 4: JSON 报告生成,7/7 passed
- 无脱敏泄露(报告中不出现真实姓名/手机号)