feat: add service config templates and extraction script

Former-commit-id: 1de24b7eb79676d1aba9d799a58c5a753290cf52
This commit is contained in:
反编译工作区
2026-05-01 19:38:01 +08:00
parent 3175b7074b
commit 8b15445328
2433 changed files with 8322164 additions and 1604 deletions
@@ -0,0 +1,256 @@
#!/usr/bin/env python3
"""
广发基金租户:访客默认仅能派梯至指定楼层(典型验收:28 层)的 HTTP 验证脚本。
业务依据(电梯应用):
PersonRuleServiceImpl.addVisitor — floorIds 为空时取被访人 floorList
若 tenant_visitor_floor_policy 启用 INTERSECT_ALLOWLIST,则与 allow_zone_ids 求交后再写规则。
验证思路:
1. POST /elevator/person/add/visitor — body 中 floorIds 为空列表(不显式传楼层)。
2. POST /elevator/passRule/image — personId=visitorId,回读该访客已开通的楼层 zone 列表。
3. 断言楼层条数与名称满足预期(默认:仅 1 条,zoneName 含「28」)。
环境变量(与 api 对拍一致,须指向广发基金租户):
ELEVATOR_HEADER_BUSINESSID — 机构/租户 ID(必填)
ELEVATOR_HEADER_LOGINID
ELEVATOR_HEADER_PLATFORMUSERID
ELEVATOR_HEADER_AUTHORIZATION
ELEVATOR_HEADER_APPLICATIONID
用法示例:
export ELEVATOR_HEADER_BUSINESSID='<广发基金 businessId>'
export ELEVATOR_HEADER_AUTHORIZATION='Bearer ...'
python3 scripts/verify_gf_visitor_default_floor.py \\
--base-url http://127.0.0.1:18081 \\
--person-id '<被访人员工 personId>' \\
--visitor-id '<访客 personId>' \\
--beg-visitor-time 1714406400000 \\
--end-visitor-time 1714492800000
可选:仅回读已有访客楼层(跳过开通)
python3 scripts/verify_gf_visitor_default_floor.py ... --skip-add
可选:校验库内策略行(需 pip install pymysql
export MYSQL_HOST=... MYSQL_PORT=3307 MYSQL_USER=root MYSQL_PASSWORD=... MYSQL_DATABASE=cw-elevator-application
python3 scripts/verify_gf_visitor_default_floor.py ... --check-policy-row
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from pathlib import Path
import requests
_ROOT = Path(__file__).resolve().parents[1]
if str(_ROOT) not in sys.path:
sys.path.insert(0, str(_ROOT))
from parity.client import default_headers # noqa: E402
def _post_json(session: requests.Session, base: str, path: str, body: dict) -> tuple[int, dict | list | None, str]:
url = base.rstrip("/") + path
r = session.post(
url,
headers={**default_headers(), **dict(session.headers)},
data=json.dumps(body, ensure_ascii=False),
timeout=120,
)
text = r.text or ""
try:
parsed = json.loads(text) if text.strip() else None
except json.JSONDecodeError:
parsed = None
return r.status_code, parsed, text
def _business_code(payload: dict | list | None) -> str | None:
if isinstance(payload, dict) and "code" in payload:
c = payload.get("code")
return str(c) if c is not None else None
return None
def check_mysql_policy(business_id: str) -> tuple[bool, str]:
try:
import pymysql # type: ignore
except ImportError:
return False, "未安装 pymysql,跳过库表校验(pip install pymysql"
host = os.environ.get("MYSQL_HOST", "127.0.0.1").strip()
port = int(os.environ.get("MYSQL_PORT", "3306"))
user = os.environ.get("MYSQL_USER", "root").strip()
password = os.environ.get("MYSQL_PASSWORD", "").strip()
database = os.environ.get("MYSQL_DATABASE", "cw-elevator-application").strip()
if not password and not os.environ.get("MYSQL_ALLOW_EMPTY_PASSWORD"):
return False, "未设置 MYSQL_PASSWORD,跳过(或设置 MYSQL_ALLOW_EMPTY_PASSWORD=1"
conn = pymysql.connect(
host=host, port=port, user=user, password=password, database=database, charset="utf8mb4"
)
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, business_id, policy_type, allow_zone_ids, enabled, policy_version
FROM tenant_visitor_floor_policy
WHERE business_id = %s AND enabled = 1
AND policy_type = 'INTERSECT_ALLOWLIST'
AND (building_id IS NULL OR building_id = '')
ORDER BY updated_at DESC, policy_version DESC
LIMIT 1
""",
(business_id,),
)
row = cur.fetchone()
if not row:
return False, f"未查到启用中的租户默认策略 business_id={business_id!r}"
cols = [d[0] for d in cur.description]
detail = dict(zip(cols, row))
return True, json.dumps(detail, ensure_ascii=False)
finally:
conn.close()
def parse_floor_list(data_obj: dict | list | None) -> list[dict]:
if data_obj is None:
return []
if isinstance(data_obj, list):
return [x for x in data_obj if isinstance(x, dict)]
return []
def assert_floors(js2: dict | list | None, args: argparse.Namespace) -> int:
try:
cre = re.compile(args.floor_name_regex)
except re.error as e:
print(f"floor-name-regex 无效: {e}", file=sys.stderr)
return 2
data = parse_floor_list((js2 or {}).get("data") if isinstance(js2, dict) else None)
print(f"[floors] count={len(data)}")
for row in data:
print(f" zoneId={row.get('zoneId')!r} zoneName={row.get('zoneName')!r}")
if len(data) != args.expected_zone_count:
print(
f"断言失败:楼层条数期望 {args.expected_zone_count},实际 {len(data)}",
file=sys.stderr,
)
return 1
for row in data:
name = str(row.get("zoneName") or "")
if not cre.search(name):
print(
f"断言失败:zoneName={name!r} 不匹配正则 {args.floor_name_regex!r}",
file=sys.stderr,
)
return 1
print("OK:访客生效楼层与预期一致(默认策略求交后仅接待层)。")
return 0
def run_http_flow(session: requests.Session, args: argparse.Namespace, biz: str) -> int:
if not args.skip_add:
add_body = {
"visitorId": args.visitor_id,
"personId": args.person_id,
"begVisitorTime": args.beg_visitor_time,
"endVisitorTime": args.end_visitor_time,
"floorIds": [],
}
st, js, raw = _post_json(session, args.base_url, "/elevator/person/add/visitor", add_body)
code = _business_code(js if isinstance(js, dict) else None)
print(f"[add/visitor] http={st} code={code!r}")
if st != 200:
print(raw[:2000], file=sys.stderr)
return 1
if code is not None and str(code) not in ("0", "200"):
print(raw[:2000], file=sys.stderr)
return 1
img_body = {"personId": args.visitor_id, "businessId": biz}
st2, js2, raw2 = _post_json(session, args.base_url, "/elevator/passRule/image", img_body)
code2 = _business_code(js2 if isinstance(js2, dict) else None)
print(f"[passRule/image] http={st2} code={code2!r}")
if st2 != 200:
print(raw2[:2000], file=sys.stderr)
return 1
if code2 is not None and str(code2) not in ("0", "200"):
print(raw2[:2000], file=sys.stderr)
return 1
return assert_floors(js2 if isinstance(js2, dict) else None, args)
def main() -> int:
p = argparse.ArgumentParser(description="广发基金租户访客默认楼层(典型 28 层)HTTP 验证")
p.add_argument("--base-url", default=os.environ.get("ELEVATOR_VERIFY_BASE", "http://127.0.0.1:18081"))
p.add_argument("--person-id", required=True, help="被访人 personId(组织侧有 floorList);--skip-add 时仍可填占位")
p.add_argument("--visitor-id", required=True, help="访客 personId")
p.add_argument(
"--beg-visitor-time",
type=int,
default=int(os.environ.get("ELEVATOR_BEG_VISITOR_TIME", "0")),
help="访客有效期开始 epoch 毫秒",
)
p.add_argument(
"--end-visitor-time",
type=int,
default=int(os.environ.get("ELEVATOR_END_VISITOR_TIME", "0")),
help="访客有效期结束 epoch 毫秒",
)
p.add_argument("--skip-add", action="store_true", help="跳过开通,仅 passRule/image 回读(访客须已开通)")
p.add_argument("--expected-zone-count", type=int, default=1, help="期望访客生效楼层条数")
p.add_argument(
"--floor-name-regex",
default=os.environ.get("ELEVATOR_EXPECT_FLOOR_REGEX", r".*28.*"),
help="对每条 zoneName 匹配的正则(默认含 28)",
)
p.add_argument(
"--check-policy-row",
action="store_true",
help="用 MySQL 校验 tenant_visitor_floor_policy 存在启用行(需 pymysql",
)
args = p.parse_args()
if not os.environ.get("ELEVATOR_HEADER_BUSINESSID", "").strip():
print("错误:请设置环境变量 ELEVATOR_HEADER_BUSINESSID(广发基金租户 companyId", file=sys.stderr)
return 2
if not args.skip_add:
if args.beg_visitor_time <= 0 or args.end_visitor_time <= 0:
print(
"错误:开通访客时请提供 --beg-visitor-time / --end-visitor-time(毫秒)或对应环境变量",
file=sys.stderr,
)
return 2
biz = os.environ.get("ELEVATOR_HEADER_BUSINESSID", "").strip()
if args.check_policy_row:
ok, msg = check_mysql_policy(biz)
print(f"[policy-db] ok={ok} {msg}")
if not ok:
return 1
session = requests.Session()
session.headers.update(default_headers())
try:
return run_http_flow(session, args, biz)
except requests.RequestException as e:
print(f"HTTP 请求失败(服务是否已启动、--base-url 是否正确?): {e}", file=sys.stderr)
return 3
if __name__ == "__main__":
raise SystemExit(main())