#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 前端运行时自动验收(Playwright): 1) 打开页面并等待首屏稳定 2) 采集 console/pageerror/requestfailed 3) 统计重复资源请求(按 URL 去 query 后聚合) 4) 输出 markdown 报告 + 截图 """ from __future__ import annotations import argparse import asyncio import datetime as dt import json import re import sys from collections import Counter from pathlib import Path from typing import Dict, List, Tuple from urllib.parse import urlsplit, urlunsplit from playwright.async_api import Error as PlaywrightError from playwright.async_api import async_playwright def _strip_query(url: str) -> str: p = urlsplit(url) return urlunsplit((p.scheme, p.netloc, p.path, "", "")) def _now_tag() -> str: return dt.datetime.now().strftime("%Y%m%d-%H%M%S") def _classify_resource(url: str) -> str: path = urlsplit(url).path.lower() if path.endswith(".js"): return "js" if path.endswith(".css"): return "css" if re.search(r"\.(png|jpg|jpeg|gif|svg|ico|webp)$", path): return "image" if "/api/" in path or "/elevator/" in path: return "api" return "other" async def run_check(url: str, wait_ms: int, screenshot_path: Path) -> Dict[str, object]: console_errors: List[str] = [] console_warnings: List[str] = [] page_errors: List[str] = [] failed_requests: List[str] = [] requests: List[Tuple[str, str]] = [] async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context(ignore_https_errors=True) page = await context.new_page() page.set_default_timeout(5_000) page.on( "console", lambda msg: ( console_errors.append(msg.text) if msg.type == "error" else console_warnings.append(msg.text) if msg.type in {"warning", "warn"} else None ), ) page.on("pageerror", lambda err: page_errors.append(str(err))) def _on_request_failed(req) -> None: failure = req.failure if isinstance(failure, dict): err_text = failure.get("errorText", "unknown") elif isinstance(failure, str): err_text = failure elif failure is None: err_text = "unknown" else: err_text = getattr(failure, "error_text", str(failure)) failed_requests.append(f"{req.url} | {err_text}") page.on("requestfailed", _on_request_failed) page.on("requestfinished", lambda req: requests.append((req.method, req.url))) navigation_mode = "domcontentloaded" try: try: await page.goto(url, wait_until="domcontentloaded", timeout=12_000) except PlaywrightError: # 某些历史页面会因脚本阻塞导致 domcontentloaded 不触发,降级到 commit 保证可诊断 navigation_mode = "commit(fallback)" await page.goto(url, wait_until="commit", timeout=20_000) await page.wait_for_timeout(wait_ms) try: # 避免 full_page 在超长文档上触发超时,先抓取可视区域。 await page.screenshot(path=str(screenshot_path), full_page=False, timeout=15_000) except Exception: screenshot_path.write_bytes(b"") try: title = await page.title() except Exception: title = "" try: body_non_empty = bool(await page.evaluate("Boolean(document.body && document.body.innerText && document.body.innerText.trim())")) except Exception: body_non_empty = False try: app_count = await page.locator("#app *").count() except Exception: app_count = -1 except PlaywrightError as e: await browser.close() return { "fatal_error": str(e), "console_errors": console_errors, "console_warnings": console_warnings, "page_errors": page_errors, "failed_requests": failed_requests, "requests": requests, } await browser.close() normalized_urls = [_strip_query(u) for _, u in requests] url_counter = Counter(normalized_urls) duplicate_urls = [(u, c) for u, c in url_counter.items() if c > 1] duplicate_urls.sort(key=lambda x: x[1], reverse=True) resource_counter = Counter(_classify_resource(u) for u in normalized_urls) return { "fatal_error": "", "navigation_mode": navigation_mode, "title": title, "body_non_empty": body_non_empty, "app_count": app_count, "console_errors": console_errors, "console_warnings": console_warnings, "page_errors": page_errors, "failed_requests": failed_requests, "requests_total": len(requests), "requests_unique": len(url_counter), "resource_counter": dict(resource_counter), "duplicate_urls": duplicate_urls[:30], } def _render_report(url: str, result: Dict[str, object], screenshot_path: Path) -> str: lines: List[str] = [] lines.append("# 前端运行时自动验收报告") lines.append("") lines.append(f"- URL: `{url}`") lines.append(f"- 时间: `{dt.datetime.now().isoformat(timespec='seconds')}`") lines.append(f"- 截图: `{screenshot_path}`") lines.append("") fatal = str(result.get("fatal_error", "")).strip() if fatal: lines.append("## 结论") lines.append("- **失败**:页面打开阶段异常") lines.append(f"- 异常: `{fatal}`") return "\n".join(lines) + "\n" lines.append("## 页面状态") lines.append(f"- 导航模式: `{result.get('navigation_mode', '')}`") lines.append(f"- title: `{result.get('title', '')}`") lines.append(f"- body 是否非空: `{result.get('body_non_empty', False)}`") lines.append(f"- `#app` 子节点数: `{result.get('app_count', 0)}`") lines.append("") lines.append("## 请求统计") lines.append(f"- 总请求数: `{result.get('requests_total', 0)}`") lines.append(f"- 去重后 URL 数: `{result.get('requests_unique', 0)}`") lines.append(f"- 类型分布: `{json.dumps(result.get('resource_counter', {}), ensure_ascii=False)}`") lines.append("") dup = result.get("duplicate_urls", []) lines.append("## 重复请求(Top)") if dup: for u, c in dup: lines.append(f"- `{c}x` {u}") else: lines.append("- 无重复 URL 请求") lines.append("") def _section(title: str, rows: List[str]) -> None: lines.append(f"## {title}") if rows: for row in rows[:50]: lines.append(f"- {row}") else: lines.append("- 无") lines.append("") _section("Console Error", result.get("console_errors", [])) _section("Page Error", result.get("page_errors", [])) _section("Request Failed", result.get("failed_requests", [])) _section("Console Warning", result.get("console_warnings", [])) return "\n".join(lines) async def _amain() -> int: parser = argparse.ArgumentParser(description="自动检测前端运行时错误/重复加载") parser.add_argument("--url", default="http://127.0.0.1:8090/login", help="待检测 URL") parser.add_argument( "--wait-ms", type=int, default=5000, help="页面打开后额外等待毫秒数(默认 5000)" ) parser.add_argument( "--out-dir", default="artifacts/frontend-check", help="报告与截图输出目录(默认 artifacts/frontend-check)", ) args = parser.parse_args() out_dir = Path(args.out_dir).resolve() out_dir.mkdir(parents=True, exist_ok=True) tag = _now_tag() screenshot = out_dir / f"frontend-{tag}.png" report = out_dir / f"frontend-{tag}.md" result = await run_check(args.url, args.wait_ms, screenshot) report.write_text(_render_report(args.url, result, screenshot), encoding="utf-8") print(f"[frontend-check] report={report}") print(f"[frontend-check] screenshot={screenshot}") if result.get("fatal_error"): return 2 if result.get("console_errors") or result.get("page_errors") or result.get("failed_requests"): return 1 return 0 def main() -> int: try: return asyncio.run(_amain()) except KeyboardInterrupt: return 130 except Exception as e: print(f"[frontend-check] unexpected error: {e}", file=sys.stderr) return 2 if __name__ == "__main__": raise SystemExit(main())