feat(webhook): forward BitAnswer callbacks to platform after first receipt

Made-with: Cursor
This commit is contained in:
2026-04-06 22:40:26 +08:00
parent fc0c4b1930
commit e34b420168
4 changed files with 227 additions and 5 deletions
@@ -1,5 +1,6 @@
package cn.craftlabs.platform.webhook;
import jakarta.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@@ -25,16 +26,20 @@ public class CallbackIngestController {
public static final String HEADER_TOKEN = "x-bitanswer-token";
private final CallbackReceiptService receiptService;
private final PlatformCallbackForwarder platformCallbackForwarder;
@Value("${craftlabs.webhook.expected-token:}")
private String expectedToken;
public CallbackIngestController(CallbackReceiptService receiptService) {
public CallbackIngestController(
CallbackReceiptService receiptService, PlatformCallbackForwarder platformCallbackForwarder) {
this.receiptService = receiptService;
this.platformCallbackForwarder = platformCallbackForwarder;
}
@PostMapping("/webhook/bitanswer/callback")
public ResponseEntity<Map<String, String>> ingest(
HttpServletRequest servletRequest,
@RequestHeader(value = HEADER_TOKEN, required = false) String token,
@RequestHeader(value = "Idempotency-Key", required = false) String idempotencyKey,
@RequestBody String rawBody) {
@@ -47,7 +52,11 @@ public class CallbackIngestController {
}
int bytes = rawBody != null ? rawBody.length() : 0;
receiptService.recordReceipt(idempotencyKey, bytes);
CallbackReceiptService.ReceiptOutcome outcome = receiptService.recordReceipt(idempotencyKey, bytes);
if (outcome.type() == CallbackReceiptService.OutcomeType.INSERTED && outcome.receiptId() != null) {
platformCallbackForwarder.forwardAfterReceipt(
servletRequest, rawBody, idempotencyKey, outcome.receiptId());
}
log.info(
"bitanswer callback accepted idempotencyKey={} bytes={}",
@@ -12,6 +12,42 @@ public class CallbackReceiptService {
private static final Logger log = LoggerFactory.getLogger(CallbackReceiptService.class);
public enum OutcomeType {
INSERTED,
DUPLICATE,
SKIPPED_NO_KEY
}
public static final class ReceiptOutcome {
private final OutcomeType type;
private final Long receiptId;
private ReceiptOutcome(OutcomeType type, Long receiptId) {
this.type = type;
this.receiptId = receiptId;
}
public static ReceiptOutcome skipped() {
return new ReceiptOutcome(OutcomeType.SKIPPED_NO_KEY, null);
}
public static ReceiptOutcome duplicate() {
return new ReceiptOutcome(OutcomeType.DUPLICATE, null);
}
public static ReceiptOutcome inserted(long id) {
return new ReceiptOutcome(OutcomeType.INSERTED, id);
}
public OutcomeType type() {
return type;
}
public Long receiptId() {
return receiptId;
}
}
private final WebhookCallbackReceiptMapper mapper;
public CallbackReceiptService(WebhookCallbackReceiptMapper mapper) {
@@ -19,19 +55,21 @@ public class CallbackReceiptService {
}
/**
* 记录幂等键;重复键忽略(对比特仍返回 2xx)。
* 记录幂等键;重复键返回 DUPLICATE(对比特仍返回 2xx)。
*/
public void recordReceipt(String idempotencyKey, int bodyBytes) {
public ReceiptOutcome recordReceipt(String idempotencyKey, int bodyBytes) {
if (idempotencyKey == null || idempotencyKey.isBlank()) {
return;
return ReceiptOutcome.skipped();
}
var row = new WebhookCallbackReceipt();
row.setIdempotencyKey(idempotencyKey.trim());
row.setBodyBytes(bodyBytes);
try {
mapper.insert(row);
return ReceiptOutcome.inserted(row.getId());
} catch (DataIntegrityViolationException e) {
log.debug("callback idempotent replay key={}", idempotencyKey);
return ReceiptOutcome.duplicate();
}
}
}
@@ -0,0 +1,171 @@
package cn.craftlabs.platform.webhook;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestClient;
import org.springframework.web.client.RestClientException;
import java.util.function.Consumer;
/**
* 收据持久化后同步投递至 delivery-platform-apiMVP:短超时 + 有限重试)。
*/
@Service
public class PlatformCallbackForwarder {
private static final Logger log = LoggerFactory.getLogger(PlatformCallbackForwarder.class);
private static final String SOURCE_SYSTEM = "BITANSWER";
private final ObjectMapper objectMapper;
private final RestClient restClient;
@Value("${craftlabs.platform.internal.base-url:}")
private String baseUrl;
@Value("${craftlabs.platform.internal.token:}")
private String internalToken;
public PlatformCallbackForwarder(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
this.restClient = RestClient.create();
}
public void forwardAfterReceipt(
HttpServletRequest request,
String rawBody,
String idempotencyKey,
long webhookReceiptId) {
if (!StringUtils.hasText(baseUrl) || !StringUtils.hasText(internalToken)) {
return;
}
JsonNode payloadNode = parsePayloadNode(rawBody);
String externalMessageId = resolveExternalMessageId(payloadNode, idempotencyKey);
if (!StringUtils.hasText(externalMessageId)) {
log.warn("platform forward skipped: no external message id");
return;
}
String schemaVersion = firstNonBlank(textField(payloadNode, "schemaVersion"), "1.0");
String eventType =
firstNonBlank(
textField(payloadNode, "event"),
textField(payloadNode, "event_type"),
textField(payloadNode, "eventType"),
"unknown");
ObjectNode body = objectMapper.createObjectNode();
body.put("schemaVersion", schemaVersion);
body.put("sourceSystem", SOURCE_SYSTEM);
body.put("externalMessageId", externalMessageId.trim());
body.put("eventType", eventType);
body.set("rawPayload", payloadNode);
body.put("webhookReceiptId", String.valueOf(webhookReceiptId));
if (StringUtils.hasText(idempotencyKey)) {
body.put("idempotencyKey", idempotencyKey.trim());
}
String json;
try {
json = objectMapper.writeValueAsString(body);
} catch (Exception e) {
log.warn("platform forward skipped: cannot serialize body {}", e.toString());
return;
}
String url = baseUrl.replaceAll("/+$", "") + "/internal/v1/callback-events";
String idemHeader = StringUtils.hasText(idempotencyKey) ? idempotencyKey.trim() : externalMessageId.trim();
for (int attempt = 0; attempt < 3; attempt++) {
try {
restClient
.post()
.uri(url)
.header("X-Platform-Internal-Token", internalToken)
.header("Idempotency-Key", idemHeader)
.contentType(MediaType.APPLICATION_JSON)
.headers(copyTraceHeaders(request))
.body(json)
.retrieve()
.toBodilessEntity();
return;
} catch (RestClientException e) {
if (attempt == 2) {
log.warn("platform callback forward failed after retries: {}", e.toString());
} else {
try {
Thread.sleep(200L * (attempt + 1));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
}
}
}
private static Consumer<HttpHeaders> copyTraceHeaders(HttpServletRequest request) {
return headers -> {
String tp = request.getHeader("traceparent");
if (StringUtils.hasText(tp)) {
headers.add("traceparent", tp);
}
String rid = request.getHeader("X-Request-Id");
if (StringUtils.hasText(rid)) {
headers.add("X-Request-Id", rid);
}
};
}
private JsonNode parsePayloadNode(String rawBody) {
String raw = rawBody != null ? rawBody : "";
try {
return objectMapper.readTree(raw);
} catch (Exception e) {
ObjectNode wrapper = objectMapper.createObjectNode();
wrapper.put("_raw", raw);
return wrapper;
}
}
private static String resolveExternalMessageId(JsonNode payloadNode, String idempotencyKey) {
String fromPayload =
firstNonBlank(
textField(payloadNode, "message_id"),
textField(payloadNode, "messageId"),
payloadNode.isObject() ? textField(payloadNode, "id") : null);
return firstNonBlank(fromPayload, idempotencyKey);
}
private static String textField(JsonNode node, String field) {
if (node == null || !node.isObject()) {
return null;
}
JsonNode n = node.get(field);
if (n != null && n.isTextual()) {
String t = n.asText();
return StringUtils.hasText(t) ? t.trim() : null;
}
return null;
}
private static String firstNonBlank(String... values) {
if (values == null) {
return null;
}
for (String v : values) {
if (StringUtils.hasText(v)) {
return v.trim();
}
}
return null;
}
}
@@ -30,3 +30,7 @@ management:
craftlabs:
webhook:
expected-token: ${CRAFTLABS_WEBHOOK_EXPECTED_TOKEN:}
platform:
internal:
base-url: ${PLATFORM_INTERNAL_BASE_URL:}
token: ${CRAFTLABS_PLATFORM_INTERNAL_TOKEN:}