diff --git a/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/CallbackIngestController.java b/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/CallbackIngestController.java index 7a38bad..2ce67e0 100644 --- a/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/CallbackIngestController.java +++ b/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/CallbackIngestController.java @@ -1,5 +1,6 @@ package cn.craftlabs.platform.webhook; +import jakarta.servlet.http.HttpServletRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -25,16 +26,20 @@ public class CallbackIngestController { public static final String HEADER_TOKEN = "x-bitanswer-token"; private final CallbackReceiptService receiptService; + private final PlatformCallbackForwarder platformCallbackForwarder; @Value("${craftlabs.webhook.expected-token:}") private String expectedToken; - public CallbackIngestController(CallbackReceiptService receiptService) { + public CallbackIngestController( + CallbackReceiptService receiptService, PlatformCallbackForwarder platformCallbackForwarder) { this.receiptService = receiptService; + this.platformCallbackForwarder = platformCallbackForwarder; } @PostMapping("/webhook/bitanswer/callback") public ResponseEntity> ingest( + HttpServletRequest servletRequest, @RequestHeader(value = HEADER_TOKEN, required = false) String token, @RequestHeader(value = "Idempotency-Key", required = false) String idempotencyKey, @RequestBody String rawBody) { @@ -47,7 +52,11 @@ public class CallbackIngestController { } int bytes = rawBody != null ? rawBody.length() : 0; - receiptService.recordReceipt(idempotencyKey, bytes); + CallbackReceiptService.ReceiptOutcome outcome = receiptService.recordReceipt(idempotencyKey, bytes); + if (outcome.type() == CallbackReceiptService.OutcomeType.INSERTED && outcome.receiptId() != null) { + platformCallbackForwarder.forwardAfterReceipt( + servletRequest, rawBody, idempotencyKey, outcome.receiptId()); + } log.info( "bitanswer callback accepted idempotencyKey={} bytes={}", diff --git a/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/CallbackReceiptService.java b/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/CallbackReceiptService.java index eba7c83..f276b8d 100644 --- a/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/CallbackReceiptService.java +++ b/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/CallbackReceiptService.java @@ -12,6 +12,42 @@ public class CallbackReceiptService { private static final Logger log = LoggerFactory.getLogger(CallbackReceiptService.class); + public enum OutcomeType { + INSERTED, + DUPLICATE, + SKIPPED_NO_KEY + } + + public static final class ReceiptOutcome { + private final OutcomeType type; + private final Long receiptId; + + private ReceiptOutcome(OutcomeType type, Long receiptId) { + this.type = type; + this.receiptId = receiptId; + } + + public static ReceiptOutcome skipped() { + return new ReceiptOutcome(OutcomeType.SKIPPED_NO_KEY, null); + } + + public static ReceiptOutcome duplicate() { + return new ReceiptOutcome(OutcomeType.DUPLICATE, null); + } + + public static ReceiptOutcome inserted(long id) { + return new ReceiptOutcome(OutcomeType.INSERTED, id); + } + + public OutcomeType type() { + return type; + } + + public Long receiptId() { + return receiptId; + } + } + private final WebhookCallbackReceiptMapper mapper; public CallbackReceiptService(WebhookCallbackReceiptMapper mapper) { @@ -19,19 +55,21 @@ public class CallbackReceiptService { } /** - * 记录幂等键;重复键忽略(对比特仍返回 2xx)。 + * 记录幂等键;重复键返回 DUPLICATE(对比特仍返回 2xx)。 */ - public void recordReceipt(String idempotencyKey, int bodyBytes) { + public ReceiptOutcome recordReceipt(String idempotencyKey, int bodyBytes) { if (idempotencyKey == null || idempotencyKey.isBlank()) { - return; + return ReceiptOutcome.skipped(); } var row = new WebhookCallbackReceipt(); row.setIdempotencyKey(idempotencyKey.trim()); row.setBodyBytes(bodyBytes); try { mapper.insert(row); + return ReceiptOutcome.inserted(row.getId()); } catch (DataIntegrityViolationException e) { log.debug("callback idempotent replay key={}", idempotencyKey); + return ReceiptOutcome.duplicate(); } } } diff --git a/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/PlatformCallbackForwarder.java b/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/PlatformCallbackForwarder.java new file mode 100644 index 0000000..2fa0ae9 --- /dev/null +++ b/services/license-webhook-ingress/src/main/java/cn/craftlabs/platform/webhook/PlatformCallbackForwarder.java @@ -0,0 +1,171 @@ +package cn.craftlabs.platform.webhook; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import jakarta.servlet.http.HttpServletRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; +import org.springframework.web.client.RestClient; +import org.springframework.web.client.RestClientException; + +import java.util.function.Consumer; + +/** + * 收据持久化后同步投递至 delivery-platform-api(MVP:短超时 + 有限重试)。 + */ +@Service +public class PlatformCallbackForwarder { + + private static final Logger log = LoggerFactory.getLogger(PlatformCallbackForwarder.class); + + private static final String SOURCE_SYSTEM = "BITANSWER"; + + private final ObjectMapper objectMapper; + private final RestClient restClient; + + @Value("${craftlabs.platform.internal.base-url:}") + private String baseUrl; + + @Value("${craftlabs.platform.internal.token:}") + private String internalToken; + + public PlatformCallbackForwarder(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + this.restClient = RestClient.create(); + } + + public void forwardAfterReceipt( + HttpServletRequest request, + String rawBody, + String idempotencyKey, + long webhookReceiptId) { + if (!StringUtils.hasText(baseUrl) || !StringUtils.hasText(internalToken)) { + return; + } + JsonNode payloadNode = parsePayloadNode(rawBody); + String externalMessageId = resolveExternalMessageId(payloadNode, idempotencyKey); + if (!StringUtils.hasText(externalMessageId)) { + log.warn("platform forward skipped: no external message id"); + return; + } + String schemaVersion = firstNonBlank(textField(payloadNode, "schemaVersion"), "1.0"); + String eventType = + firstNonBlank( + textField(payloadNode, "event"), + textField(payloadNode, "event_type"), + textField(payloadNode, "eventType"), + "unknown"); + + ObjectNode body = objectMapper.createObjectNode(); + body.put("schemaVersion", schemaVersion); + body.put("sourceSystem", SOURCE_SYSTEM); + body.put("externalMessageId", externalMessageId.trim()); + body.put("eventType", eventType); + body.set("rawPayload", payloadNode); + body.put("webhookReceiptId", String.valueOf(webhookReceiptId)); + if (StringUtils.hasText(idempotencyKey)) { + body.put("idempotencyKey", idempotencyKey.trim()); + } + + String json; + try { + json = objectMapper.writeValueAsString(body); + } catch (Exception e) { + log.warn("platform forward skipped: cannot serialize body {}", e.toString()); + return; + } + + String url = baseUrl.replaceAll("/+$", "") + "/internal/v1/callback-events"; + String idemHeader = StringUtils.hasText(idempotencyKey) ? idempotencyKey.trim() : externalMessageId.trim(); + + for (int attempt = 0; attempt < 3; attempt++) { + try { + restClient + .post() + .uri(url) + .header("X-Platform-Internal-Token", internalToken) + .header("Idempotency-Key", idemHeader) + .contentType(MediaType.APPLICATION_JSON) + .headers(copyTraceHeaders(request)) + .body(json) + .retrieve() + .toBodilessEntity(); + return; + } catch (RestClientException e) { + if (attempt == 2) { + log.warn("platform callback forward failed after retries: {}", e.toString()); + } else { + try { + Thread.sleep(200L * (attempt + 1)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + } + + private static Consumer copyTraceHeaders(HttpServletRequest request) { + return headers -> { + String tp = request.getHeader("traceparent"); + if (StringUtils.hasText(tp)) { + headers.add("traceparent", tp); + } + String rid = request.getHeader("X-Request-Id"); + if (StringUtils.hasText(rid)) { + headers.add("X-Request-Id", rid); + } + }; + } + + private JsonNode parsePayloadNode(String rawBody) { + String raw = rawBody != null ? rawBody : ""; + try { + return objectMapper.readTree(raw); + } catch (Exception e) { + ObjectNode wrapper = objectMapper.createObjectNode(); + wrapper.put("_raw", raw); + return wrapper; + } + } + + private static String resolveExternalMessageId(JsonNode payloadNode, String idempotencyKey) { + String fromPayload = + firstNonBlank( + textField(payloadNode, "message_id"), + textField(payloadNode, "messageId"), + payloadNode.isObject() ? textField(payloadNode, "id") : null); + return firstNonBlank(fromPayload, idempotencyKey); + } + + private static String textField(JsonNode node, String field) { + if (node == null || !node.isObject()) { + return null; + } + JsonNode n = node.get(field); + if (n != null && n.isTextual()) { + String t = n.asText(); + return StringUtils.hasText(t) ? t.trim() : null; + } + return null; + } + + private static String firstNonBlank(String... values) { + if (values == null) { + return null; + } + for (String v : values) { + if (StringUtils.hasText(v)) { + return v.trim(); + } + } + return null; + } +} diff --git a/services/license-webhook-ingress/src/main/resources/application.yml b/services/license-webhook-ingress/src/main/resources/application.yml index c842819..f417d7d 100644 --- a/services/license-webhook-ingress/src/main/resources/application.yml +++ b/services/license-webhook-ingress/src/main/resources/application.yml @@ -30,3 +30,7 @@ management: craftlabs: webhook: expected-token: ${CRAFTLABS_WEBHOOK_EXPECTED_TOKEN:} + platform: + internal: + base-url: ${PLATFORM_INTERNAL_BASE_URL:} + token: ${CRAFTLABS_PLATFORM_INTERNAL_TOKEN:}