mirror of
https://github.com/hpd840321/craftlabs-authorization-sdk.git
synced 2026-06-09 10:00:30 +08:00
feat(i7): async webhook delivery queue, OPS RBAC, UI role routing; docs and runbook
- Architect: I7_DESIGN.md, I7_IMPLEMENTATION_REVIEW.md; parallel index + track B - Backend: @EnableMethodSecurity; OPS login; CallbackInbox PreAuthorize; IntegrationCatalog triple role - Webhook: V2 webhook_platform_delivery; planner + scheduler + single-shot forwarder; tests - Frontend: Pinia hasAnyRole; MainLayout/HomeView/router for OPS vs dev - Runbook §10.5 delivery config Made-with: Cursor
This commit is contained in:
+4
-5
@@ -26,15 +26,15 @@ public class CallbackIngestController {
|
||||
public static final String HEADER_TOKEN = "x-bitanswer-token";
|
||||
|
||||
private final CallbackReceiptService receiptService;
|
||||
private final PlatformCallbackForwarder platformCallbackForwarder;
|
||||
private final PlatformDeliveryService platformDeliveryService;
|
||||
|
||||
@Value("${craftlabs.webhook.expected-token:}")
|
||||
private String expectedToken;
|
||||
|
||||
public CallbackIngestController(
|
||||
CallbackReceiptService receiptService, PlatformCallbackForwarder platformCallbackForwarder) {
|
||||
CallbackReceiptService receiptService, PlatformDeliveryService platformDeliveryService) {
|
||||
this.receiptService = receiptService;
|
||||
this.platformCallbackForwarder = platformCallbackForwarder;
|
||||
this.platformDeliveryService = platformDeliveryService;
|
||||
}
|
||||
|
||||
@PostMapping("/webhook/bitanswer/callback")
|
||||
@@ -54,8 +54,7 @@ public class CallbackIngestController {
|
||||
int bytes = rawBody != null ? rawBody.length() : 0;
|
||||
CallbackReceiptService.ReceiptOutcome outcome = receiptService.recordReceipt(idempotencyKey, bytes);
|
||||
if (outcome.type() == CallbackReceiptService.OutcomeType.INSERTED && outcome.receiptId() != null) {
|
||||
platformCallbackForwarder.forwardAfterReceipt(
|
||||
servletRequest, rawBody, idempotencyKey, outcome.receiptId());
|
||||
platformDeliveryService.enqueueAfterReceipt(servletRequest, rawBody, idempotencyKey, outcome.receiptId());
|
||||
}
|
||||
|
||||
log.info(
|
||||
|
||||
+6
@@ -0,0 +1,6 @@
|
||||
package cn.craftlabs.platform.webhook;
|
||||
|
||||
/**
|
||||
* 发往 {@code POST /internal/v1/callback-events} 的一次投递计划。
|
||||
*/
|
||||
public record PlannedPlatformDelivery(String requestBodyJson, String idempotencyHeader, String traceHeadersJson) {}
|
||||
+34
-125
@@ -1,9 +1,6 @@
|
||||
package cn.craftlabs.platform.webhook;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@@ -17,17 +14,15 @@ import org.springframework.web.client.RestClientException;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 收据持久化后同步投递至 delivery-platform-api(MVP:短超时 + 有限重试)。
|
||||
* 向 {@code delivery-platform-api} 发送单次 HTTP 投递(重试由 {@link PlatformDeliveryService} 调度)。
|
||||
*/
|
||||
@Service
|
||||
public class PlatformCallbackForwarder {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PlatformCallbackForwarder.class);
|
||||
|
||||
private static final String SOURCE_SYSTEM = "BITANSWER";
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final RestClient restClient;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
@Value("${craftlabs.platform.internal.base-url:}")
|
||||
private String baseUrl;
|
||||
@@ -40,132 +35,46 @@ public class PlatformCallbackForwarder {
|
||||
this.restClient = RestClient.create();
|
||||
}
|
||||
|
||||
public void forwardAfterReceipt(
|
||||
HttpServletRequest request,
|
||||
String rawBody,
|
||||
String idempotencyKey,
|
||||
long webhookReceiptId) {
|
||||
/**
|
||||
* 单次 POST;成功无声返回;失败抛 {@link RestClientException} 由调用方记重试/DEAD。
|
||||
*/
|
||||
public void postOnce(String requestBodyJson, String idempotencyHeader, String traceHeadersJson)
|
||||
throws RestClientException {
|
||||
if (!StringUtils.hasText(baseUrl) || !StringUtils.hasText(internalToken)) {
|
||||
return;
|
||||
throw new IllegalStateException("platform base-url or token not configured");
|
||||
}
|
||||
JsonNode payloadNode = parsePayloadNode(rawBody);
|
||||
String externalMessageId = resolveExternalMessageId(payloadNode, idempotencyKey);
|
||||
if (!StringUtils.hasText(externalMessageId)) {
|
||||
log.warn("platform forward skipped: no external message id");
|
||||
return;
|
||||
}
|
||||
String schemaVersion = firstNonBlank(textField(payloadNode, "schemaVersion"), "1.0");
|
||||
String eventType =
|
||||
firstNonBlank(
|
||||
textField(payloadNode, "event"),
|
||||
textField(payloadNode, "event_type"),
|
||||
textField(payloadNode, "eventType"),
|
||||
"unknown");
|
||||
|
||||
ObjectNode body = objectMapper.createObjectNode();
|
||||
body.put("schemaVersion", schemaVersion);
|
||||
body.put("sourceSystem", SOURCE_SYSTEM);
|
||||
body.put("externalMessageId", externalMessageId.trim());
|
||||
body.put("eventType", eventType);
|
||||
body.set("rawPayload", payloadNode);
|
||||
body.put("webhookReceiptId", String.valueOf(webhookReceiptId));
|
||||
if (StringUtils.hasText(idempotencyKey)) {
|
||||
body.put("idempotencyKey", idempotencyKey.trim());
|
||||
}
|
||||
|
||||
String json;
|
||||
try {
|
||||
json = objectMapper.writeValueAsString(body);
|
||||
} catch (Exception e) {
|
||||
log.warn("platform forward skipped: cannot serialize body {}", e.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
String url = baseUrl.replaceAll("/+$", "") + "/internal/v1/callback-events";
|
||||
String idemHeader = StringUtils.hasText(idempotencyKey) ? idempotencyKey.trim() : externalMessageId.trim();
|
||||
|
||||
for (int attempt = 0; attempt < 3; attempt++) {
|
||||
try {
|
||||
restClient
|
||||
.post()
|
||||
.uri(url)
|
||||
.header("X-Platform-Internal-Token", internalToken)
|
||||
.header("Idempotency-Key", idemHeader)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.headers(copyTraceHeaders(request))
|
||||
.body(json)
|
||||
.retrieve()
|
||||
.toBodilessEntity();
|
||||
return;
|
||||
} catch (RestClientException e) {
|
||||
if (attempt == 2) {
|
||||
log.warn("platform callback forward failed after retries: {}", e.toString());
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(200L * (attempt + 1));
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
restClient
|
||||
.post()
|
||||
.uri(url)
|
||||
.header("X-Platform-Internal-Token", internalToken)
|
||||
.header("Idempotency-Key", idempotencyHeader)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.headers(applyTrace(traceHeadersJson))
|
||||
.body(requestBodyJson)
|
||||
.retrieve()
|
||||
.toBodilessEntity();
|
||||
}
|
||||
|
||||
private static Consumer<HttpHeaders> copyTraceHeaders(HttpServletRequest request) {
|
||||
private Consumer<HttpHeaders> applyTrace(String traceHeadersJson) {
|
||||
return headers -> {
|
||||
String tp = request.getHeader("traceparent");
|
||||
if (StringUtils.hasText(tp)) {
|
||||
headers.add("traceparent", tp);
|
||||
if (!StringUtils.hasText(traceHeadersJson)) {
|
||||
return;
|
||||
}
|
||||
String rid = request.getHeader("X-Request-Id");
|
||||
if (StringUtils.hasText(rid)) {
|
||||
headers.add("X-Request-Id", rid);
|
||||
try {
|
||||
var node = objectMapper.readTree(traceHeadersJson);
|
||||
if (node.isObject()) {
|
||||
node.fields()
|
||||
.forEachRemaining(
|
||||
e -> {
|
||||
if (e.getValue().isTextual()) {
|
||||
headers.add(e.getKey(), e.getValue().asText());
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug("skip trace headers: {}", e.toString());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private JsonNode parsePayloadNode(String rawBody) {
|
||||
String raw = rawBody != null ? rawBody : "";
|
||||
try {
|
||||
return objectMapper.readTree(raw);
|
||||
} catch (Exception e) {
|
||||
ObjectNode wrapper = objectMapper.createObjectNode();
|
||||
wrapper.put("_raw", raw);
|
||||
return wrapper;
|
||||
}
|
||||
}
|
||||
|
||||
private static String resolveExternalMessageId(JsonNode payloadNode, String idempotencyKey) {
|
||||
String fromPayload =
|
||||
firstNonBlank(
|
||||
textField(payloadNode, "message_id"),
|
||||
textField(payloadNode, "messageId"),
|
||||
payloadNode.isObject() ? textField(payloadNode, "id") : null);
|
||||
return firstNonBlank(fromPayload, idempotencyKey);
|
||||
}
|
||||
|
||||
private static String textField(JsonNode node, String field) {
|
||||
if (node == null || !node.isObject()) {
|
||||
return null;
|
||||
}
|
||||
JsonNode n = node.get(field);
|
||||
if (n != null && n.isTextual()) {
|
||||
String t = n.asText();
|
||||
return StringUtils.hasText(t) ? t.trim() : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static String firstNonBlank(String... values) {
|
||||
if (values == null) {
|
||||
return null;
|
||||
}
|
||||
for (String v : values) {
|
||||
if (StringUtils.hasText(v)) {
|
||||
return v.trim();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
+134
@@ -0,0 +1,134 @@
|
||||
package cn.craftlabs.platform.webhook;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* 从 BitAnswer 原始 body 构造发往平台的内部 API 正文与幂等键。
|
||||
*/
|
||||
@Component
|
||||
public class PlatformCallbackRequestPlanner {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PlatformCallbackRequestPlanner.class);
|
||||
private static final String SOURCE_SYSTEM = "BITANSWER";
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public PlatformCallbackRequestPlanner(ObjectMapper objectMapper) {
|
||||
this.objectMapper = objectMapper;
|
||||
}
|
||||
|
||||
public Optional<PlannedPlatformDelivery> plan(
|
||||
HttpServletRequest request,
|
||||
String rawBody,
|
||||
String idempotencyKey,
|
||||
long webhookReceiptId) {
|
||||
JsonNode payloadNode = parsePayloadNode(rawBody);
|
||||
String externalMessageId = resolveExternalMessageId(payloadNode, idempotencyKey);
|
||||
if (!StringUtils.hasText(externalMessageId)) {
|
||||
log.warn("platform enqueue skipped: no external message id");
|
||||
return Optional.empty();
|
||||
}
|
||||
String schemaVersion = firstNonBlank(textField(payloadNode, "schemaVersion"), "1.0");
|
||||
String eventType =
|
||||
firstNonBlank(
|
||||
textField(payloadNode, "event"),
|
||||
textField(payloadNode, "event_type"),
|
||||
textField(payloadNode, "eventType"),
|
||||
"unknown");
|
||||
|
||||
ObjectNode body = objectMapper.createObjectNode();
|
||||
body.put("schemaVersion", schemaVersion);
|
||||
body.put("sourceSystem", SOURCE_SYSTEM);
|
||||
body.put("externalMessageId", externalMessageId.trim());
|
||||
body.put("eventType", eventType);
|
||||
body.set("rawPayload", payloadNode);
|
||||
body.put("webhookReceiptId", String.valueOf(webhookReceiptId));
|
||||
if (StringUtils.hasText(idempotencyKey)) {
|
||||
body.put("idempotencyKey", idempotencyKey.trim());
|
||||
}
|
||||
|
||||
String json;
|
||||
try {
|
||||
json = objectMapper.writeValueAsString(body);
|
||||
} catch (Exception e) {
|
||||
log.warn("platform enqueue skipped: cannot serialize body {}", e.toString());
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
String idemHeader = StringUtils.hasText(idempotencyKey) ? idempotencyKey.trim() : externalMessageId.trim();
|
||||
String traceJson = buildTraceJson(request);
|
||||
return Optional.of(new PlannedPlatformDelivery(json, idemHeader, traceJson));
|
||||
}
|
||||
|
||||
private String buildTraceJson(HttpServletRequest request) {
|
||||
Map<String, String> m = new LinkedHashMap<>();
|
||||
String tp = request.getHeader("traceparent");
|
||||
if (StringUtils.hasText(tp)) {
|
||||
m.put("traceparent", tp.trim());
|
||||
}
|
||||
String rid = request.getHeader("X-Request-Id");
|
||||
if (StringUtils.hasText(rid)) {
|
||||
m.put("X-Request-Id", rid.trim());
|
||||
}
|
||||
try {
|
||||
return m.isEmpty() ? null : objectMapper.writeValueAsString(m);
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private JsonNode parsePayloadNode(String rawBody) {
|
||||
String raw = rawBody != null ? rawBody : "";
|
||||
try {
|
||||
return objectMapper.readTree(raw);
|
||||
} catch (Exception e) {
|
||||
ObjectNode wrapper = objectMapper.createObjectNode();
|
||||
wrapper.put("_raw", raw);
|
||||
return wrapper;
|
||||
}
|
||||
}
|
||||
|
||||
private static String resolveExternalMessageId(JsonNode payloadNode, String idempotencyKey) {
|
||||
String fromPayload =
|
||||
firstNonBlank(
|
||||
textField(payloadNode, "message_id"),
|
||||
textField(payloadNode, "messageId"),
|
||||
payloadNode.isObject() ? textField(payloadNode, "id") : null);
|
||||
return firstNonBlank(fromPayload, idempotencyKey);
|
||||
}
|
||||
|
||||
private static String textField(JsonNode node, String field) {
|
||||
if (node == null || !node.isObject()) {
|
||||
return null;
|
||||
}
|
||||
JsonNode n = node.get(field);
|
||||
if (n != null && n.isTextual()) {
|
||||
String t = n.asText();
|
||||
return StringUtils.hasText(t) ? t.trim() : null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static String firstNonBlank(String... values) {
|
||||
if (values == null) {
|
||||
return null;
|
||||
}
|
||||
for (String v : values) {
|
||||
if (StringUtils.hasText(v)) {
|
||||
return v.trim();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
+32
@@ -0,0 +1,32 @@
|
||||
package cn.craftlabs.platform.webhook;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* I7:周期拉取 {@link PlatformDeliveryService} 待发送行。
|
||||
*/
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "craftlabs.platform.delivery.scheduler-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class PlatformDeliveryScheduler {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PlatformDeliveryScheduler.class);
|
||||
|
||||
private final PlatformDeliveryService deliveryService;
|
||||
|
||||
public PlatformDeliveryScheduler(PlatformDeliveryService deliveryService) {
|
||||
this.deliveryService = deliveryService;
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelayString = "${craftlabs.platform.delivery.tick-ms:5000}")
|
||||
public void tick() {
|
||||
try {
|
||||
deliveryService.processDueBatch();
|
||||
} catch (Exception e) {
|
||||
log.warn("platform delivery batch failed: {}", e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
+154
@@ -0,0 +1,154 @@
|
||||
package cn.craftlabs.platform.webhook;
|
||||
|
||||
import cn.craftlabs.platform.webhook.persistence.WebhookPlatformDelivery;
|
||||
import cn.craftlabs.platform.webhook.persistence.WebhookPlatformDeliveryMapper;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.client.RestClientException;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* I7:平台投递入库 + 异步拉取发送。
|
||||
*/
|
||||
@Service
|
||||
public class PlatformDeliveryService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PlatformDeliveryService.class);
|
||||
|
||||
public static final String STATUS_PENDING = "PENDING";
|
||||
public static final String STATUS_SENT = "SENT";
|
||||
public static final String STATUS_DEAD = "DEAD";
|
||||
|
||||
private final PlatformCallbackRequestPlanner planner;
|
||||
private final PlatformCallbackForwarder forwarder;
|
||||
private final WebhookPlatformDeliveryMapper deliveryMapper;
|
||||
|
||||
@Value("${craftlabs.platform.internal.base-url:}")
|
||||
private String baseUrl;
|
||||
|
||||
@Value("${craftlabs.platform.internal.token:}")
|
||||
private String internalToken;
|
||||
|
||||
@Value("${craftlabs.platform.delivery.max-attempts:8}")
|
||||
private int maxAttempts;
|
||||
|
||||
@Value("${craftlabs.platform.delivery.batch-size:20}")
|
||||
private int batchSize;
|
||||
|
||||
public PlatformDeliveryService(
|
||||
PlatformCallbackRequestPlanner planner,
|
||||
PlatformCallbackForwarder forwarder,
|
||||
WebhookPlatformDeliveryMapper deliveryMapper) {
|
||||
this.planner = planner;
|
||||
this.forwarder = forwarder;
|
||||
this.deliveryMapper = deliveryMapper;
|
||||
}
|
||||
|
||||
public void enqueueAfterReceipt(
|
||||
HttpServletRequest request, String rawBody, String idempotencyKey, long receiptId) {
|
||||
if (!StringUtils.hasText(baseUrl) || !StringUtils.hasText(internalToken)) {
|
||||
return;
|
||||
}
|
||||
var planned = planner.plan(request, rawBody, idempotencyKey, receiptId);
|
||||
if (planned.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
PlannedPlatformDelivery p = planned.get();
|
||||
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
|
||||
var row = new WebhookPlatformDelivery();
|
||||
row.setReceiptId(receiptId);
|
||||
row.setIdempotencyKey(p.idempotencyHeader());
|
||||
row.setRequestBody(p.requestBodyJson());
|
||||
row.setTraceHeadersJson(p.traceHeadersJson());
|
||||
row.setStatus(STATUS_PENDING);
|
||||
row.setAttempts(0);
|
||||
row.setNextRetryAt(null);
|
||||
row.setCreatedAt(now);
|
||||
row.setUpdatedAt(now);
|
||||
try {
|
||||
deliveryMapper.insert(row);
|
||||
} catch (Exception e) {
|
||||
log.warn("platform delivery enqueue failed receiptId={}: {}", receiptId, e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
public void processDueBatch() {
|
||||
if (!StringUtils.hasText(baseUrl) || !StringUtils.hasText(internalToken)) {
|
||||
return;
|
||||
}
|
||||
List<WebhookPlatformDelivery> due = deliveryMapper.selectPendingDue(batchSize);
|
||||
for (WebhookPlatformDelivery d : due) {
|
||||
processOne(d);
|
||||
}
|
||||
}
|
||||
|
||||
private void processOne(WebhookPlatformDelivery d) {
|
||||
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
|
||||
try {
|
||||
String idem =
|
||||
StringUtils.hasText(d.getIdempotencyKey())
|
||||
? d.getIdempotencyKey().trim()
|
||||
: inferIdempotencyFromBody(d.getRequestBody());
|
||||
forwarder.postOnce(d.getRequestBody(), idem, d.getTraceHeadersJson());
|
||||
d.setStatus(STATUS_SENT);
|
||||
d.setUpdatedAt(now);
|
||||
d.setLastError(null);
|
||||
deliveryMapper.updateById(d);
|
||||
} catch (RestClientException | IllegalStateException e) {
|
||||
int nextAttempt = (d.getAttempts() == null ? 0 : d.getAttempts()) + 1;
|
||||
d.setAttempts(nextAttempt);
|
||||
d.setLastError(trimError(e));
|
||||
d.setUpdatedAt(now);
|
||||
if (nextAttempt >= maxAttempts) {
|
||||
d.setStatus(STATUS_DEAD);
|
||||
d.setNextRetryAt(null);
|
||||
log.warn("platform delivery DEAD id={} attempts={} err={}", d.getId(), nextAttempt, d.getLastError());
|
||||
} else {
|
||||
long backoffMs = Math.min(60_000L, 500L * (1L << Math.min(nextAttempt, 10)));
|
||||
d.setNextRetryAt(now.plusNanos(backoffMs * 1_000_000L));
|
||||
log.debug(
|
||||
"platform delivery retry scheduled id={} attempt={} next={}",
|
||||
d.getId(),
|
||||
nextAttempt,
|
||||
d.getNextRetryAt());
|
||||
}
|
||||
deliveryMapper.updateById(d);
|
||||
}
|
||||
}
|
||||
|
||||
private static String inferIdempotencyFromBody(String json) {
|
||||
if (!StringUtils.hasText(json)) {
|
||||
return "missing-body";
|
||||
}
|
||||
try {
|
||||
var om = new com.fasterxml.jackson.databind.ObjectMapper();
|
||||
var n = om.readTree(json);
|
||||
if (n.hasNonNull("externalMessageId")) {
|
||||
return n.get("externalMessageId").asText();
|
||||
}
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
private static String trimError(Throwable e) {
|
||||
String m = e.getMessage();
|
||||
if (!StringUtils.hasText(m)) {
|
||||
m = e.getClass().getSimpleName();
|
||||
}
|
||||
return m.length() > 2000 ? m.substring(0, 2000) : m;
|
||||
}
|
||||
|
||||
/** 单测:出站队列行数。 */
|
||||
public long countAll() {
|
||||
return deliveryMapper.selectCount(Wrappers.emptyWrapper());
|
||||
}
|
||||
}
|
||||
+2
@@ -3,8 +3,10 @@ package cn.craftlabs.platform.webhook;
|
||||
import org.mybatis.spring.annotation.MapperScan;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
@MapperScan("cn.craftlabs.platform.webhook.persistence")
|
||||
public class WebhookApplication {
|
||||
|
||||
|
||||
+114
@@ -0,0 +1,114 @@
|
||||
package cn.craftlabs.platform.webhook.persistence;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
|
||||
@TableName("webhook_platform_delivery")
|
||||
public class WebhookPlatformDelivery {
|
||||
|
||||
@TableId(type = IdType.AUTO)
|
||||
private Long id;
|
||||
|
||||
private Long receiptId;
|
||||
private String idempotencyKey;
|
||||
private String requestBody;
|
||||
private String traceHeadersJson;
|
||||
/** PENDING / SENT / DEAD */
|
||||
private String status;
|
||||
private Integer attempts;
|
||||
private String lastError;
|
||||
private OffsetDateTime nextRetryAt;
|
||||
private OffsetDateTime createdAt;
|
||||
private OffsetDateTime updatedAt;
|
||||
|
||||
public Long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(Long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public Long getReceiptId() {
|
||||
return receiptId;
|
||||
}
|
||||
|
||||
public void setReceiptId(Long receiptId) {
|
||||
this.receiptId = receiptId;
|
||||
}
|
||||
|
||||
public String getIdempotencyKey() {
|
||||
return idempotencyKey;
|
||||
}
|
||||
|
||||
public void setIdempotencyKey(String idempotencyKey) {
|
||||
this.idempotencyKey = idempotencyKey;
|
||||
}
|
||||
|
||||
public String getRequestBody() {
|
||||
return requestBody;
|
||||
}
|
||||
|
||||
public void setRequestBody(String requestBody) {
|
||||
this.requestBody = requestBody;
|
||||
}
|
||||
|
||||
public String getTraceHeadersJson() {
|
||||
return traceHeadersJson;
|
||||
}
|
||||
|
||||
public void setTraceHeadersJson(String traceHeadersJson) {
|
||||
this.traceHeadersJson = traceHeadersJson;
|
||||
}
|
||||
|
||||
public String getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(String status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public Integer getAttempts() {
|
||||
return attempts;
|
||||
}
|
||||
|
||||
public void setAttempts(Integer attempts) {
|
||||
this.attempts = attempts;
|
||||
}
|
||||
|
||||
public String getLastError() {
|
||||
return lastError;
|
||||
}
|
||||
|
||||
public void setLastError(String lastError) {
|
||||
this.lastError = lastError;
|
||||
}
|
||||
|
||||
public OffsetDateTime getNextRetryAt() {
|
||||
return nextRetryAt;
|
||||
}
|
||||
|
||||
public void setNextRetryAt(OffsetDateTime nextRetryAt) {
|
||||
this.nextRetryAt = nextRetryAt;
|
||||
}
|
||||
|
||||
public OffsetDateTime getCreatedAt() {
|
||||
return createdAt;
|
||||
}
|
||||
|
||||
public void setCreatedAt(OffsetDateTime createdAt) {
|
||||
this.createdAt = createdAt;
|
||||
}
|
||||
|
||||
public OffsetDateTime getUpdatedAt() {
|
||||
return updatedAt;
|
||||
}
|
||||
|
||||
public void setUpdatedAt(OffsetDateTime updatedAt) {
|
||||
this.updatedAt = updatedAt;
|
||||
}
|
||||
}
|
||||
+22
@@ -0,0 +1,22 @@
|
||||
package cn.craftlabs.platform.webhook.persistence;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.apache.ibatis.annotations.Select;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Mapper
|
||||
public interface WebhookPlatformDeliveryMapper extends BaseMapper<WebhookPlatformDelivery> {
|
||||
|
||||
@Select(
|
||||
"""
|
||||
SELECT * FROM webhook_platform_delivery
|
||||
WHERE status = 'PENDING'
|
||||
AND (next_retry_at IS NULL OR next_retry_at <= CURRENT_TIMESTAMP)
|
||||
ORDER BY id
|
||||
LIMIT #{limit}
|
||||
""")
|
||||
List<WebhookPlatformDelivery> selectPendingDue(@Param("limit") int limit);
|
||||
}
|
||||
@@ -34,3 +34,8 @@ craftlabs:
|
||||
internal:
|
||||
base-url: ${PLATFORM_INTERNAL_BASE_URL:}
|
||||
token: ${CRAFTLABS_PLATFORM_INTERNAL_TOKEN:}
|
||||
delivery:
|
||||
scheduler-enabled: true
|
||||
tick-ms: ${PLATFORM_DELIVERY_TICK_MS:5000}
|
||||
max-attempts: ${PLATFORM_DELIVERY_MAX_ATTEMPTS:8}
|
||||
batch-size: ${PLATFORM_DELIVERY_BATCH_SIZE:20}
|
||||
|
||||
+17
@@ -0,0 +1,17 @@
|
||||
-- I7:平台投递异步出库(可重试 / DEAD)
|
||||
CREATE TABLE webhook_platform_delivery (
|
||||
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
|
||||
receipt_id BIGINT NOT NULL,
|
||||
idempotency_key VARCHAR(512),
|
||||
request_body TEXT NOT NULL,
|
||||
trace_headers_json TEXT,
|
||||
status VARCHAR(32) NOT NULL,
|
||||
attempts INT NOT NULL DEFAULT 0,
|
||||
last_error VARCHAR(2048),
|
||||
next_retry_at TIMESTAMP WITH TIME ZONE,
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT uq_webhook_platform_delivery_receipt UNIQUE (receipt_id)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_webhook_platform_delivery_pending ON webhook_platform_delivery (status, next_retry_at, id);
|
||||
+36
@@ -0,0 +1,36 @@
|
||||
package cn.craftlabs.platform.webhook;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.test.web.servlet.MockMvc;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||
|
||||
@SpringBootTest
|
||||
@AutoConfigureMockMvc
|
||||
class PlatformDeliveryEnqueueTest {
|
||||
|
||||
@Autowired
|
||||
private MockMvc mockMvc;
|
||||
|
||||
@Autowired
|
||||
private PlatformDeliveryService platformDeliveryService;
|
||||
|
||||
@Test
|
||||
void firstCallbackInsertsPendingDeliveryRow() throws Exception {
|
||||
long before = platformDeliveryService.countAll();
|
||||
mockMvc.perform(
|
||||
post("/webhook/bitanswer/callback")
|
||||
.header(CallbackIngestController.HEADER_TOKEN, "test-secret")
|
||||
.header("Idempotency-Key", "enqueue-it-" + System.nanoTime())
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content("{\"message_id\":\"msg-e1\",\"event\":\"sn:x\"}"))
|
||||
.andExpect(status().isOk());
|
||||
assertThat(platformDeliveryService.countAll()).isEqualTo(before + 1);
|
||||
}
|
||||
}
|
||||
@@ -18,3 +18,9 @@ mybatis-plus:
|
||||
craftlabs:
|
||||
webhook:
|
||||
expected-token: test-secret
|
||||
platform:
|
||||
internal:
|
||||
base-url: http://127.0.0.1:65509
|
||||
token: unit-test-internal-token
|
||||
delivery:
|
||||
scheduler-enabled: false
|
||||
|
||||
@@ -6,3 +6,18 @@ CREATE TABLE IF NOT EXISTS webhook_callback_receipt (
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT uq_webhook_idempotency UNIQUE (idempotency_key)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS webhook_platform_delivery (
|
||||
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
|
||||
receipt_id BIGINT NOT NULL,
|
||||
idempotency_key VARCHAR(512),
|
||||
request_body TEXT NOT NULL,
|
||||
trace_headers_json TEXT,
|
||||
status VARCHAR(32) NOT NULL,
|
||||
attempts INT NOT NULL DEFAULT 0,
|
||||
last_error VARCHAR(2048),
|
||||
next_retry_at TIMESTAMP WITH TIME ZONE,
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT uq_webhook_platform_delivery_receipt UNIQUE (receipt_id)
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user