starRiverProperty/docs/superpowers/plans/2026-05-01-org-policy-verify.md
hpd840321 7b2bd307f1 Initial commit: reorganized source tree
- backend/: 13 Maven modules (cw-elevator-application, cloudwalk-cloud, intelligent-cwoscomponent, ninca-crk, etc.)
- frontend/: 4 Vue projects (elevator-front, cwos-portal, alarm-front, front_acs) + decompiled + scripts
- scripts/: build, test-env, tools (Docker Compose, service templates, API parity)
- docs/: AGENTS.md, superpowers specs, architecture docs
- .gitignore: standard Java/Maven exclusions

Moved from legacy maven-*/ root layout to backend/ organized structure.
2026-05-09 09:56:45 +08:00


org_id policy fix verification script: implementation plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

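Goal: Write an unauthenticated (noauth) Python verification script, verify_org_policy_fix.py, that automatically prepares test data, runs 7 test cases, cleans up the data, and outputs a JSON report.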

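Architecture: A single script with five phases (Phase 0 to Phase 4): Phase 0 health check → Phase 1 prepare data in MySQL → Phase 2 call add/visitor + passRule/image over HTTP for each case → Phase 3 clean up in MySQL → Phase 4 output JSON. It reuses the noauth call pattern of the existing quick_verify_visitor_floor_policy.py.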

Tech Stack: Python 3.8+, requests, pymysql, JSON

Spec: docs/superpowers/specs/2026-05-01-org-policy-verify-design.md
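
The phase order described under Architecture can be sketched as a small driver. This is only an illustration, not the plan's main function: `run_phases` and its three callables are hypothetical stand-ins, and the try/finally is an assumption about how one might keep the Phase 3 cleanup from being skipped when a case raises.

```python
from typing import Callable, Dict, List


def run_phases(prepare: Callable[[], None],
               run_cases: Callable[[], List[Dict]],
               cleanup: Callable[[], None]) -> List[Dict]:
    """Run prepare -> cases -> cleanup; cleanup runs even if a case raises."""
    prepare()                 # Phase 1: insert the test rows
    try:
        return run_cases()    # Phase 2: execute the cases
    finally:
        cleanup()             # Phase 3: always remove the test rows


# Demo with stub phases that only record the order in which they ran.
calls: List[str] = []
results = run_phases(
    prepare=lambda: calls.append("prepare"),
    run_cases=lambda: [{"id": "T1", "passed": True}],
    cleanup=lambda: calls.append("cleanup"),
)
print(calls, results)
```

With real phases, the three callables would be prepare_test_data, a loop over TEST_CASES, and cleanup_test_data.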


Task 1: Script skeleton and configuration

Files:

  • Create: maven-cw-elevator-application/tools/visitor_floor_verification/scripts/verify_org_policy_fix.py

  • Step 1: Write the script skeleton

#!/usr/bin/env python3
"""org_id policy fix: unauthenticated (noauth) verification script."""

import argparse
import json
import sys
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import pymysql
import requests

# ===== Configuration constants =====
DB_CONFIG = {
    "host": "192.168.3.12",
    "port": 3307,
    "user": "root",
    "password": "123456",
    "db_org": "component-organization",
    "db_elevator": "cw-elevator-application",
}

BUSINESS_ID = "2524639890ba4f2cba9ba1a4eeaa4015"

# Organization nodes used by the tests
ORG_1403 = "72fb65ec5de94201b909a98b8bae1892"
ORG_1405 = "2095de3d541f44eba686c78fda68336f"
ORG_GUANGFA = "488b8ad049bb43408a6fbcc50bcb89ac"

# Hosts (the people being visited)
HOST_CHEN = "1060601019894960128"       # 陈国辉 (1403 + 星中心)
HOST_WANG = "1090779433129840640"       # 王姣 (1405)
HOST_QIN = "1072908835884208128"        # 秦夏 (广发基金)

# Visitors (ID range reserved for testing)
VISITOR_IDS = [
    "9199000100000000001", "9199000100000000002", "9199000100000000003",
    "9199000100000000004", "9199000100000000005", "9199000100000000006",
    "9199000100000000007",
]

ZONE_28F = "605560545117995008"
ZONE_99F = "605560540000000000"  # does not exist; used by T3

OK_CODES = {"0", "200"}

TEST_CASES = [
    {
        "id": "T1", "name": "policy present → allow list replaces floorList",
        "host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[0],
        "policy_id": "policy_t1_1403", "expected_pass": True,
        "expected_floors": [ZONE_28F],
    },
    {
        "id": "T2", "name": "no policy → floorList is used",
        "host_id": HOST_WANG, "visitor_id": VISITOR_IDS[1],
        "policy_id": None, "expected_pass": True,
        "expected_floors": None,  # no exact floor comparison; only verify success
    },
    {
        "id": "T3", "name": "allow list contains an invalid zone → rejected",
        "host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[2],
        "policy_id": "policy_t3_invalid", "expected_pass": False,
        "expected_code": "76260533",
    },
    {
        "id": "T4", "name": "multiple orgs: the first policy is hit",
        "host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[3],
        "policy_id": "policy_t1_1403", "expected_pass": True,
        "expected_floors": [ZONE_28F],
    },
    {
        "id": "T5", "name": "enabled=0 is equivalent to no policy",
        "host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[4],
        "policy_id": "policy_t5_disabled", "expected_pass": True,
        "expected_floors": None,
    },
    {
        "id": "T6", "name": "UC-02: policy takes precedence",
        "host_id": HOST_CHEN, "visitor_id": VISITOR_IDS[5],
        "policy_id": "policy_t1_1403", "expected_pass": True,
        "expected_floors": [ZONE_28F],
        "floor_ids_override": ["605560541473144832"],  # sends 6F; the policy should override it to 28F
    },
    {
        "id": "T7", "name": "广发基金 migration check",
        "host_id": HOST_QIN, "visitor_id": VISITOR_IDS[6],
        "policy_id": "gf_vstr_policy_guangfa_fund_001x", "expected_pass": True,
        "expected_floors": [ZONE_28F],
    },
]


def parse_args():
    p = argparse.ArgumentParser(description="org_id policy fix verification")
    p.add_argument("--elevator-base-url", default="http://127.0.0.1:18081")
    p.add_argument("--skip-db", action="store_true", help="skip database preparation/cleanup")
    return p.parse_args()


if __name__ == "__main__":
    args = parse_args()
    print(f"elevator: {args.elevator_base_url}")
    print(f"skip-db: {args.skip_db}")
    print(f"cases: {len(TEST_CASES)}")
  • Step 2: Verify that the imports are available
cd maven-cw-elevator-application/tools/visitor_floor_verification
python3 -c "import requests; import pymysql; print('OK')"

Expected: OK

  • Step 3: Commit
git add maven-cw-elevator-application/tools/visitor_floor_verification/scripts/verify_org_policy_fix.py
git commit -m "test: scaffold verify_org_policy_fix.py with config and test cases"
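
The skeleton in Task 1 hardcodes the database host and credentials in DB_CONFIG. A minimal sketch of one way to let the environment override them is shown here; the environment variable names (VERIFY_DB_HOST and so on) and the `load_db_config` helper are assumptions for illustration and are not defined anywhere in the plan.

```python
import os
from typing import Dict, Mapping, Union

# Defaults copied from the plan's DB_CONFIG (connection fields only).
DEFAULT_DB_CONFIG: Dict[str, Union[str, int]] = {
    "host": "192.168.3.12",
    "port": 3307,
    "user": "root",
    "password": "123456",
}

# Hypothetical variable names; the plan does not define any.
ENV_KEYS = {
    "host": "VERIFY_DB_HOST",
    "port": "VERIFY_DB_PORT",
    "user": "VERIFY_DB_USER",
    "password": "VERIFY_DB_PASSWORD",
}


def load_db_config(env: Mapping[str, str] = os.environ) -> Dict[str, Union[str, int]]:
    """Return the defaults, overridden by any non-empty environment values."""
    cfg = dict(DEFAULT_DB_CONFIG)
    for key, env_name in ENV_KEYS.items():
        value = env.get(env_name)
        if value:
            # The port must stay an int because pymysql.connect expects one.
            cfg[key] = int(value) if key == "port" else value
    return cfg


print(load_db_config({"VERIFY_DB_PORT": "3308"})["port"])
```

Passing the mapping as a parameter keeps the helper easy to exercise without touching the real process environment.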

Task 2: Phase 0 (health check) + Phase 1 (data preparation/cleanup)

Files:

  • Modify: verify_org_policy_fix.py (append functions)

  • Step 1: Add the health check function

def health_check(base_url: str) -> bool:
    """GET /actuator/health"""
    try:
        r = requests.get(f"{base_url}/actuator/health", timeout=10)
        ok = r.status_code == 200
        print(f"[HEALTH] {base_url} -> {r.status_code} {'OK' if ok else 'FAIL'}")
        return ok
    except Exception as e:
        print(f"[HEALTH] {base_url} -> ERROR: {e}")
        return False
  • Step 2: Add the database connection functions
def get_db_conn():
    return pymysql.connect(
        host=DB_CONFIG["host"], port=DB_CONFIG["port"],
        user=DB_CONFIG["user"], password=DB_CONFIG["password"],
        database=DB_CONFIG["db_elevator"],
        charset="utf8mb4", autocommit=True,
    )


def execute_sql(sql: str, params=None):
    conn = get_db_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
    finally:
        conn.close()
  • Step 3: Add the data preparation function
def prepare_test_data():
    """INSERT the test policies and UPDATE the 广发基金 policy's org_id."""
    policies = [
        ("policy_t1_1403", ORG_1403, f'["{ZONE_28F}"]', 1),
        ("policy_t3_invalid", ORG_1403, f'["{ZONE_28F}","{ZONE_99F}"]', 1),
        ("policy_t5_disabled", ORG_1403, f'["{ZONE_28F}"]', 0),
    ]
    for pid, oid, zones_json, enabled in policies:
        execute_sql("DELETE FROM tenant_visitor_floor_policy WHERE id=%s", (pid,))
        execute_sql(
            "INSERT INTO tenant_visitor_floor_policy "
            "(id, org_id, business_id, policy_type, allow_zone_ids, building_id, enabled, policy_version, created_at, updated_at) "
            "VALUES (%s, %s, NULL, 'INTERSECT_ALLOWLIST', %s, NULL, %s, 1, UNIX_TIMESTAMP(NOW())*1000, UNIX_TIMESTAMP(NOW())*1000)",
            (pid, oid, zones_json, enabled),
        )
        print(f"  INSERT policy {pid} org={oid} enabled={enabled}")

    # 广发基金 migration
    execute_sql(
        "UPDATE tenant_visitor_floor_policy SET org_id=%s WHERE id='gf_vstr_policy_guangfa_fund_001x'",
        (ORG_GUANGFA,),
    )
    print(f"  UPDATE 广发基金 org_id={ORG_GUANGFA}")
  • Step 4: Add the data cleanup function
def cleanup_test_data():
    """DELETE the test policies and roll back the 广发基金 policy's org_id."""
    for pid in ["policy_t1_1403", "policy_t3_invalid", "policy_t5_disabled"]:
        execute_sql("DELETE FROM tenant_visitor_floor_policy WHERE id=%s", (pid,))
        print(f"  DELETE {pid}")

    execute_sql(
        "UPDATE tenant_visitor_floor_policy SET org_id=NULL WHERE id='gf_vstr_policy_guangfa_fund_001x'"
    )
    print("  UPDATE 广发基金 org_id=NULL (rollback)")
  • Step 5: Test the DB functions (run from the scripts/ directory so that the module can be imported)
python3 -c "
import verify_org_policy_fix as v
v.prepare_test_data()
print('prepared')
v.cleanup_test_data()
print('cleaned')
"

Expected: INSERT/DELETE output appears, with no exceptions.

  • Step 6: Commit
git add verify_org_policy_fix.py
git commit -m "test: add Phase 0-1: health check + DB prepare/cleanup"
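
cleanup_test_data() resets the 广发基金 policy's org_id to NULL, which is only a true rollback if the column was NULL before the run. A hedged sketch of a snapshot-and-restore alternative follows. It uses an in-memory sqlite3 table as a stand-in for MySQL so that it runs anywhere (note the `?` placeholders instead of pymysql's `%s`), and the helper names `snapshot_org_id` and `set_org_id` are hypothetical.

```python
import sqlite3
from typing import Optional

POLICY_ID = "gf_vstr_policy_guangfa_fund_001x"
ORG_GUANGFA = "488b8ad049bb43408a6fbcc50bcb89ac"


def snapshot_org_id(conn: sqlite3.Connection, policy_id: str) -> Optional[str]:
    """Return the row's current org_id (None if it is NULL or the row is missing)."""
    row = conn.execute(
        "SELECT org_id FROM tenant_visitor_floor_policy WHERE id = ?", (policy_id,)
    ).fetchone()
    return row[0] if row else None


def set_org_id(conn: sqlite3.Connection, policy_id: str, org_id: Optional[str]) -> None:
    """Write org_id (or NULL when org_id is None) on one policy row."""
    conn.execute(
        "UPDATE tenant_visitor_floor_policy SET org_id = ? WHERE id = ?",
        (org_id, policy_id),
    )


# Stand-in table holding only the two columns this sketch touches.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tenant_visitor_floor_policy (id TEXT PRIMARY KEY, org_id TEXT)")
conn.execute("INSERT INTO tenant_visitor_floor_policy VALUES (?, ?)", (POLICY_ID, "pre-existing-org"))

original = snapshot_org_id(conn, POLICY_ID)  # remember the value before the test
set_org_id(conn, POLICY_ID, ORG_GUANGFA)     # test-time migration
set_org_id(conn, POLICY_ID, original)        # restore the snapshot instead of forcing NULL
print(snapshot_org_id(conn, POLICY_ID))
```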

Task 3: Phase 2 (noauth HTTP calls + read-back verification)

Files:

  • Modify: verify_org_policy_fix.py (append functions)

  • Step 1: Add the noauth request header builder

def build_noauth_headers() -> Dict[str, str]:
    return {
        "Content-Type": "application/json",
        "businessid": BUSINESS_ID,
    }


def now_ms() -> int:
    return int(time.time() * 1000)


def tomorrow_ms() -> int:
    return int((time.time() + 86400) * 1000)
  • Step 2: Add the add_visitor call function
def call_add_visitor(base_url: str, person_id: str, visitor_id: str,
                     floor_ids: Optional[List[str]] = None) -> Dict[str, Any]:
    """POST /elevator/person/add/visitor"""
    body = {
        "personId": person_id,
        "visitorId": visitor_id,
        "floorIds": floor_ids if floor_ids is not None else [],
        "begVisitorTime": now_ms(),
        "endVisitorTime": tomorrow_ms(),
    }
    try:
        r = requests.post(
            f"{base_url}/elevator/person/add/visitor",
            json=body, headers=build_noauth_headers(), timeout=30,
        )
        return {
            "http_status": r.status_code,
            "body": r.json() if r.headers.get("content-type", "").startswith("application/json") else r.text,
        }
    except Exception as e:
        return {"http_status": 0, "error": str(e)}
  • Step 3: Add the passRule/image read-back function
def call_passrule_image(base_url: str, visitor_id: str) -> Dict[str, Any]:
    """POST /elevator/passRule/image"""
    body = {"personId": visitor_id}
    try:
        r = requests.post(
            f"{base_url}/elevator/passRule/image",
            json=body, headers=build_noauth_headers(), timeout=30,
        )
        return {
            "http_status": r.status_code,
            "body": r.json() if r.headers.get("content-type", "").startswith("application/json") else r.text,
        }
    except Exception as e:
        return {"http_status": 0, "error": str(e)}


def extract_zone_ids(passrule_response: Dict) -> List[str]:
    """Extract the list of zoneId values from a passRule/image response."""
    try:
        datas = passrule_response["body"]["data"]["datas"]
        return [d["zoneId"] for d in datas if "zoneId" in d]
    except (KeyError, TypeError):
        return []
  • Step 4: Test the HTTP call (requires V2 to be running)
python3 -c "
import json
import verify_org_policy_fix as v
r = v.call_add_visitor('http://127.0.0.1:18081', '1060601019894960128', '9199000100000000001')
print(json.dumps(r, indent=2, ensure_ascii=False))
"

Expected: HTTP 200, and the response contains a success field.

  • Step 5: Commit
git add verify_org_policy_fix.py
git commit -m "test: add Phase 2: noauth HTTP calls + passRule/image extraction"
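
To make the read-back step in Task 3 concrete, the sketch below runs extract_zone_ids against three result shapes that the call wrappers can return: a JSON body, a non-JSON text body, and a transport error. The sample payloads are assumptions built from the keys the function reads (body, data, datas, zoneId); the real service response may carry more fields.

```python
from typing import Any, Dict, List


def extract_zone_ids(passrule_response: Dict[str, Any]) -> List[str]:
    """Same logic as in the plan: collect zoneId values, or [] on any shape mismatch."""
    try:
        datas = passrule_response["body"]["data"]["datas"]
        return [d["zoneId"] for d in datas if "zoneId" in d]
    except (KeyError, TypeError):
        return []


# Hypothetical sample results, shaped like the dicts returned by call_passrule_image.
ok_response = {
    "http_status": 200,
    "body": {"code": "0", "data": {"datas": [{"zoneId": "605560545117995008"}, {"other": 1}]}},
}
text_response = {"http_status": 502, "body": "Bad Gateway"}  # non-JSON body kept as text
error_response = {"http_status": 0, "error": "timeout"}      # request raised, no body key

for name, resp in [("ok", ok_response), ("text", text_response), ("error", error_response)]:
    print(name, extract_zone_ids(resp))
```

An empty list therefore means either "no zones" or "unexpected response shape", so a case that expects specific floors fails in both situations.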

Task 4: Phase 2 (case runner) + Phase 3-4 (report)

Files:

  • Modify: verify_org_policy_fix.py (append functions + main)

  • Step 1: Add the case runner function

def run_case(base_url: str, case: Dict[str, Any]) -> Dict[str, Any]:
    """Run a single test case and return its result dict."""
    cid = case["id"]
    print(f"\n[{cid}] {case['name']}")

    floor_ids = case.get("floor_ids_override")

    # Step A: make sure the correct policy row is in effect
    pid = case.get("policy_id")
    if pid:
        # T3: switch to policy_t3_invalid (remove policy_t1_1403 first)
        if cid == "T3":
            execute_sql("DELETE FROM tenant_visitor_floor_policy WHERE id='policy_t1_1403'")
            print("  [DB] temporarily deleted policy_t1_1403 so that the T3 policy takes effect")

    result = {"id": cid, "name": case["name"]}

    # Step B: add/visitor
    r = call_add_visitor(base_url, case["host_id"], case["visitor_id"], floor_ids)
    result["add_visitor"] = {
        "http_status": r.get("http_status"),
        "success": r.get("body", {}).get("success") if isinstance(r.get("body"), dict) else None,
        "code": r.get("body", {}).get("code") if isinstance(r.get("body"), dict) else None,
        "message": r.get("body", {}).get("message") if isinstance(r.get("body"), dict) else None,
        "error": r.get("error"),
    }
    av = result["add_visitor"]
    business_ok = av["http_status"] == 200 and str(av.get("code", "")) in OK_CODES

    # Step C: verdict
    if case["expected_pass"]:
        if business_ok:
            # read back the floors
            pr = call_passrule_image(base_url, case["visitor_id"])
            actual_zones = extract_zone_ids(pr)
            result["passrule_image"] = {"zones": actual_zones}
            expected = case.get("expected_floors")
            if expected is not None:
                match = set(actual_zones) == set(expected)
                result["floor_match"] = match
                result["passed"] = match
                print(f"  add/visitor OK, floors: actual={actual_zones} expected={expected} match={match}")
            else:
                result["passed"] = True
                print(f"  add/visitor OK, floors={actual_zones} (no strict check)")
        else:
            result["passed"] = False
            print(f"  expected success but got code={av.get('code')} msg={av.get('message')}")
    else:
        # failure expected
        expected_code = case.get("expected_code")
        actual_code = str(av.get("code", ""))
        result["passed"] = (not business_ok) and (actual_code == expected_code)
        print(f"  expected fail code={expected_code} actual={actual_code} passed={result['passed']}")

    # Step D: restore the policy (after T3 has run)
    if cid == "T3":
        execute_sql(
            "INSERT INTO tenant_visitor_floor_policy "
            "(id, org_id, business_id, policy_type, allow_zone_ids, building_id, enabled, policy_version, created_at, updated_at) "
            "VALUES ('policy_t1_1403', %s, NULL, 'INTERSECT_ALLOWLIST', %s, NULL, 1, 1, UNIX_TIMESTAMP(NOW())*1000, UNIX_TIMESTAMP(NOW())*1000)",
            (ORG_1403, f'["{ZONE_28F}"]'),
        )
        print("  [DB] restored policy_t1_1403")

    return result
  • Step 2: Add the report generation function
def generate_report(results: List[Dict], base_url: str) -> Dict:
    passed = sum(1 for r in results if r.get("passed"))
    failed = len(results) - passed
    return {
        "test": "org_id policy fix verification",
        "timestamp": datetime.now().isoformat(),
        "elevator_url": base_url,
        "mode": "noauth-probe",
        "business_id": BUSINESS_ID,
        "summary": {"total": len(results), "passed": passed, "failed": failed},
        "results": results,
    }
  • Step 3: Complete the main function
if __name__ == "__main__":
    args = parse_args()
    base = args.elevator_base_url.rstrip("/")

    # Phase 0
    if not health_check(base):
        print("FATAL: elevator service not reachable")
        sys.exit(1)

    # Phase 1
    if not args.skip_db:
        print("\n=== Phase 1: prepare test data ===")
        prepare_test_data()

    # Phase 2
    print(f"\n=== Phase 2: run {len(TEST_CASES)} cases ===")
    results = []
    for case in TEST_CASES:
        r = run_case(base, case)
        results.append(r)

    # Phase 3
    if not args.skip_db:
        print("\n=== Phase 3: cleanup ===")
        cleanup_test_data()

    # Phase 4
    report = generate_report(results, base)
    report_path = f"report/org-policy-fix-verify-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"
    import os
    os.makedirs("report", exist_ok=True)
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"\n=== Report: {report_path} ===")
    print(f"Passed: {report['summary']['passed']}/{report['summary']['total']}")
    for r in results:
        status = "✅" if r.get("passed") else "❌"
        print(f"  {status} [{r['id']}] {r['name']}")

    sys.exit(0 if report["summary"]["failed"] == 0 else 1)
  • Step 4: Commit
git add verify_org_policy_fix.py
git commit -m "test: add Phase 2-4: case runner + report generation + main entry"
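
The verdict rules inside run_case (Step C) can also be written as a pure function, which allows checking them without a running service or database. This is a sketch under that assumption; `judge` is a hypothetical helper that does not exist in the plan, and it mirrors the plan's comparison logic rather than replacing it.

```python
from typing import Any, Dict, List, Optional

OK_CODES = {"0", "200"}


def judge(case: Dict[str, Any], http_status: int, code: Any,
          actual_zones: Optional[List[str]] = None) -> bool:
    """Return True when the observed outcome matches the case's expectation."""
    business_ok = http_status == 200 and str(code) in OK_CODES
    if case["expected_pass"]:
        if not business_ok:
            return False
        expected = case.get("expected_floors")
        # expected_floors=None means "success is enough"; otherwise compare as sets.
        return expected is None or set(actual_zones or []) == set(expected)
    # Failure expected: the call must fail with exactly the expected business code.
    return (not business_ok) and str(code) == case.get("expected_code")


t1 = {"expected_pass": True, "expected_floors": ["605560545117995008"]}
t3 = {"expected_pass": False, "expected_code": "76260533"}

print(judge(t1, 200, "0", ["605560545117995008"]))
print(judge(t3, 200, "76260533"))
```

Because floors are compared as sets, ordering and duplicates in the read-back list do not affect the verdict.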

Task 5: End-to-end run

  • Step 1: Make sure V2 is running
curl -s http://127.0.0.1:18081/actuator/health

Expected: {"status":"UP"}

  • Step 2: Run the full verification
cd maven-cw-elevator-application/tools/visitor_floor_verification
python3 scripts/verify_org_policy_fix.py --elevator-base-url http://127.0.0.1:18081
  • Step 3: Check the report
ls -la report/org-policy-fix-verify-*.json | tail -1
python3 -c "import json; r=json.load(open('$(ls -t report/org-policy-fix-verify-*.json | head -1)')); print(f\"{r['summary']['passed']}/{r['summary']['total']} passed\")"

Expected: 7/7 passed

  • Step 4: Submit the report (optional; do not commit the JSON to git)
git status
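
The report check in Step 3 relies on a nested shell one-liner. As an alternative sketch, the same lookup can be done in plain Python: pick the most recently modified report file and print its summary. `latest_report_summary` is a hypothetical helper; the file name pattern and the summary keys come from the main function in Task 4.

```python
import glob
import json
import os
from typing import Optional


def latest_report_summary(report_dir: str = "report") -> Optional[str]:
    """Return 'passed/total passed' for the newest report, or None if none exist."""
    paths = glob.glob(os.path.join(report_dir, "org-policy-fix-verify-*.json"))
    if not paths:
        return None
    newest = max(paths, key=os.path.getmtime)  # same ordering idea as `ls -t | head -1`
    with open(newest, encoding="utf-8") as f:
        summary = json.load(f)["summary"]
    return f"{summary['passed']}/{summary['total']} passed"


print(latest_report_summary())
```

Run it from maven-cw-elevator-application/tools/visitor_floor_verification so that the relative report directory resolves the same way as in Step 2.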

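Completion checklist

  • verify_org_policy_fix.py exists and is importable
  • Phase 0: health_check() returns True
  • Phase 1: prepare_test_data() raises no exceptions
  • Phase 2: all 7 cases are executed
  • Phase 3: cleanup_test_data() raises no exceptions
  • Phase 4: the JSON report is generated, 7/7 passed
  • No leakage of unmasked data (no real names or phone numbers appear in the report)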
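
The last checklist item can be checked mechanically to a first approximation. The sketch below is a heuristic, not a guarantee: `find_leaks` is a hypothetical helper, the name list must be supplied by the caller, and the phone pattern assumes 11-digit mainland-China mobile numbers. Since the report is written with ensure_ascii=False, names would appear as raw text and a plain substring search is sufficient.

```python
import re
from typing import Iterable, List

# 11-digit mobile pattern, bounded by non-digits so that it does not
# fire inside long numeric IDs such as the 19-digit visitor IDs.
PHONE_RE = re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)")


def find_leaks(report_text: str, real_names: Iterable[str]) -> List[str]:
    """Return every real name and phone-like number found in the report text."""
    hits = [name for name in real_names if name and name in report_text]
    hits.extend(PHONE_RE.findall(report_text))
    return hits


clean = '{"visitor_id": "9199000100000000001", "host_id": "1060601019894960128"}'
leaky = '{"message": "陈国辉 13800138000"}'

print(find_leaks(clean, ["陈国辉"]))
print(find_leaks(leaky, ["陈国辉"]))
```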