mirror of
https://github.com/hpd840321/craftlabs-authorization-sdk.git
synced 2026-06-09 10:00:30 +08:00
feat(i8-i9): webhook DEAD replay, read-only delivery status, and callback UI
I8: platform proxies replay to webhook; webhook ops token filter and internal replay endpoint; delivery service supports read/replay flows. I9: platform GET callback webhook delivery status by inbox id; UI shows read-only status block and handles load errors without blocking the page. Also refresh OpenAPI, Runbook notes, test fixtures and YAML; fix Vite dev axios baseURL so /api uses proxy; improve login error messaging. Made-with: Cursor
This commit is contained in:
@@ -144,3 +144,7 @@ curl -sS -o /dev/null -w "%{http_code}\n" \
|
||||
| `craftlabs.platform.delivery.batch-size` | 每 tick 最多拉取条数 |
|
||||
|
||||
比特 Callback **2xx** 在收据落库与 **出站行入队** 之后返回;真正 `POST` 平台由后台线程执行。`DEAD` 行需人工依据 `last_error` 与平台侧幂等处理。
|
||||
|
||||
**I8 — DEAD 重放入队**:在平台与 Webhook 配置 **`LICENSE_WEBHOOK_BASE_URL`**(Webhook 根 URL)与 **`LICENSE_WEBHOOK_OPS_TOKEN`**(两侧相同;保护 Webhook `POST /internal/v1/platform-deliveries/by-receipt/{receiptId}/replay`)。**OPS / SYS_ADMIN** 可在 UI **Callback 详情** 触发「重新入队出库」,平台会按收件箱的 `webhookReceiptId` 代调 Webhook;仅当出库行为 **`DEAD`** 时成功。亦可手工:`curl -X POST -H "X-Webhook-Ops-Token: …" "http://<webhook>/internal/v1/platform-deliveries/by-receipt/<receiptId>/replay"`。
|
||||
|
||||
**I9 — 出库状态只读**:平台 `GET /api/v1/callback-inbox/{id}/webhook-delivery` 代调 Webhook `GET …/by-receipt/{receiptId}`(同一 Ops Token);UI 详情展示 `status` / `attempts` / `lastError` 等。
|
||||
|
||||
+15
@@ -4,6 +4,8 @@ import cn.craftlabs.platform.api.service.CallbackInboxService;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackInboxLinkPatchRequest;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackInboxResponse;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackInboxStatusPatchRequest;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackWebhookDeliveryStatusResponse;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackWebhookReplayResponse;
|
||||
import cn.craftlabs.platform.api.web.dto.PageResponse;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Max;
|
||||
@@ -12,6 +14,7 @@ import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PatchMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
@@ -73,4 +76,16 @@ public class CallbackInboxController {
|
||||
@PathVariable("id") long id, @Valid @RequestBody CallbackInboxLinkPatchRequest request) {
|
||||
return callbackInboxService.patchLink(id, request);
|
||||
}
|
||||
|
||||
/** I8:代理 OPS 调用 Webhook,将关联收据的 {@code DEAD} 出库重新入队。 */
|
||||
@PostMapping("/{id}/replay-webhook-delivery")
|
||||
public CallbackWebhookReplayResponse replayWebhookDelivery(@PathVariable("id") long id) {
|
||||
return callbackInboxService.replayWebhookDelivery(id);
|
||||
}
|
||||
|
||||
/** I9:只读查询与收件箱关联的 Webhook 平台投递状态。 */
|
||||
@GetMapping("/{id}/webhook-delivery")
|
||||
public CallbackWebhookDeliveryStatusResponse getWebhookDelivery(@PathVariable("id") long id) {
|
||||
return callbackInboxService.getWebhookDeliveryStatus(id);
|
||||
}
|
||||
}
|
||||
|
||||
+63
-1
@@ -11,7 +11,10 @@ import cn.craftlabs.platform.api.persistence.project.PlatformProjectMapper;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackInboxLinkPatchRequest;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackInboxResponse;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackInboxStatusPatchRequest;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackWebhookDeliveryStatusResponse;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackWebhookReplayResponse;
|
||||
import cn.craftlabs.platform.api.web.dto.PageResponse;
|
||||
import cn.craftlabs.platform.api.webhook.WebhookDeliveryReplayClient;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
@@ -39,6 +42,7 @@ public class CallbackInboxService {
|
||||
private final PlatformContractMapper contractMapper;
|
||||
private final AuditService auditService;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final WebhookDeliveryReplayClient webhookDeliveryReplayClient;
|
||||
|
||||
public CallbackInboxService(
|
||||
PlatformCallbackInboxMapper inboxMapper,
|
||||
@@ -46,13 +50,15 @@ public class CallbackInboxService {
|
||||
PlatformProjectMapper projectMapper,
|
||||
PlatformContractMapper contractMapper,
|
||||
AuditService auditService,
|
||||
ObjectMapper objectMapper) {
|
||||
ObjectMapper objectMapper,
|
||||
WebhookDeliveryReplayClient webhookDeliveryReplayClient) {
|
||||
this.inboxMapper = inboxMapper;
|
||||
this.licenseSnMapper = licenseSnMapper;
|
||||
this.projectMapper = projectMapper;
|
||||
this.contractMapper = contractMapper;
|
||||
this.auditService = auditService;
|
||||
this.objectMapper = objectMapper;
|
||||
this.webhookDeliveryReplayClient = webhookDeliveryReplayClient;
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
@@ -180,6 +186,62 @@ public class CallbackInboxService {
|
||||
return toResponse(row, true);
|
||||
}
|
||||
|
||||
/** I8:按收件箱关联的 {@code webhook_receipt_id} 请求 Webhook 将 DEAD 出库重新入队。 */
|
||||
public CallbackWebhookReplayResponse replayWebhookDelivery(long inboxId) {
|
||||
PlatformCallbackInbox row = requireInbox(inboxId);
|
||||
String receiptStr = row.getWebhookReceiptId();
|
||||
if (!StringUtils.hasText(receiptStr)) {
|
||||
throw new ResponseStatusException(
|
||||
HttpStatus.BAD_REQUEST,
|
||||
"callback inbox has no webhookReceiptId; cannot replay platform delivery");
|
||||
}
|
||||
long receiptId;
|
||||
try {
|
||||
receiptId = Long.parseLong(receiptStr.trim());
|
||||
} catch (NumberFormatException e) {
|
||||
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "invalid webhookReceiptId");
|
||||
}
|
||||
if (!webhookDeliveryReplayClient.isConfigured()) {
|
||||
throw new ResponseStatusException(
|
||||
HttpStatus.SERVICE_UNAVAILABLE,
|
||||
"webhook replay is not configured (set LICENSE_WEBHOOK_BASE_URL and LICENSE_WEBHOOK_OPS_TOKEN)");
|
||||
}
|
||||
try {
|
||||
webhookDeliveryReplayClient.replay(receiptId);
|
||||
} catch (IllegalStateException e) {
|
||||
throw new ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, e.getMessage());
|
||||
}
|
||||
return new CallbackWebhookReplayResponse("REQUEUED", receiptStr.trim());
|
||||
}
|
||||
|
||||
/** I9:按收件箱 {@code webhookReceiptId} 拉取 Webhook 出库行只读摘要。 */
|
||||
@Transactional(readOnly = true)
|
||||
public CallbackWebhookDeliveryStatusResponse getWebhookDeliveryStatus(long inboxId) {
|
||||
PlatformCallbackInbox row = requireInbox(inboxId);
|
||||
String receiptStr = row.getWebhookReceiptId();
|
||||
if (!StringUtils.hasText(receiptStr)) {
|
||||
throw new ResponseStatusException(
|
||||
HttpStatus.BAD_REQUEST,
|
||||
"callback inbox has no webhookReceiptId; no platform delivery row linked");
|
||||
}
|
||||
long receiptId;
|
||||
try {
|
||||
receiptId = Long.parseLong(receiptStr.trim());
|
||||
} catch (NumberFormatException e) {
|
||||
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "invalid webhookReceiptId");
|
||||
}
|
||||
if (!webhookDeliveryReplayClient.isConfigured()) {
|
||||
throw new ResponseStatusException(
|
||||
HttpStatus.SERVICE_UNAVAILABLE,
|
||||
"webhook ops is not configured (set LICENSE_WEBHOOK_BASE_URL and LICENSE_WEBHOOK_OPS_TOKEN)");
|
||||
}
|
||||
try {
|
||||
return webhookDeliveryReplayClient.fetchDeliveryStatus(receiptId);
|
||||
} catch (IllegalStateException e) {
|
||||
throw new ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private PlatformCallbackInbox requireInbox(long id) {
|
||||
PlatformCallbackInbox row = inboxMapper.selectById(id);
|
||||
if (row == null) {
|
||||
|
||||
+62
@@ -0,0 +1,62 @@
|
||||
package cn.craftlabs.platform.api.web.dto;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
|
||||
/** I9:与 Webhook {@code GET .../platform-deliveries/by-receipt/{id}} 对齐的只读摘要。 */
|
||||
public class CallbackWebhookDeliveryStatusResponse {
|
||||
|
||||
private Long receiptId;
|
||||
private String status;
|
||||
private Integer attempts;
|
||||
private String lastError;
|
||||
private OffsetDateTime nextRetryAt;
|
||||
private OffsetDateTime updatedAt;
|
||||
|
||||
public Long getReceiptId() {
|
||||
return receiptId;
|
||||
}
|
||||
|
||||
public void setReceiptId(Long receiptId) {
|
||||
this.receiptId = receiptId;
|
||||
}
|
||||
|
||||
public String getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(String status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public Integer getAttempts() {
|
||||
return attempts;
|
||||
}
|
||||
|
||||
public void setAttempts(Integer attempts) {
|
||||
this.attempts = attempts;
|
||||
}
|
||||
|
||||
public String getLastError() {
|
||||
return lastError;
|
||||
}
|
||||
|
||||
public void setLastError(String lastError) {
|
||||
this.lastError = lastError;
|
||||
}
|
||||
|
||||
public OffsetDateTime getNextRetryAt() {
|
||||
return nextRetryAt;
|
||||
}
|
||||
|
||||
public void setNextRetryAt(OffsetDateTime nextRetryAt) {
|
||||
this.nextRetryAt = nextRetryAt;
|
||||
}
|
||||
|
||||
public OffsetDateTime getUpdatedAt() {
|
||||
return updatedAt;
|
||||
}
|
||||
|
||||
public void setUpdatedAt(OffsetDateTime updatedAt) {
|
||||
this.updatedAt = updatedAt;
|
||||
}
|
||||
}
|
||||
+33
@@ -0,0 +1,33 @@
|
||||
package cn.craftlabs.platform.api.web.dto;
|
||||
|
||||
/**
|
||||
* I8:请求 Webhook 将 {@code DEAD} 出库任务重新入队后的响应。
|
||||
*/
|
||||
public class CallbackWebhookReplayResponse {
|
||||
|
||||
private String status;
|
||||
private String receiptId;
|
||||
|
||||
public CallbackWebhookReplayResponse() {}
|
||||
|
||||
public CallbackWebhookReplayResponse(String status, String receiptId) {
|
||||
this.status = status;
|
||||
this.receiptId = receiptId;
|
||||
}
|
||||
|
||||
public String getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(String status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public String getReceiptId() {
|
||||
return receiptId;
|
||||
}
|
||||
|
||||
public void setReceiptId(String receiptId) {
|
||||
this.receiptId = receiptId;
|
||||
}
|
||||
}
|
||||
+119
@@ -0,0 +1,119 @@
|
||||
package cn.craftlabs.platform.api.webhook;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackWebhookDeliveryStatusResponse;
|
||||
import org.springframework.web.client.RestClient;
|
||||
import org.springframework.web.client.RestClientException;
|
||||
import org.springframework.web.client.RestClientResponseException;
|
||||
import org.springframework.web.server.ResponseStatusException;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* I8 / I9:代 OPS 调用 {@code license-webhook-ingress}(重放与只读查询),须配置 base-url + ops-token。
|
||||
*/
|
||||
@Component
|
||||
public class WebhookDeliveryReplayClient {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(WebhookDeliveryReplayClient.class);
|
||||
/** 与 license-webhook-ingress {@code WebhookOpsTokenFilter} 请求头一致 */
|
||||
private static final String HEADER_OPS_TOKEN = "X-Webhook-Ops-Token";
|
||||
|
||||
private final RestClient restClient = RestClient.create();
|
||||
|
||||
@Value("${craftlabs.webhook.base-url:}")
|
||||
private String baseUrl;
|
||||
|
||||
@Value("${craftlabs.webhook.ops-token:}")
|
||||
private String opsToken;
|
||||
|
||||
public boolean isConfigured() {
|
||||
return StringUtils.hasText(baseUrl) && StringUtils.hasText(opsToken);
|
||||
}
|
||||
|
||||
public void replay(long receiptId) {
|
||||
if (!isConfigured()) {
|
||||
throw new IllegalStateException("webhook replay is not configured");
|
||||
}
|
||||
String url =
|
||||
baseUrl.replaceAll("/+$", "")
|
||||
+ "/internal/v1/platform-deliveries/by-receipt/"
|
||||
+ receiptId
|
||||
+ "/replay";
|
||||
try {
|
||||
restClient
|
||||
.post()
|
||||
.uri(url)
|
||||
.header(HEADER_OPS_TOKEN, opsToken)
|
||||
.retrieve()
|
||||
.toBodilessEntity();
|
||||
} catch (RestClientResponseException e) {
|
||||
throw mapException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/** I9:查询 {@code webhook_platform_delivery} 行摘要。 */
|
||||
public CallbackWebhookDeliveryStatusResponse fetchDeliveryStatus(long receiptId) {
|
||||
if (!isConfigured()) {
|
||||
throw new IllegalStateException("webhook ops client is not configured");
|
||||
}
|
||||
String url =
|
||||
baseUrl.replaceAll("/+$", "")
|
||||
+ "/internal/v1/platform-deliveries/by-receipt/"
|
||||
+ receiptId;
|
||||
try {
|
||||
return restClient
|
||||
.get()
|
||||
.uri(url)
|
||||
.header(HEADER_OPS_TOKEN, opsToken)
|
||||
.retrieve()
|
||||
.body(CallbackWebhookDeliveryStatusResponse.class);
|
||||
} catch (RestClientResponseException e) {
|
||||
throw mapException(e);
|
||||
} catch (RestClientException e) {
|
||||
throw new ResponseStatusException(
|
||||
HttpStatus.BAD_GATEWAY, "webhook unreachable: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private static ResponseStatusException mapException(RestClientResponseException e) {
|
||||
int code = e.getStatusCode().value();
|
||||
String body = e.getResponseBodyAsString(StandardCharsets.UTF_8);
|
||||
String detail = shorten(body);
|
||||
log.debug("webhook replay HTTP {} body {}", code, detail);
|
||||
if (code == HttpStatus.NOT_FOUND.value()) {
|
||||
return new ResponseStatusException(
|
||||
HttpStatus.NOT_FOUND, "webhook platform delivery not found for receipt");
|
||||
}
|
||||
if (code == HttpStatus.CONFLICT.value()) {
|
||||
return new ResponseStatusException(
|
||||
HttpStatus.CONFLICT,
|
||||
StringUtils.hasText(detail)
|
||||
? detail
|
||||
: "webhook rejected replay (delivery is not DEAD or conflict)");
|
||||
}
|
||||
if (code == HttpStatus.SERVICE_UNAVAILABLE.value()) {
|
||||
return new ResponseStatusException(
|
||||
HttpStatus.SERVICE_UNAVAILABLE,
|
||||
StringUtils.hasText(detail) ? detail : "webhook ops endpoint unavailable");
|
||||
}
|
||||
if (code == HttpStatus.UNAUTHORIZED.value()) {
|
||||
return new ResponseStatusException(
|
||||
HttpStatus.BAD_GATEWAY, "webhook rejected ops token (check LICENSE_WEBHOOK_OPS_TOKEN)");
|
||||
}
|
||||
return new ResponseStatusException(HttpStatus.BAD_GATEWAY, "webhook replay failed: HTTP " + code);
|
||||
}
|
||||
|
||||
private static String shorten(String body) {
|
||||
if (!StringUtils.hasText(body)) {
|
||||
return "";
|
||||
}
|
||||
String t = body.trim();
|
||||
return t.length() > 400 ? t.substring(0, 400) + "…" : t;
|
||||
}
|
||||
}
|
||||
@@ -33,6 +33,12 @@ platform:
|
||||
internal:
|
||||
token: ${PLATFORM_INTERNAL_TOKEN:${CRAFTLABS_PLATFORM_INTERNAL_TOKEN:}}
|
||||
|
||||
# I8:平台代调 Webhook 出库重放(OPS JWT → 平台 → Webhook 内部 API)
|
||||
craftlabs:
|
||||
webhook:
|
||||
base-url: ${LICENSE_WEBHOOK_BASE_URL:}
|
||||
ops-token: ${LICENSE_WEBHOOK_OPS_TOKEN:}
|
||||
|
||||
springdoc:
|
||||
swagger-ui:
|
||||
path: /swagger-ui.html
|
||||
|
||||
+87
@@ -1,16 +1,25 @@
|
||||
package cn.craftlabs.platform.api.callback;
|
||||
|
||||
import cn.craftlabs.platform.api.support.JwtTestSupport;
|
||||
import cn.craftlabs.platform.api.web.dto.CallbackWebhookDeliveryStatusResponse;
|
||||
import cn.craftlabs.platform.api.webhook.WebhookDeliveryReplayClient;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.test.web.servlet.MockMvc;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
|
||||
@@ -33,6 +42,9 @@ class CallbackInboxControllerTest {
|
||||
@Autowired
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@MockBean
|
||||
private WebhookDeliveryReplayClient webhookDeliveryReplayClient;
|
||||
|
||||
@Test
|
||||
void listDetailStatusLinkAndIntegrationCatalog() throws Exception {
|
||||
String token = JwtTestSupport.obtainBearerToken(mockMvc, objectMapper);
|
||||
@@ -55,6 +67,8 @@ class CallbackInboxControllerTest {
|
||||
.andExpect(status().isOk())
|
||||
.andExpect(jsonPath("$.content.length()").value(2));
|
||||
|
||||
when(webhookDeliveryReplayClient.isConfigured()).thenReturn(false);
|
||||
|
||||
String ingestBody = minimalIngestJson("msg-inbox-flow-1");
|
||||
String ingestResp =
|
||||
mockMvc.perform(
|
||||
@@ -110,6 +124,7 @@ class CallbackInboxControllerTest {
|
||||
|
||||
@Test
|
||||
void developerCannotAccessCallbackInbox() throws Exception {
|
||||
when(webhookDeliveryReplayClient.isConfigured()).thenReturn(false);
|
||||
String token = JwtTestSupport.obtainBearerToken(mockMvc, objectMapper, "dev", "dev");
|
||||
mockMvc.perform(
|
||||
get("/api/v1/callback-inbox")
|
||||
@@ -119,13 +134,85 @@ class CallbackInboxControllerTest {
|
||||
.andExpect(status().isForbidden());
|
||||
}
|
||||
|
||||
@Test
|
||||
void replayWebhookDeliveryDelegatesToWebhookClient() throws Exception {
|
||||
when(webhookDeliveryReplayClient.isConfigured()).thenReturn(true);
|
||||
doNothing().when(webhookDeliveryReplayClient).replay(anyLong());
|
||||
|
||||
String ingestBody = minimalIngestJson("msg-replay-flow", "4242");
|
||||
String ingestResp =
|
||||
mockMvc.perform(
|
||||
post("/internal/v1/callback-events")
|
||||
.header(INTERNAL_HEADER, INTERNAL_TOKEN)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(ingestBody))
|
||||
.andExpect(status().isOk())
|
||||
.andReturn()
|
||||
.getResponse()
|
||||
.getContentAsString();
|
||||
long inboxId = objectMapper.readTree(ingestResp).get("inboxId").asLong();
|
||||
|
||||
String token = JwtTestSupport.obtainBearerToken(mockMvc, objectMapper);
|
||||
mockMvc.perform(
|
||||
post("/api/v1/callback-inbox/" + inboxId + "/replay-webhook-delivery")
|
||||
.header("Authorization", "Bearer " + token))
|
||||
.andExpect(status().isOk())
|
||||
.andExpect(jsonPath("$.status").value("REQUEUED"))
|
||||
.andExpect(jsonPath("$.receiptId").value("4242"));
|
||||
|
||||
verify(webhookDeliveryReplayClient).replay(4242L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getWebhookDeliveryStatusDelegatesToWebhookClient() throws Exception {
|
||||
when(webhookDeliveryReplayClient.isConfigured()).thenReturn(true);
|
||||
CallbackWebhookDeliveryStatusResponse wh = new CallbackWebhookDeliveryStatusResponse();
|
||||
wh.setReceiptId(777L);
|
||||
wh.setStatus("PENDING");
|
||||
wh.setAttempts(2);
|
||||
wh.setLastError("probe");
|
||||
when(webhookDeliveryReplayClient.fetchDeliveryStatus(eq(777L))).thenReturn(wh);
|
||||
|
||||
String ingestBody = minimalIngestJson("msg-webhook-delivery-status", "777");
|
||||
String ingestResp =
|
||||
mockMvc.perform(
|
||||
post("/internal/v1/callback-events")
|
||||
.header(INTERNAL_HEADER, INTERNAL_TOKEN)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.content(ingestBody))
|
||||
.andExpect(status().isOk())
|
||||
.andReturn()
|
||||
.getResponse()
|
||||
.getContentAsString();
|
||||
long inboxId = objectMapper.readTree(ingestResp).get("inboxId").asLong();
|
||||
|
||||
String token = JwtTestSupport.obtainBearerToken(mockMvc, objectMapper);
|
||||
mockMvc.perform(
|
||||
get("/api/v1/callback-inbox/" + inboxId + "/webhook-delivery")
|
||||
.header("Authorization", "Bearer " + token))
|
||||
.andExpect(status().isOk())
|
||||
.andExpect(jsonPath("$.receiptId").value(777))
|
||||
.andExpect(jsonPath("$.status").value("PENDING"))
|
||||
.andExpect(jsonPath("$.attempts").value(2))
|
||||
.andExpect(jsonPath("$.lastError").value("probe"));
|
||||
|
||||
verify(webhookDeliveryReplayClient).fetchDeliveryStatus(777L);
|
||||
}
|
||||
|
||||
private String minimalIngestJson(String externalMessageId) throws Exception {
|
||||
return minimalIngestJson(externalMessageId, null);
|
||||
}
|
||||
|
||||
private String minimalIngestJson(String externalMessageId, String webhookReceiptId) throws Exception {
|
||||
ObjectNode root = objectMapper.createObjectNode();
|
||||
root.put("schemaVersion", "1.0");
|
||||
root.put("sourceSystem", "BITANSWER");
|
||||
root.put("externalMessageId", externalMessageId);
|
||||
root.put("eventType", "sn:test");
|
||||
root.set("rawPayload", objectMapper.createObjectNode().put("sn", "SN-X"));
|
||||
if (webhookReceiptId != null) {
|
||||
root.put("webhookReceiptId", webhookReceiptId);
|
||||
}
|
||||
return objectMapper.writeValueAsString(root);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,3 +14,8 @@ platform:
|
||||
secret: unit-test-jwt-secret-at-least-32-chars-ok
|
||||
internal:
|
||||
token: unit-test-internal-token-for-callback-ingest
|
||||
|
||||
craftlabs:
|
||||
webhook:
|
||||
base-url: http://127.0.0.1:65520
|
||||
ops-token: unit-test-webhook-ops-token
|
||||
|
||||
+53
@@ -7,13 +7,17 @@ import jakarta.servlet.http.HttpServletRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.client.RestClientException;
|
||||
import org.springframework.web.server.ResponseStatusException;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* I7:平台投递入库 + 异步拉取发送。
|
||||
@@ -90,6 +94,55 @@ public class PlatformDeliveryService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* I8:将 {@code DEAD} 行按 {@code receipt_id} 重新入队,尝试次数清零。
|
||||
*/
|
||||
public void replayDeadDeliveryByReceiptId(long receiptId) {
|
||||
WebhookPlatformDelivery d =
|
||||
deliveryMapper.selectOne(
|
||||
Wrappers.lambdaQuery(WebhookPlatformDelivery.class)
|
||||
.eq(WebhookPlatformDelivery::getReceiptId, receiptId));
|
||||
if (d == null) {
|
||||
throw new ResponseStatusException(
|
||||
HttpStatus.NOT_FOUND, "platform delivery not found for receipt " + receiptId);
|
||||
}
|
||||
if (!STATUS_DEAD.equals(d.getStatus())) {
|
||||
throw new ResponseStatusException(
|
||||
HttpStatus.CONFLICT,
|
||||
"platform delivery status is "
|
||||
+ d.getStatus()
|
||||
+ ", only DEAD can be replayed");
|
||||
}
|
||||
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
|
||||
d.setStatus(STATUS_PENDING);
|
||||
d.setAttempts(0);
|
||||
d.setLastError(null);
|
||||
d.setNextRetryAt(null);
|
||||
d.setUpdatedAt(now);
|
||||
deliveryMapper.updateById(d);
|
||||
log.info("platform delivery replay re-queued id={} receiptId={}", d.getId(), receiptId);
|
||||
}
|
||||
|
||||
/** I9:按 receipt_id 返回投递行摘要(供运维只读)。 */
|
||||
public Map<String, Object> getStatusByReceiptId(long receiptId) {
|
||||
WebhookPlatformDelivery d =
|
||||
deliveryMapper.selectOne(
|
||||
Wrappers.lambdaQuery(WebhookPlatformDelivery.class)
|
||||
.eq(WebhookPlatformDelivery::getReceiptId, receiptId));
|
||||
if (d == null) {
|
||||
throw new ResponseStatusException(
|
||||
HttpStatus.NOT_FOUND, "platform delivery not found for receipt " + receiptId);
|
||||
}
|
||||
Map<String, Object> m = new LinkedHashMap<>();
|
||||
m.put("receiptId", receiptId);
|
||||
m.put("status", d.getStatus());
|
||||
m.put("attempts", d.getAttempts() != null ? d.getAttempts() : 0);
|
||||
m.put("lastError", d.getLastError());
|
||||
m.put("nextRetryAt", d.getNextRetryAt());
|
||||
m.put("updatedAt", d.getUpdatedAt());
|
||||
return m;
|
||||
}
|
||||
|
||||
private void processOne(WebhookPlatformDelivery d) {
|
||||
OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC);
|
||||
try {
|
||||
|
||||
+48
@@ -0,0 +1,48 @@
|
||||
package cn.craftlabs.platform.webhook;
|
||||
|
||||
import jakarta.servlet.FilterChain;
|
||||
import jakarta.servlet.ServletException;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.filter.OncePerRequestFilter;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* I8:为 {@code /internal/**} 校验运维 Token;无配置时拒绝(503),避免误暴露。
|
||||
*/
|
||||
@Component
|
||||
@Order(Ordered.HIGHEST_PRECEDENCE)
|
||||
public class WebhookOpsTokenFilter extends OncePerRequestFilter {
|
||||
|
||||
public static final String HEADER_OPS_TOKEN = "X-Webhook-Ops-Token";
|
||||
|
||||
@Value("${craftlabs.webhook.ops-token:}")
|
||||
private String opsToken;
|
||||
|
||||
@Override
|
||||
protected void doFilterInternal(
|
||||
HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
|
||||
throws ServletException, IOException {
|
||||
String path = request.getRequestURI();
|
||||
if (path == null || !path.startsWith("/internal/")) {
|
||||
filterChain.doFilter(request, response);
|
||||
return;
|
||||
}
|
||||
if (!StringUtils.hasText(opsToken)) {
|
||||
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "webhook ops token not configured");
|
||||
return;
|
||||
}
|
||||
String presented = request.getHeader(HEADER_OPS_TOKEN);
|
||||
if (!opsToken.equals(presented)) {
|
||||
response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "invalid or missing ops token");
|
||||
return;
|
||||
}
|
||||
filterChain.doFilter(request, response);
|
||||
}
|
||||
}
|
||||
+41
@@ -0,0 +1,41 @@
|
||||
package cn.craftlabs.platform.webhook;
|
||||
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* I8:{@code DEAD} 重放入队;I9:按收据 ID 只读查询投递状态(供 {@code delivery-platform-api} 代调)。
|
||||
*/
|
||||
@RestController
|
||||
public class WebhookPlatformDeliveryOpsController {
|
||||
|
||||
private final PlatformDeliveryService platformDeliveryService;
|
||||
|
||||
public WebhookPlatformDeliveryOpsController(PlatformDeliveryService platformDeliveryService) {
|
||||
this.platformDeliveryService = platformDeliveryService;
|
||||
}
|
||||
|
||||
@GetMapping(
|
||||
value = "/internal/v1/platform-deliveries/by-receipt/{receiptId}",
|
||||
produces = MediaType.APPLICATION_JSON_VALUE)
|
||||
public Map<String, Object> getByReceipt(@PathVariable("receiptId") long receiptId) {
|
||||
return platformDeliveryService.getStatusByReceiptId(receiptId);
|
||||
}
|
||||
|
||||
@PostMapping(
|
||||
value = "/internal/v1/platform-deliveries/by-receipt/{receiptId}/replay",
|
||||
produces = MediaType.APPLICATION_JSON_VALUE)
|
||||
public Map<String, Object> replayByReceipt(@PathVariable("receiptId") long receiptId) {
|
||||
platformDeliveryService.replayDeadDeliveryByReceiptId(receiptId);
|
||||
Map<String, Object> body = new LinkedHashMap<>();
|
||||
body.put("status", "REQUEUED");
|
||||
body.put("receiptId", receiptId);
|
||||
return body;
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,8 @@ management:
|
||||
craftlabs:
|
||||
webhook:
|
||||
expected-token: ${CRAFTLABS_WEBHOOK_EXPECTED_TOKEN:}
|
||||
# I8:保护 /internal/** 运维接口(与平台代理共用 LICENSE_WEBHOOK_OPS_TOKEN);空则返回 503
|
||||
ops-token: ${LICENSE_WEBHOOK_OPS_TOKEN:}
|
||||
platform:
|
||||
internal:
|
||||
base-url: ${PLATFORM_INTERNAL_BASE_URL:}
|
||||
|
||||
+115
@@ -0,0 +1,115 @@
|
||||
package cn.craftlabs.platform.webhook;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.test.web.servlet.MockMvc;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
|
||||
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||
|
||||
@SpringBootTest
|
||||
@AutoConfigureMockMvc
|
||||
class WebhookPlatformDeliveryReplayTest {
|
||||
|
||||
private static final String OPS = WebhookOpsTokenFilter.HEADER_OPS_TOKEN;
|
||||
private static final String TOKEN = "unit-test-webhook-ops-token";
|
||||
|
||||
@Autowired
|
||||
private MockMvc mockMvc;
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
@BeforeEach
|
||||
void cleanDeliveries() {
|
||||
jdbcTemplate.update("DELETE FROM webhook_platform_delivery");
|
||||
}
|
||||
|
||||
@Test
|
||||
void replayDeadResetsToPending() throws Exception {
|
||||
jdbcTemplate.update(
|
||||
"""
|
||||
INSERT INTO webhook_platform_delivery
|
||||
(receipt_id, idempotency_key, request_body, status, attempts, last_error, created_at, updated_at)
|
||||
VALUES (901, 'idem901', '{}', 'DEAD', 8, 'boom', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||||
""");
|
||||
|
||||
mockMvc.perform(
|
||||
post("/internal/v1/platform-deliveries/by-receipt/901/replay")
|
||||
.header(OPS, TOKEN)
|
||||
.accept(MediaType.APPLICATION_JSON))
|
||||
.andExpect(status().isOk())
|
||||
.andExpect(jsonPath("$.status").value("REQUEUED"))
|
||||
.andExpect(jsonPath("$.receiptId").value(901));
|
||||
|
||||
String st =
|
||||
jdbcTemplate.queryForObject(
|
||||
"SELECT status FROM webhook_platform_delivery WHERE receipt_id = 901",
|
||||
String.class);
|
||||
assertThat(st).isEqualTo("PENDING");
|
||||
Integer attempts =
|
||||
jdbcTemplate.queryForObject(
|
||||
"SELECT attempts FROM webhook_platform_delivery WHERE receipt_id = 901",
|
||||
Integer.class);
|
||||
assertThat(attempts).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
void replayNonDeadReturns409() throws Exception {
|
||||
jdbcTemplate.update(
|
||||
"""
|
||||
INSERT INTO webhook_platform_delivery
|
||||
(receipt_id, idempotency_key, request_body, status, attempts, created_at, updated_at)
|
||||
VALUES (902, 'idem902', '{}', 'SENT', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||||
""");
|
||||
|
||||
mockMvc.perform(
|
||||
post("/internal/v1/platform-deliveries/by-receipt/902/replay")
|
||||
.header(OPS, TOKEN)
|
||||
.accept(MediaType.APPLICATION_JSON))
|
||||
.andExpect(status().isConflict());
|
||||
}
|
||||
|
||||
@Test
|
||||
void getDeliveryStatusByReceiptReturnsSummary() throws Exception {
|
||||
jdbcTemplate.update(
|
||||
"""
|
||||
INSERT INTO webhook_platform_delivery
|
||||
(receipt_id, idempotency_key, request_body, status, attempts, last_error, created_at, updated_at)
|
||||
VALUES (905, 'idem905', '{}', 'PENDING', 2, 'err-x', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||||
""");
|
||||
|
||||
mockMvc.perform(
|
||||
get("/internal/v1/platform-deliveries/by-receipt/905")
|
||||
.header(OPS, TOKEN)
|
||||
.accept(MediaType.APPLICATION_JSON))
|
||||
.andExpect(status().isOk())
|
||||
.andExpect(jsonPath("$.receiptId").value(905))
|
||||
.andExpect(jsonPath("$.status").value("PENDING"))
|
||||
.andExpect(jsonPath("$.attempts").value(2))
|
||||
.andExpect(jsonPath("$.lastError").value("err-x"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void missingOpsTokenReturns401() throws Exception {
|
||||
jdbcTemplate.update(
|
||||
"""
|
||||
INSERT INTO webhook_platform_delivery
|
||||
(receipt_id, idempotency_key, request_body, status, attempts, created_at, updated_at)
|
||||
VALUES (903, 'idem903', '{}', 'DEAD', 1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||||
""");
|
||||
|
||||
mockMvc.perform(
|
||||
post("/internal/v1/platform-deliveries/by-receipt/903/replay")
|
||||
.accept(MediaType.APPLICATION_JSON))
|
||||
.andExpect(status().isUnauthorized());
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,7 @@ mybatis-plus:
|
||||
craftlabs:
|
||||
webhook:
|
||||
expected-token: test-secret
|
||||
ops-token: unit-test-webhook-ops-token
|
||||
platform:
|
||||
internal:
|
||||
base-url: http://127.0.0.1:65509
|
||||
|
||||
Reference in New Issue
Block a user