#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Compare the nested libraries of the V1 and V2 fat-jars by Maven coordinates.

Nested jars are read straight from the fat-jar binaries; each nested jar's
``META-INF/maven/**/pom.properties`` supplies a ``groupId:artifactId:version``
coordinate that can be compared across builds and against Maven output.

Modes:
- default: write ``docs/testing/cw-elevator-v1-v2-dependency-diff.md``.
- ``--gate``: with V1 as the baseline, compare the multiset of nested-jar
  coordinates of a candidate fat-jar and exit non-zero on any mismatch
  (optional allow-lists can excuse known differences).
"""
from __future__ import annotations

import argparse
import io
import sys
import zipfile
from collections import Counter, defaultdict
from pathlib import Path


def _parse_props(raw: str) -> dict[str, str]:
    """Parse ``key=value`` lines of a properties file; ``#`` comment lines are skipped."""
    props: dict[str, str] = {}
    for line in raw.splitlines():
        line = line.strip()
        if "=" in line and not line.startswith("#"):
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props


def coords_from_nested_jar(data: bytes, jar_basename: str) -> tuple[str, str, str]:
    """Return (groupId, artifactId, version) using META-INF/maven/**/pom.properties.

    ``data`` is the raw content of a nested jar and ``jar_basename`` its file
    name, used to pick the best match when the jar embeds several
    ``pom.properties`` files (e.g. shaded jars). ``("?", "?", "?")`` is
    returned when the bytes are not a zip archive or no complete coordinate
    can be read.
    """
    unresolved = ("?", "?", "?")
    candidates: list[tuple[str, str, str]] = []
    try:
        # The context manager releases the archive handle on every exit path.
        with zipfile.ZipFile(io.BytesIO(data)) as z:
            for name in z.namelist():
                if not name.endswith("/pom.properties"):
                    continue
                if "META-INF/maven/" not in name:
                    continue
                try:
                    raw = z.read(name).decode("utf-8", errors="replace")
                except (KeyError, zipfile.BadZipFile):
                    # Missing or corrupt member: skip it, other members may still resolve.
                    continue
                props = _parse_props(raw)
                gid = props.get("groupId")
                aid = props.get("artifactId")
                ver = props.get("version")
                if gid and aid and ver:
                    candidates.append((gid, aid, ver))
    except zipfile.BadZipFile:
        return unresolved
    if not candidates:
        return unresolved
    stem = jar_basename[: -len(".jar")] if jar_basename.endswith(".jar") else jar_basename
    # Prefer the candidate whose artifactId is reflected in the jar file name.
    for g, a, v in candidates:
        if stem.startswith(a) or a in stem or stem.startswith(a.split(".")[-1]):
            return (g, a, v)
    return candidates[0]


def list_outer_nested(
    outer: Path, inner_dir_prefix: str
) -> dict[str, tuple[str, str, str]]:
    """Map each nested jar path under ``inner_dir_prefix`` to its (g, a, v).

    Only entries ending in ``.jar`` are considered, so sibling files such as
    ``*.jar.original`` are excluded by the same suffix test.
    """
    result: dict[str, tuple[str, str, str]] = {}
    with zipfile.ZipFile(outer) as z:
        for name in z.namelist():
            if not name.startswith(inner_dir_prefix):
                continue
            if not name.endswith(".jar"):
                continue
            try:
                data = z.read(name)
            except KeyError:
                continue
            result[name] = coords_from_nested_jar(data, Path(name).name)
    return result


def detect_lib_prefix(outer: Path) -> str:
    """Return the directory inside ``outer`` that holds the nested jars.

    An executable package is either the V1-style ``lib/`` layout or the
    Spring Boot repackage layout ``BOOT-INF/lib/``. ``lib/`` wins when both
    are present and is also the fallback when neither is found.
    """
    with zipfile.ZipFile(outer) as z:
        jar_names = [n for n in z.namelist() if n.endswith(".jar")]
    has_boot = any(n.startswith("BOOT-INF/lib/") for n in jar_names)
    has_lib = any(n.startswith("lib/") for n in jar_names)
    if has_boot and not has_lib:
        return "BOOT-INF/lib/"
    return "lib/"


def key_ga(path: str, t: tuple[str, str, str]) -> str:
    """Return the comparison key of one nested jar.

    Resolved jars yield ``g:a:v``; jars with any unknown component yield
    ``unresolved:<file name>`` so they can still be compared by name.
    """
    g, a, v = t
    if g == "?" or a == "?" or v == "?":
        return f"unresolved:{Path(path).name}"
    return f"{g}:{a}:{v}"


def multiset_from_jar(outer: Path) -> tuple[Counter[str], str, dict[str, tuple[str, str, str]]]:
    """Return (coordinate multiset, detected lib prefix, path -> (g, a, v)) for a fat-jar."""
    prefix = detect_lib_prefix(outer)
    jmap = list_outer_nested(outer, prefix)
    ctr: Counter[str] = Counter(key_ga(path, t) for path, t in jmap.items())
    return ctr, prefix, jmap


def _read_allow_counter(path: Path | None) -> Counter[str]:
    """Read an allow-list file into a Counter.

    Each non-blank, non-``#`` line is either ``coord`` (count 1) or
    ``coord N``. A missing file or ``None`` yields an empty Counter.
    """
    if path is None or not path.is_file():
        return Counter()
    allowed: Counter[str] = Counter()
    for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split()
        # isdecimal() (unlike isdigit()) only accepts characters int() can parse,
        # so tokens such as a superscript digit no longer raise ValueError.
        if len(parts) >= 2 and parts[-1].isdecimal():
            key, cnt = " ".join(parts[:-1]), int(parts[-1])
        else:
            key, cnt = line, 1
        allowed[key] += cnt
    return allowed


def _subtract_allow(diff: Counter[str], allow: Counter[str]) -> Counter[str]:
    """Deduct the allowed surplus (``coord`` or ``coord N`` lines) from a multiset difference."""
    out = Counter(diff)
    for k, n in allow.items():
        if k not in out:
            continue
        out[k] -= min(out[k], n)
        if out[k] <= 0:
            del out[k]
    return out


def run_lib_gate(
    baseline_jar: Path,
    candidate_jar: Path,
    allow_baseline_only: Path | None,
    allow_candidate_only: Path | None,
) -> int:
    """Multiset gate: baseline and candidate must match once allow-lists are applied.

    Returns 0 on parity, 1 on a remaining difference (details go to stderr)
    and 2 when either jar file is missing.
    """
    if not baseline_jar.is_file():
        print("Missing baseline jar:", baseline_jar, file=sys.stderr)
        return 2
    if not candidate_jar.is_file():
        print("Missing candidate jar:", candidate_jar, file=sys.stderr)
        return 2
    b_ctr, b_prefix, _ = multiset_from_jar(baseline_jar)
    c_ctr, c_prefix, _ = multiset_from_jar(candidate_jar)
    allow_b = _read_allow_counter(allow_baseline_only)
    allow_c = _read_allow_counter(allow_candidate_only)
    # Counter subtraction keeps only positive counts, i.e. the surplus on each side.
    only_b = _subtract_allow(b_ctr - c_ctr, allow_b)
    only_c = _subtract_allow(c_ctr - b_ctr, allow_c)
    if not only_b and not only_c:
        print(
            "lib parity OK:",
            f"baseline={baseline_jar.name} ({b_prefix} n={sum(b_ctr.values())})",
            f"candidate={candidate_jar.name} ({c_prefix} n={sum(c_ctr.values())})",
        )
        return 0
    print("lib parity FAILED (multiset diff after allowlists)", file=sys.stderr)
    print(f" baseline: {baseline_jar} prefix={b_prefix} keys={len(b_ctr)}", file=sys.stderr)
    print(f" candidate: {candidate_jar} prefix={c_prefix} keys={len(c_ctr)}", file=sys.stderr)
    if only_b:
        print(" only in baseline (or excess count):", file=sys.stderr)
        for k in sorted(only_b):
            print(f" {k} x{only_b[k]}", file=sys.stderr)
    if only_c:
        print(" only in candidate (or excess count):", file=sys.stderr)
        for k in sorted(only_c):
            print(f" {k} x{only_c[k]}", file=sys.stderr)
    return 1


def parse_maven_dependency_list(path: Path) -> list[tuple[str, str, str, str]]:
    """Parse ``mvn dependency:list`` output into sorted (groupId, artifactId, version, scope) rows.

    Accepts both ``groupId:artifactId:jar:version:scope`` and the classifier
    form ``groupId:artifactId:jar:classifier:version:scope``; the classifier
    itself is not part of the returned row. Non-``jar`` types, the
    ``The following ...`` header and ``none`` lines are ignored.
    """
    rows: list[tuple[str, str, str, str]] = []
    text = path.read_text(encoding="utf-8", errors="replace")
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("The following"):
            continue
        if line == "none":
            continue
        parts = line.split(":")
        if len(parts) < 5 or parts[2] != "jar":
            continue
        gid, aid = parts[0], parts[1]
        if len(parts) >= 6:
            # A sixth field means parts[3] is the classifier, not the version.
            ver, scope = parts[4], parts[5]
        else:
            ver, scope = parts[3], parts[4]
        rows.append((gid, aid, ver, scope))
    return sorted(rows, key=lambda x: (x[0], x[1], x[2]))


def collect_ga_versions_from_jar_map(
    jmap: dict[str, tuple[str, str, str]],
) -> dict[tuple[str, str], set[str]]:
    """Return, per (groupId, artifactId), the set of versions found in the fat-jar.

    Only nested jars whose ``pom.properties`` could be resolved contribute.
    """
    ga_ver: dict[tuple[str, str], set[str]] = defaultdict(set)
    for g, a, v in jmap.values():
        if g != "?" and a != "?" and v != "?":
            ga_ver[(g, a)].add(v)
    return ga_ver


def ga_version_skew_rows(
    v1_ga_v: dict[tuple[str, str], set[str]],
    v2_ga_v: dict[tuple[str, str], set[str]],
) -> list[tuple[tuple[str, str], list[str], list[str]]]:
    """Return the GAs resolved on both sides whose version sets differ.

    Each row is ``((groupId, artifactId), sorted V1 versions, sorted V2 versions)``.
    """
    out: list[tuple[tuple[str, str], list[str], list[str]]] = []
    all_ga = set(v1_ga_v) | set(v2_ga_v)
    for ga in sorted(all_ga, key=lambda x: (x[0], x[1])):
        s1 = v1_ga_v.get(ga, set())
        s2 = v2_ga_v.get(ga, set())
        if s1 and s2 and s1 != s2:
            out.append((ga, sorted(s1), sorted(s2)))
    return out


def _first_existing(candidates: list[Path]) -> Path:
    """Return the first candidate that is a file, or the last candidate if none is."""
    return next((p for p in candidates if p.is_file()), candidates[-1])


def _default_v2_jar(root: Path) -> Path:
    """Locate the V2 fat-jar under ``root``.

    The locally packaged artifact is preferred (it matches the current parent
    POM / plugins); the archived historical release is the last resort.
    """
    starter_target = root / "maven-cw-elevator-application" / "cw-elevator-application-starter" / "target"
    return _first_existing(
        [
            starter_target / "cw-elevator-application-2.0.7.jar",
            starter_target / "cw-elevator-application-2.0.0.jar",
            root
            / "maven-cw-elevator-application"
            / "releases"
            / "cw-elevator-application-V2.0.6.20260430"
            / "cw-elevator-application-2.0.6.jar",
        ]
    )


def _group_paths_by_key(jmap: dict[str, tuple[str, str, str]]) -> dict[str, list[str]]:
    """Group nested-jar paths by their comparison key (see ``key_ga``)."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for path, t in jmap.items():
        grouped[key_ga(path, t)].append(path)
    return grouped


def _append_path_table(lines: list[str], keys: list[str], by_key: dict[str, list[str]]) -> None:
    """Append one table row per key, listing the nested paths that carry it."""
    for k in keys:
        paths = ", ".join(f"`{p}`" for p in sorted(by_key[k]))
        lines.append(f"| `{k}` | {paths} |")


def _append_coord_table(lines: list[str], keys: list[str], limit: int = 80) -> None:
    """Append a single-column coordinate table truncated to ``limit`` rows; no-op when empty."""
    if not keys:
        return
    lines.append("|坐标|")
    lines.append("|----|")
    for k in keys[:limit]:
        lines.append(f"| `{k}` |")
    if len(keys) > limit:
        lines.append(f"| … 其余 {len(keys) - limit} 项省略 |")
    lines.append("")


def _append_unresolved(
    lines: list[str], label: str, jmap: dict[str, tuple[str, str, str]], limit: int = 50
) -> None:
    """Append the file names of nested jars with no usable coordinate, capped at ``limit``."""
    # Same "any component unknown" rule as key_ga, so the count matches the unresolved keys.
    bad = sorted(p for p, t in jmap.items() if "?" in t)
    lines.append(f"- **{label}** 未解析条目:**{len(bad)}**")
    for p in bad[:limit]:
        lines.append(f" - `{Path(p).name}`")
    if len(bad) > limit:
        lines.append(f" - … 省略 {len(bad) - limit} 条")


def main() -> int:
    """CLI entry point: run the lib gate (``--gate``) or regenerate the Markdown diff report.

    Returns the process exit code: the gate result in gate mode, otherwise 0
    after writing the report or 1 when a required fat-jar is missing.
    """
    root = Path(__file__).resolve().parents[1]
    ap = argparse.ArgumentParser(description="V1/V2 fat-jar 依赖对比或 lib multiset 门禁")
    ap.add_argument(
        "--gate",
        action="store_true",
        help="运行 multiset 门禁(默认基准 V1 jar,候选为 releases 或 starter/target)",
    )
    ap.add_argument("--baseline-jar", type=Path, default=None, help="门禁基准 fat-jar")
    ap.add_argument("--candidate-jar", type=Path, default=None, help="门禁待测 fat-jar")
    ap.add_argument(
        "--allow-baseline-only",
        type=Path,
        default=None,
        help="允许仅出现在 baseline 的坐标(每行 `g:a:v` 或 `unresolved:name.jar`,可选末尾计数)",
    )
    ap.add_argument(
        "--allow-candidate-only",
        type=Path,
        default=None,
        help="允许仅出现在 candidate 的坐标(格式同上)",
    )
    args = ap.parse_args()

    v1_jar = root / "cw-elevator-application-V1.0.0.20211103" / "cw-elevator-application-V1.0.0.20211103.jar"

    if args.gate:
        baseline = args.baseline_jar if args.baseline_jar else v1_jar
        cand = args.candidate_jar
        if cand is None:
            cand = _default_v2_jar(root)
        allow_b = args.allow_baseline_only
        if allow_b is None:
            p = root / "docs" / "testing" / "cw-elevator-fatjar-lib-parity-allow-baseline-only.txt"
            allow_b = p if p.is_file() else None
        allow_c = args.allow_candidate_only
        if allow_c is None:
            p = root / "docs" / "testing" / "cw-elevator-fatjar-lib-parity-allow-candidate-only.txt"
            allow_c = p if p.is_file() else None
        return run_lib_gate(baseline, cand, allow_b, allow_c)

    v2_jar = _default_v2_jar(root)
    # With reactor + dependency:list every module writes into its own target/;
    # the starter module's output is the runtime list of the entry fat-jar.
    mvn_list = _first_existing(
        [
            root
            / "maven-cw-elevator-application"
            / "cw-elevator-application-starter"
            / "target"
            / "v2-maven-deps.txt",
            root / "maven-cw-elevator-application" / "target" / "v2-maven-deps.txt",
            Path("/tmp/v2-maven-deps.txt"),
        ]
    )
    out_md = root / "docs" / "testing" / "cw-elevator-v1-v2-dependency-diff.md"

    if not v1_jar.is_file():
        print("Missing V1 jar:", v1_jar, file=sys.stderr)
        return 1
    if not v2_jar.is_file():
        print("Missing V2 jar:", v2_jar, file=sys.stderr)
        return 1

    v1_map = list_outer_nested(v1_jar, "lib/")
    v2_prefix = detect_lib_prefix(v2_jar)
    v2_map = list_outer_nested(v2_jar, v2_prefix)

    v1_by_ga = _group_paths_by_key(v1_map)
    v2_by_ga = _group_paths_by_key(v2_map)

    keys1 = set(v1_by_ga)
    keys2 = set(v2_by_ga)
    only_v1 = sorted(keys1 - keys2)
    only_v2 = sorted(keys2 - keys1)
    both = sorted(keys1 & keys2)

    mvn_rows: list[tuple[str, str, str, str]] = []
    if mvn_list.is_file():
        mvn_rows = parse_maven_dependency_list(mvn_list)
    mvn_ga = {f"{g}:{a}:{v}" for g, a, v, _ in mvn_rows}

    v1_ga_v = collect_ga_versions_from_jar_map(v1_map)
    v2_ga_v = collect_ga_versions_from_jar_map(v2_map)
    skew_ga = ga_version_skew_rows(v1_ga_v, v2_ga_v)

    lines: list[str] = []
    lines.append("# cw-elevator-application V1 fat-jar 与 V2 fat-jar 依赖差异核对")
    lines.append("")
    lines.append("**生成方式**:脚本 `scripts/generate_v1_v2_elevator_dependency_diff.py`(可重复执行覆盖本文件)。")
    lines.append("")
    lines.append("## 样本路径")
    lines.append("")
    lines.append(f"- **V1**:`{v1_jar.relative_to(root)}`")
    lines.append(f"- **V2**:`{v2_jar.relative_to(root)}`")
    lines.append("")
    lines.append("| 指标 | V1 | V2 |")
    lines.append("|------|----|----|")
    lines.append(f"| 嵌套 jar 条目数(lib / BOOT-INF/lib) | {len(v1_map)} | {len(v2_map)} |")
    lines.append(f"| 解析出唯一坐标 `groupId:artifactId:version` 数 | {len(keys1)} | {len(keys2)} |")
    lines.append(
        f"| 同名 GA、两侧均有解析且 version 集合不一致(§2.2.1)条数 | — | **{len(skew_ga)}** |"
    )
    lines.append(
        f"| 与 Maven `dependency:list`(runtime)条目数 | — | {len(mvn_rows)} |"
    )
    lines.append("")
    lines.append("---")
    lines.append("")

    # Section 1: the Maven view (V2 reactor only).
    lines.append("## 1. Maven 方式(仅 V2 reactor)")
    lines.append("")
    lines.append(
        "在 `maven-cw-elevator-application` 下执行:`mvn -pl cw-elevator-application-starter -am "
        "dependency:list -DincludeScope=runtime -Dsort=true "
        "-DoutputFile=target/v2-maven-deps.txt`。`-am` 时每个子模块写各自的 `target/`;"
        "**§1.1 使用 starter 模块文件**:`cw-elevator-application-starter/target/v2-maven-deps.txt`。"
    )
    lines.append("")
    lines.append(
        "**说明**:历史 **V1 运行包** 当前仓库无对应 **1.0** 聚合工程可一键 `dependency:list`;"
        "V1 的 Maven 坐标视图见 **§2 二进制嵌套 JAR 的 pom.properties**。"
    )
    lines.append("")
    lines.append("### 1.1 V2 `dependency:list` 全量(runtime)")
    lines.append("")
    lines.append("| # | groupId | artifactId | version | scope |")
    lines.append("|---|---------|--------------|---------|-------|")
    for i, (g, a, v, s) in enumerate(mvn_rows, 1):
        lines.append(f"| {i} | `{g}` | `{a}` | `{v}` | `{s}` |")
    lines.append("")
    lines.append("---")
    lines.append("")

    # Section 2: the binary view (nested jars + pom.properties).
    lines.append("## 2. 二进制方式(嵌套 JAR + pom.properties)")
    lines.append("")
    lines.append(
        f"- **V1**:`lib/*.jar`。\n"
        f"- **V2**:自动检测为 `{v2_prefix}*.jar`(与 spring-boot-maven-plugin 1.3.x + Boot 1.5 一致时为 `lib/`)。"
    )
    lines.append("")
    lines.append(
        "对每个嵌套 jar 读取 `META-INF/maven/**/pom.properties` 得到 `groupId:artifactId:version`;"
        "无法读取时记为 `?:?:?`(多为无 Maven 元数据的第三方包)。"
    )
    lines.append("")
    lines.append("### 2.1 仅在 V1 出现的坐标(相对 V2 二进制集合)")
    lines.append("")
    lines.append(f"**共 {len(only_v1)} 项**。")
    lines.append("")
    lines.append("| groupId:artifactId:version | V1 嵌套路径 |")
    lines.append("|------------------------------|-------------|")
    _append_path_table(lines, only_v1, v1_by_ga)
    lines.append("")
    lines.append("### 2.2 仅在 V2 出现的坐标(相对 V1 二进制集合)")
    lines.append("")
    lines.append(f"**共 {len(only_v2)} 项**。")
    lines.append("")
    lines.append("| groupId:artifactId:version | V2 嵌套路径 |")
    lines.append("|------------------------------|-------------|")
    _append_path_table(lines, only_v2, v2_by_ga)
    lines.append("")
    lines.append("### 2.2.1 同名构件(groupId:artifactId)在 V1 与 V2 中的版本集合差异")
    lines.append("")
    lines.append(
        "由嵌套 jar 的 `pom.properties` 聚合:若同一 **GA** 在 V1、V2 中均能解析出版本,且 **version 集合不同**,"
        "则单独列出(与 §2.1 / §2.2 中分列的 `g:a:v` 键互为补充)。**不含**仅一侧出现的 GA。"
    )
    lines.append("")
    lines.append(f"**共 {len(skew_ga)} 项**。")
    lines.append("")
    lines.append("| groupId:artifactId | V1 version(s) | V2 version(s) |")
    lines.append("|--------------------|---------------|---------------|")
    for (g, a), v1s, v2s in skew_ga:
        ga_s = f"`{g}:{a}`"
        lines.append(
            f"| {ga_s} | `{', '.join(v1s)}` | `{', '.join(v2s)}` |"
        )
    lines.append("")
    lines.append("### 2.3 两边均存在且坐标一致的依赖")
    lines.append("")
    lines.append(f"**共 {len(both)} 项**(名称版本完全一致)。")
    lines.append("")
    # Collapsible block so the long table does not dominate the rendered page.
    lines.append("<details>")
    lines.append("<summary>展开长表</summary>")
    lines.append("")
    lines.append("| groupId:artifactId:version |")
    lines.append("|------------------------------|")
    for k in both:
        lines.append(f"| `{k}` |")
    lines.append("")
    lines.append("</details>")
    lines.append("")
    lines.append("### 2.4 V2 二进制坐标 vs Maven dependency:list")
    lines.append("")
    lines.append(
        "- **版本字符串不一致**:例如 reactor 在 `dependency:list` 中为 **`2.0-SNAPSHOT`**,"
        "而 fat-jar 内嵌模块 **`cw-elevator-application-*-2.0.6.jar`** 的 `pom.properties` 为 **`2.0.6`**,"
        "字符串比对会视为「仅一侧存在」,属**同名构件不同表述**,非缺失依赖。"
    )
    lines.append(
        "- **在 dependency:list 中但不在嵌套 jar 元数据中的**:多为 **仅存在于解析树、与本模块 jar 文件命名不一致**,需对照 §1 表格。"
    )
    lines.append(
        "- **未解析 `unresolved:*`**:见 §3,此类条目不参与坐标相等判断。"
    )
    lines.append("")
    only_mvn = sorted(mvn_ga - keys2)
    only_bin = sorted(keys2 - mvn_ga)
    lines.append(f"- **仅在 Maven list(runtime)**:{len(only_mvn)} 项")
    lines.append("")
    _append_coord_table(lines, only_mvn)
    lines.append(f"- **仅在二进制坐标集合**:{len(only_bin)} 项")
    lines.append("")
    _append_coord_table(lines, only_bin)
    lines.append("---")
    lines.append("")

    # Section 3: nested jars without usable pom.properties (file names only).
    lines.append("## 3. 无法解析 pom.properties 的嵌套 JAR(仅列文件名)")
    lines.append("")
    _append_unresolved(lines, "V1", v1_map)
    _append_unresolved(lines, "V2", v2_map)
    lines.append("")

    out_md.parent.mkdir(parents=True, exist_ok=True)
    out_md.write_text("\n".join(lines) + "\n", encoding="utf-8")
    print("Wrote", out_md)
    return 0


if __name__ == "__main__":
    sys.exit(main())