Files
starRiverProperty/docs/testing/visitor_invite_page_init_example.py
T
反编译工作区 25db029859 feat: 租户访客策略 SQL、访客邀约验证包、component-org 与发布脚本
- docs/sql: organization_* 与 tenant_* 访客楼层策略脚本
- docs/testing: 访客邀约页初始化验证、pack 脚本与 README(忽略 dist/__pycache__)
- maven-ninca-common-component-organization: CpImageStoreServiceImpl、starter、run-verify、releases 脚本与 javap 审计 JSON
- docs/superpowers: component-org 生产问题修复计划
- scripts/test-env/prepare-db.sh 更新

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-06 22:00:16 +08:00

373 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
访客邀约页初始化 —— 一键测试(默认使用数据库中的样本数据)
1)从 MySQL 读取:
- component-organization:一名在职被访人(排除「访客」标签)的 BUSINESS_ID、人员 ID
- cw-elevator-applicationtenant_visitor_floor_policy(启用策略,优先匹配同一 business_id)的 allow_zone_ids
2)调用组织组件 POST /component/person/detail,计算 effectiveZoneIds 预览
依赖:pip install requests pymysql
一键(推荐走测试环境变量):
source ../../scripts/test-env/config/env.sh
python3 visitor_invite_page_init_example.py
或直接:
bash run_visitor_invite_page_one_click.sh
输出:仅向 stdout 打印 **一行合法 JSON**(便于 jq);进度写在 stderr。
文档:docs/business/租户访客默认楼层技术产品方案.md §2.4
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set, Tuple
import requests
def log(msg: str) -> None:
print(msg, file=sys.stderr)
def cloudwalk_ok(body: Optional[Dict[str, Any]]) -> bool:
if not body or not isinstance(body, dict):
return False
code = body.get("code")
if code is None and "data" in body:
return True
return str(code) in ("0", "200", "00000000")
def extract_host_zone_ids(person_data: Dict[str, Any]) -> List[str]:
fl = person_data.get("floorList")
if not fl:
return []
out: List[str] = []
for x in fl:
if x is None:
continue
s = str(x).strip()
if s:
out.append(s)
return out
def compute_effective_preview(
host_zone_ids: List[str],
policy_allow_zone_ids_json: Optional[str],
) -> Dict[str, Any]:
if not policy_allow_zone_ids_json or not str(policy_allow_zone_ids_json).strip():
return {
"effective_zone_ids": list(host_zone_ids),
"mode": "host_only_no_policy",
}
try:
allow_raw = json.loads(policy_allow_zone_ids_json)
except json.JSONDecodeError as e:
return {"error": str(e), "effective_zone_ids": [], "mode": "policy_json_invalid"}
allow_set: Set[str] = {str(x).strip() for x in allow_raw if x is not None}
eff = [z for z in host_zone_ids if z in allow_set]
return {
"effective_zone_ids": eff,
"mode": "intersect_host_with_allow_list",
"allow_zone_count": len(allow_set),
}
def fetch_sample_from_mysql(
host: str,
port: int,
user: str,
password: str,
org_db: str,
elevator_db: str,
) -> Tuple[Dict[str, Any], Optional[str]]:
"""
Returns (resolution_meta, policy_allow_zone_ids_json or None).
resolution_meta 至少含 business_id, host_person_id, host_name。
"""
try:
import pymysql # type: ignore
except ImportError as e:
raise RuntimeError(f"需要安装 pymysql: pip install pymysql ({e})") from e
meta: Dict[str, Any] = {
"org_database": org_db,
"elevator_database": elevator_db,
"host_row": None,
"policy_row": None,
}
conn_kw = dict(
host=host,
port=port,
user=user,
password=password,
charset="utf8mb4",
connect_timeout=10,
read_timeout=30,
write_timeout=30,
)
business_id = ""
host_person_id = ""
host_name = ""
with pymysql.connect(database=org_db, cursorclass=pymysql.cursors.DictCursor, **conn_kw) as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT p.BUSINESS_ID AS business_id, p.ID AS host_person_id, p.NAME AS host_name
FROM cw_is_person p
INNER JOIN cw_is_person_organization_ref por ON por.PERSON_ID = p.ID
WHERE IFNULL(p.IS_DEL, 0) = 0
AND p.BUSINESS_ID IS NOT NULL AND p.BUSINESS_ID <> ''
AND NOT EXISTS (
SELECT 1
FROM cw_is_person_label_ref lr
INNER JOIN cw_is_label lb ON lb.ID = lr.LABEL_ID
WHERE lr.PERSON_ID = p.ID
AND lb.BUSINESS_ID = p.BUSINESS_ID
AND lb.NAME = '访客'
)
ORDER BY IFNULL(p.LAST_UPDATE_TIME, 0) DESC
LIMIT 1
"""
)
row = cur.fetchone()
if not row:
raise RuntimeError(
f"组织库 {org_db} 未查询到可用被访人样本(需 cw_is_person + 组织挂靠且非访客标签)"
)
business_id = str(row["business_id"]).strip()
host_person_id = str(row["host_person_id"]).strip()
host_name = str(row.get("host_name") or "").strip()
meta["host_row"] = {
"business_id": business_id,
"host_person_id": host_person_id,
"host_name": host_name,
}
policy_allow_json: Optional[str] = None
try:
with pymysql.connect(database=elevator_db, cursorclass=pymysql.cursors.DictCursor, **conn_kw) as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, business_id, org_id, allow_zone_ids, enabled, remark
FROM tenant_visitor_floor_policy
WHERE IFNULL(enabled, 0) = 1
ORDER BY
CASE WHEN business_id <=> %s THEN 0 ELSE 1 END,
IFNULL(updated_at, 0) DESC
LIMIT 1
""",
(business_id,),
)
prow = cur.fetchone()
if prow and prow.get("allow_zone_ids"):
policy_allow_json = str(prow["allow_zone_ids"]).strip()
meta["policy_row"] = {
"id": prow.get("id"),
"business_id": prow.get("business_id"),
"org_id": prow.get("org_id"),
"allow_zone_ids": policy_allow_json,
"remark": (prow.get("remark") or "")[:200],
}
except Exception as ex:
meta["policy_query_error"] = str(ex)
return meta, policy_allow_json
def run_http_flow(
org_base_url: str,
business_id: str,
host_person_id: str,
policy_allow_json: Optional[str],
common_base_url: str,
timeout: float,
) -> Dict[str, Any]:
session = requests.Session()
detail_url = org_base_url.rstrip("/") + "/component/person/detail"
headers = {
"Content-Type": "application/json;charset=UTF-8",
"businessid": business_id,
}
payload = {"id": host_person_id, "businessId": business_id}
log(f"POST {detail_url}")
r = session.post(detail_url, json=payload, headers=headers, timeout=timeout)
detail_http = r.status_code
try:
detail_body: Any = r.json()
except Exception:
detail_body = {"_parse_error": True, "text_head": (r.text or "")[:2000]}
step_detail = {
"request": payload,
"http_status": detail_http,
"response": detail_body,
}
if isinstance(detail_body, dict) and not cloudwalk_ok(detail_body):
return {
"ok": False,
"error": "person_detail_business_failed",
"steps": {"person_detail": step_detail},
}
data = detail_body.get("data") if isinstance(detail_body, dict) else {}
if not isinstance(data, dict):
data = {}
host_zone_ids = extract_host_zone_ids(data)
preview = compute_effective_preview(host_zone_ids, policy_allow_json)
zone_try: Optional[Dict[str, Any]] = None
if common_base_url.strip():
url = common_base_url.rstrip("/") + "/sysetting/zone/list"
zpayload: Dict[str, Any] = {"zoneIds": preview.get("effective_zone_ids") or []}
try:
zr = session.post(url, json=zpayload, headers=headers, timeout=timeout)
zone_try = {"url": url, "http_status": zr.status_code}
try:
zone_try["response"] = zr.json()
except Exception:
zone_try["text_head"] = (zr.text or "")[:1200]
except Exception as ex:
zone_try = {"url": url, "error": str(ex)}
page_init = {
"businessId": business_id,
"hostPersonId": host_person_id,
"hostPersonName": data.get("name"),
"hostZoneIds": host_zone_ids,
"effectiveZoneIds": preview.get("effective_zone_ids"),
"policyMode": preview.get("mode"),
}
return {
"ok": True,
"started_at": datetime.now(timezone.utc).isoformat(),
"steps": {
"person_detail": step_detail,
"floor_preview": {
"policy_allow_zone_ids_json": policy_allow_json,
**preview,
},
"zone_optional": zone_try,
},
"page_init": page_init,
}
def main() -> int:
ap = argparse.ArgumentParser(description="访客邀约页初始化一键测试")
ap.add_argument(
"--org-base-url",
default=os.environ.get("ORG_BASE", os.environ.get("COMPONENT_ORG_URL", "http://127.0.0.1:33011")),
help="组织组件 HTTP 基址(测试环境常见 PORT_COMPONENT_ORG",
)
ap.add_argument("--business-id", default=os.environ.get("BUSINESS_ID", "").strip())
ap.add_argument("--host-person-id", default=os.environ.get("HOST_PERSON_ID", "").strip())
ap.add_argument("--policy-allow-json", default=os.environ.get("POLICY_ALLOW_ZONE_IDS_JSON", "").strip())
ap.add_argument("--common-base-url", default=os.environ.get("COMMON_BASE", "").strip())
ap.add_argument("--timeout", type=float, default=float(os.environ.get("HTTP_TIMEOUT", "30")))
ap.add_argument(
"--mysql-host",
default=os.environ.get("MYSQL_HOST", "127.0.0.1"),
)
ap.add_argument(
"--mysql-port",
type=int,
default=int(os.environ.get("MYSQL_PORT", "3307")),
)
ap.add_argument("--mysql-user", default=os.environ.get("MYSQL_USER", "root"))
ap.add_argument(
"--mysql-password",
default=os.environ.get("MYSQL_PASS", os.environ.get("MYSQL_PASSWORD", "")),
)
ap.add_argument(
"--org-database",
default=os.environ.get("DB_COMPONENT_ORG", "component-organization"),
)
ap.add_argument(
"--elevator-database",
default=os.environ.get("DB_ELEVATOR", "cw-elevator-application"),
)
ap.add_argument(
"--no-db",
action="store_true",
help="不从数据库拉样本(必须提供 --business-id 与 --host-person-id",
)
args = ap.parse_args()
report: Dict[str, Any] = {"tool": "visitor_invite_page_init_example", "ok": False}
policy_allow = args.policy_allow_json or None
meta_db: Optional[Dict[str, Any]] = None
try:
if not args.no_db and (not args.business_id or not args.host_person_id):
log("从数据库加载默认样本(BUSINESS_ID / 被访人 / 策略 allow_zone_ids)…")
meta_db, policy_allow = fetch_sample_from_mysql(
args.mysql_host,
args.mysql_port,
args.mysql_user,
args.mysql_password,
args.org_database,
args.elevator_database,
)
args.business_id = meta_db["host_row"]["business_id"]
args.host_person_id = meta_db["host_row"]["host_person_id"]
report["resolved_from_database"] = meta_db
elif not args.business_id or not args.host_person_id:
log("错误:未指定 --business-id / --host-person-id,且未使用数据库默认。", file=sys.stderr)
report["error"] = "missing_ids_use_db_or_pass_explicit"
print(json.dumps(report, ensure_ascii=False))
return 2
if policy_allow:
report["policy_allow_zone_ids_source"] = (
"database" if meta_db and meta_db.get("policy_row") else "cli_or_env"
)
else:
report["policy_allow_zone_ids_source"] = "none"
result = run_http_flow(
args.org_base_url,
args.business_id,
args.host_person_id,
policy_allow,
args.common_base_url,
args.timeout,
)
report.update(result)
if not report.get("ok"):
print(json.dumps(report, ensure_ascii=False))
return 1
log("测试完成。完整 JSON 见 stdout。")
print(json.dumps(report, ensure_ascii=False))
return 0
except Exception as ex:
report["ok"] = False
report["error"] = str(ex)
print(json.dumps(report, ensure_ascii=False))
log(str(ex), file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())