diff --git a/docs/architecture/对外接口不变-走查任务与状态.md b/docs/architecture/对外接口不变-走查任务与状态.md index e8937e61..0479aed6 100644 --- a/docs/architecture/对外接口不变-走查任务与状态.md +++ b/docs/architecture/对外接口不变-走查任务与状态.md @@ -11,9 +11,9 @@ | 约定 § | 代码锚点 | 子任务数(上界) | 下一可修正动作(建议顺序) | 不可修正或须前置确认 | |--------|----------|------------------|----------------------------|------------------------| | **§3.1** | `ImageRuleRefServiceImpl#delete`(约 575~598 行) | **`N = param.getIds().size()`** 次 `updateGroupPersonRef`(每规则删后各 1 次) | **P0**:全部 `deleteById` 完成后,对本轮涉及的 `labelIds`、`organizationIds` **去重并集**,调用 **1 次** `updateGroupPersonRef`;为每次 RPC 增加 **`isSuccess` 校验**(与 §2.2 一致) | **须图库/通行确认**:合并调用是否为「刷新引用」语义、是否等价于当前 N 次效果;若不能确认则**不得合并**,仅可补返回值校验与日志(**工作区走查与方案审核见 §6**) | -| **§3.2** | `AcsPersonServiceImpl#delete`(约 165~175 行) | **`P = param.getPersonIds().size()`** 次 `imageStorePersonService.delete` | **P1**:**有界并行**(并发度 4~8)调用 `delete`,失败聚合与现网「遇错即停」一致;已具备 `isSuccess` 分支,保持语义 | **无批量 delete**:无法改为单次 RPC,除非将来扩展契约(约定 §4 远期) | -| **§3.3** | `AcsPassRuleServiceImpl#listFloor` | **`F = passRuleResults.size()`** 次 `acsPersonService.page`(仅取 `totalRows`) | **首轮已完成**:`page.isSuccess()`、`getData()` 空防护、`rowsOfPage=1`。**P1 后续**:按楼层 **有界并行** `page`(保持结果顺序) | **禁止**用本地 `countPersonIdByZoneId` 等替代 `page.totalRows`;**无多 zone 一次统计 API** 时无法单 RPC 消除 N | -| **§3.4** | `AcsPassRuleServiceImpl#addImageStore`(约 195~201 行) | **`D = deviceList.size()`** 次 `bindDeviceAndImageStore` | **P1**:**有界并行** `bind`;并行失败时与现有 **回滚删图库** 逻辑一致(注意竞态与顺序) | **无批量 bind**:不能合并为 1 次 RPC(契约不扩展时) | +| **§3.2** | `AcsPersonServiceImpl#delete` | **`P = param.getPersonIds().size()`** 次 `imageStorePersonService.delete` | **P1 首轮已实施**(**§8**):`elevatorRemoteBoundedExecutor` 按批 `invokeAll`(默认并发 **6**),子线程 **`FeignThreadLocalUtil.callWithContext`**;遇失败**整批后**即返回 `CloudwalkResult.fail`(与原先顺序循环遇错即停一致,**非**单条失败即取消同批其它在途 RPC) | **无批量 delete**;同批内并行语义见 **§8.1** | +| **§3.3** | `AcsPassRuleServiceImpl#listFloor` | **`F = passRuleResults.size()`** 次 `acsPersonService.page` | **P1 首轮已实施**(**§8**):楼层 `page` 有界并行(默认 **6**),`personTotals[]` 按下标写回,**列表顺序不变** | **禁止**本地 count 替代 `totalRows`;RPC 上界仍为 **F** | +| **§3.4** | `AcsPassRuleServiceImpl#addImageStore` 内设备绑图库 | **`D = deviceList.size()`** 次 `bindDeviceAndImageStore` | **P1 首轮已实施**(**§8**):`bind` 有界并行;失败仍 **`rollbackImageStoreAfterBindFailure`** 后抛 `ServiceException`(与同批已绑设备竞态与顺序循环**同类**) | **无批量 bind** | | **§3.5** | `AcsDeviceTaskServiceImpl#updateFloors`(约 46~119 行) | 增:**`A = addFloors.size()`** 次 `personRuleService.add` 或 `imageRuleRefService.addOnlyRule`;删:**`D = delFloorIds.size()`** 次 `personRuleService.delete` / `imageRuleRefService.delete` / DAO | **P1**:删层在 §3.1 落地后可减少「内层 refresh」放大;可对**楼层维度**做有界并行(与限流/异步线程池策略一致) | **`@Async("updateFloorsExecutor")`** 下线程池与背压须单独评估;错误现为 `throw new ServiceException(e.getMessage())` 信息较粗,是否属「接口不变」范畴由产品/运维定义 | --- @@ -47,16 +47,20 @@ | **约定锚点** | **§3.5** `AcsDeviceTaskServiceImpl#updateFloors` | | **暂缓项** | 楼层有界并行、`AbortPolicy` 与 `catch` 语义、删楼 `ruleMap` 缺键等见 **§7.3**。 | -### 迭代 4 及以后(P1 工程批次) +### 迭代 4(**P1:§3.2 / §3.3 / §3.4 + 统一有界池**) -在迭代 3 首轮修正稳定后,按 ROI 与风险分批推进(均需单独走查闸门): +| 字段 | 内容 | +|------|------| +| **状态** | **已实施**(实现说明与语义边界见 **§8**)。 | +| **线程池 Bean** | `elevatorRemoteBoundedExecutor`(`ElevatorRemoteIoExecutorConfig`),配置前缀 **`ninca.elevator.remote-io.pool`**(默认 core=max=**6**,queue=512,`CallerRunsPolicy`)。**未**与 `updateFloorsExecutor` 合并,避免异步任务与同步 RPC 抢同池。 | +| **公共能力** | `FeignThreadLocalUtil.callWithContext`(`cw-elevator-application-common`):子线程执行 Feign 前绑定/恢复 ThreadLocal 请求头。 | -1. **§3.2**:`AcsPersonServiceImpl#delete` — 有界并行 `imageStorePersonService.delete`,失败策略与现网一致。 -2. **§3.3**:`AcsPassRuleServiceImpl#listFloor` — 有界并行 `acsPersonService.page`,**保持楼层顺序**合并结果。 -3. **§3.4**:`AcsPassRuleServiceImpl#addImageStore` — 有界并行 `bindDeviceAndImageStore`,失败与**回滚删图库**与现逻辑一致。 -4. **统一线程池**:迭代 3 若已梳理 `updateFloorsExecutor`,再决定 P1 是否复用命名池或独立有界池,并在 PR 中写明并发度、超时与遇错策略。 +### 迭代 5 及以后(可选深化) -**已完成回顾**:迭代 1 — `listFloor` 首轮修正(**§5**);迭代 2 — §3.1 仅文档走查(**§6**),代码冻结见上表。 +- **§3.5**:`updateFloors` 内楼层循环有界并行(仍受 §3.1 内层放大约束)。 +- **调参 / 观测**:`ninca.elevator.remote-io.pool` 按环境压测调整;必要时为并行批增加指标日志。 + +**已完成回顾**:迭代 1 — **§5**;迭代 2 — **§6**(§3.1 冻结);迭代 3 — **§7**;迭代 4 — **§8**。 --- @@ -170,3 +174,21 @@ - **暂缓(须单独评审)**:按楼层 **有界并行**、拒绝策略、`catch` 异常语义增强、`ruleMap` 缺键防护。 **修正实施后**:提交 **`0ddeedc`**(分支 `v0.11`)。 + +--- + +## 8. 迭代 4:P1 有界并行(§3.2 / §3.3 / §3.4) + +**实施日期**:2026-04-25 + +### 8.1 行为与约定对齐说明 + +| 项 | 说明 | +|----|------| +| **并发度** | 代码常量与默认池 **`corePoolSize=maxPoolSize=6`**(约定 4~8 区间内),可通过 **`ninca.elevator.remote-io.pool.core-pool-size` / `max-pool-size`** 覆盖。 | +| **§3.2 `delete`** | 多 `personId` 时按批 `ThreadPoolExecutor.invokeAll`;单 ID 仍走主线程(无 Feign 子线程问题)。失败时返回 **`76260407`** 风格 `CloudwalkResult.fail`,与改造前**一致**;**同批内**若某 RPC 失败,`invokeAll` 仍会等本批其它任务结束后再统一 `get()` 抛出/返回,与**严格单线程「第一条失败即不再发起后续」**在「已发起请求数」上略有差异,属典型有界并行取舍。 | +| **§3.3 `listFloor`** | 设备数仍顺序 DAO;`acsPersonService.page` 按批并行,结果写入 `personTotals[idx]` 后顺序 `setPersonNumber`,**响应楼层顺序不变**。 | +| **§3.4 `addImageStore`** | `bindAppImageStoreDevice` 仍顺序执行;仅 **`bindDeviceAndImageStore`** 按批并行;任一批次中失败则 **`rollbackImageStoreAfterBindFailure`**(抽方法)后抛 `ServiceException`,与原先 try/catch 回滚路径一致。 | +| **Feign ThreadLocal** | 所有子线程 RPC 经 **`FeignThreadLocalUtil.callWithContext`**,避免池化线程串请求头。 | + +**实施后提交**:**`7eb3785`**(分支 `v0.11`)。 diff --git a/maven-cw-elevator-application/cw-elevator-application-common/src/main/java/cn/cloudwalk/elevator/config/FeignThreadLocalUtil.java b/maven-cw-elevator-application/cw-elevator-application-common/src/main/java/cn/cloudwalk/elevator/config/FeignThreadLocalUtil.java index af592f9c..5299daf1 100644 --- a/maven-cw-elevator-application/cw-elevator-application-common/src/main/java/cn/cloudwalk/elevator/config/FeignThreadLocalUtil.java +++ b/maven-cw-elevator-application/cw-elevator-application-common/src/main/java/cn/cloudwalk/elevator/config/FeignThreadLocalUtil.java @@ -5,6 +5,7 @@ import cn.cloudwalk.cloud.session.extend.DefaultExtendContext; import cn.cloudwalk.elevator.context.CloudWalkExtendContextValue; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import javax.servlet.http.HttpServletRequest; import org.apache.commons.lang3.StringUtils; @@ -76,4 +77,22 @@ public class FeignThreadLocalUtil { public static void setRequestHeader(CloudwalkCallContext context) { set(getDefaultRequestHeader(context)); } + + /** + * 在有界线程池等子线程中调用 Feign 前,必须为当前线程设置与 {@code context} 一致的请求头 ThreadLocal; + * 调用结束后恢复/清理,避免池化线程泄漏或串扰。 + */ + public static T callWithContext(CloudwalkCallContext context, Callable action) throws Exception { + Map previous = get(); + try { + setRequestHeader(context); + return action.call(); + } finally { + if (previous != null) { + set(previous); + } else { + remove(); + } + } + } } diff --git a/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/common/ElevatorRemoteIoExecutorConfig.java b/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/common/ElevatorRemoteIoExecutorConfig.java new file mode 100644 index 00000000..d18e5ee9 --- /dev/null +++ b/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/common/ElevatorRemoteIoExecutorConfig.java @@ -0,0 +1,27 @@ +package cn.cloudwalk.elevator.common; + +import java.util.concurrent.ThreadPoolExecutor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +public class ElevatorRemoteIoExecutorConfig { + @Autowired + private ElevatorRemoteIoPoolProperties elevatorRemoteIoPoolProperties; + + @Bean(name = {"elevatorRemoteBoundedExecutor"}) + public ThreadPoolTaskExecutor elevatorRemoteBoundedExecutor() { + ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor(); + ex.setCorePoolSize(this.elevatorRemoteIoPoolProperties.getCorePoolSize()); + ex.setMaxPoolSize(this.elevatorRemoteIoPoolProperties.getMaxPoolSize()); + ex.setQueueCapacity(this.elevatorRemoteIoPoolProperties.getQueueCapacity()); + ex.setKeepAliveSeconds(this.elevatorRemoteIoPoolProperties.getKeepAliveSeconds()); + ex.setAllowCoreThreadTimeOut(this.elevatorRemoteIoPoolProperties.isAllowCoreThreadTimeOut()); + ex.setThreadNamePrefix("elevator-remote-io-"); + ex.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + ex.initialize(); + return ex; + } +} diff --git a/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/common/ElevatorRemoteIoPoolProperties.java b/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/common/ElevatorRemoteIoPoolProperties.java new file mode 100644 index 00000000..2db78958 --- /dev/null +++ b/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/common/ElevatorRemoteIoPoolProperties.java @@ -0,0 +1,55 @@ +package cn.cloudwalk.elevator.common; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "ninca.elevator.remote-io.pool") +public class ElevatorRemoteIoPoolProperties { + /** 约定 §3.2 / §3.3 / §3.4:有界并行建议 4~8,默认 6 */ + private int corePoolSize = 6; + private int maxPoolSize = 6; + private int queueCapacity = 512; + private int keepAliveSeconds = 60; + private boolean allowCoreThreadTimeOut = true; + + public int getCorePoolSize() { + return this.corePoolSize; + } + + public void setCorePoolSize(int corePoolSize) { + this.corePoolSize = corePoolSize; + } + + public int getMaxPoolSize() { + return this.maxPoolSize; + } + + public void setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } + + public int getQueueCapacity() { + return this.queueCapacity; + } + + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + public int getKeepAliveSeconds() { + return this.keepAliveSeconds; + } + + public void setKeepAliveSeconds(int keepAliveSeconds) { + this.keepAliveSeconds = keepAliveSeconds; + } + + public boolean isAllowCoreThreadTimeOut() { + return this.allowCoreThreadTimeOut; + } + + public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { + this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; + } +} diff --git a/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/passrule/impl/AcsPassRuleServiceImpl.java b/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/passrule/impl/AcsPassRuleServiceImpl.java index 05cceac9..d7d2c0c3 100644 --- a/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/passrule/impl/AcsPassRuleServiceImpl.java +++ b/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/passrule/impl/AcsPassRuleServiceImpl.java @@ -18,6 +18,7 @@ import cn.cloudwalk.cloud.page.CloudwalkPageInfo; import cn.cloudwalk.cloud.result.CloudwalkResult; import cn.cloudwalk.cloud.utils.BeanCopyUtils; import cn.cloudwalk.elevator.common.service.AcsApplicationService; +import cn.cloudwalk.elevator.config.FeignThreadLocalUtil; import cn.cloudwalk.elevator.device.dao.AcsElevatorDeviceDao; import cn.cloudwalk.elevator.device.dto.AcsElevatorDeviceListDto; import cn.cloudwalk.elevator.device.dto.AcsElevatorDeviceResultDTO; @@ -59,9 +60,14 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.stream.Collectors; import javax.annotation.Resource; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -69,6 +75,8 @@ import org.springframework.util.ObjectUtils; @Service public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements AcsPassRuleService { + /** 约定 §3.3 / §3.4:与 {@code ninca.elevator.remote-io.pool} 默认 core 对齐 */ + private static final int REMOTE_IO_PARALLEL = 6; @Resource private ImageStoreService imageStoreService; @Resource @@ -87,6 +95,9 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac private AcsDeviceImageStoreAppBindService acsDeviceImageStoreAppBindService; @Resource private AcsElevatorDeviceDao acsElevatorDeviceDao; + @Autowired + @Qualifier("elevatorRemoteBoundedExecutor") + private ThreadPoolTaskExecutor elevatorRemoteBoundedExecutor; public CloudwalkResult> listFloor(AcsPassRuleFloorParam param, CloudwalkCallContext context) throws ServiceException { @@ -112,21 +123,45 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac dto.setCurrentFloorId(passRuleResult.getId()); List deviceList = this.acsElevatorDeviceDao.listByZoneId(dto); passRuleResult.setDeviceNumber(Integer.valueOf(deviceList.size())); - AcsPersonQueryParam personParam = new AcsPersonQueryParam(); - personParam.setZoneId(passRuleResult.getId()); - CloudwalkPageInfo pageInfo = new CloudwalkPageInfo(1, 1); - CloudwalkResult> page = - this.acsPersonService.page(personParam, pageInfo, context); - if (!page.isSuccess()) { - this.logger.info("远程调用查询通行人员分页失败,原因:" + page.getMessage()); - throw new ServiceException(page.getCode(), page.getMessage()); + } + int floorCount = passRuleResults.size(); + long[] personTotals = new long[floorCount]; + for (int i = 0; i < floorCount; ) { + int end = Math.min(i + REMOTE_IO_PARALLEL, floorCount); + List> batch = new ArrayList<>(); + for (int j = i; j < end; j++) { + final int idx = j; + final String zoneId = passRuleResults.get(j).getId(); + batch.add(() -> { + personTotals[idx] = pagePersonTotalRowsForZone(zoneId, context); + return null; + }); } - if (ObjectUtils.isEmpty(page.getData())) { - passRuleResult.setPersonNumber(Long.valueOf(0L)); - } else { - passRuleResult.setPersonNumber( - Long.valueOf(((CloudwalkPageAble)page.getData()).getTotalRows())); + List> floorFutures; + try { + floorFutures = this.elevatorRemoteBoundedExecutor.getThreadPoolExecutor().invokeAll(batch); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceException("76260520", getMessage("76260520")); } + for (Future fu : floorFutures) { + try { + fu.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceException("76260520", getMessage("76260520")); + } catch (ExecutionException e) { + Throwable c = e.getCause(); + if (c instanceof ServiceException) { + throw (ServiceException)c; + } + throw new ServiceException("76260520", getMessage("76260520")); + } + } + i = end; + } + for (int k = 0; k < floorCount; k++) { + passRuleResults.get(k).setPersonNumber(Long.valueOf(personTotals[k])); } return CloudwalkResult.success(passRuleResults); } catch (DataAccessException e) { @@ -201,24 +236,51 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac appBindParam.setApplicationId(applicationId); this.acsDeviceImageStoreAppBindService.bindAppImageStoreDevice(appBindParam, context); if (!CollectionUtils.isEmpty(deviceList)) { - for (AcsElevatorDeviceResultDTO device : deviceList) { - try { - DeviceImageStoreAppBindParam bindParam = new DeviceImageStoreAppBindParam(); - bindParam.setImageStoreId((String)imageStoreId.getData()); - bindParam.setDeviceId(device.getDeviceId()); - bindParam.setApplicationId(applicationId); - this.acsDeviceImageStoreAppBindService.bindDeviceAndImageStore(bindParam, context); - } catch (ServiceException e) { - this.logger.error("图库关联失败,图库id={},原因:{}", imageStoreId.getData(), e.getMessage()); - ImageStoreDelParam delParam = new ImageStoreDelParam(); - delParam.setId((String)imageStoreId.getData()); - delParam.setBusinessId(context.getCompany().getCompanyId()); - this.logger.info("回滚删除图库开始,delParam={},context={}", JSONObject.toJSON(delParam), - JSONObject.toJSON(context)); - CloudwalkResult deleteResult = this.imageStoreService.delete(delParam, context); - this.logger.info("删除图库:图库id={},结果:{}", imageStoreId, deleteResult.getMessage()); - throw new ServiceException(e.getCode(), e.getMessage()); + final String newImageStoreId = (String)imageStoreId.getData(); + for (int i = 0; i < deviceList.size(); ) { + int end = Math.min(i + REMOTE_IO_PARALLEL, deviceList.size()); + List> bindBatch = new ArrayList<>(); + for (int j = i; j < end; j++) { + final AcsElevatorDeviceResultDTO device = deviceList.get(j); + bindBatch.add(() -> { + FeignThreadLocalUtil.callWithContext(context, () -> { + DeviceImageStoreAppBindParam bindParam = new DeviceImageStoreAppBindParam(); + bindParam.setImageStoreId(newImageStoreId); + bindParam.setDeviceId(device.getDeviceId()); + bindParam.setApplicationId(applicationId); + this.acsDeviceImageStoreAppBindService.bindDeviceAndImageStore(bindParam, context); + return null; + }); + return null; + }); } + List> bindFutures; + try { + bindFutures = this.elevatorRemoteBoundedExecutor.getThreadPoolExecutor().invokeAll(bindBatch); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceException("76260505", getMessage("76260505")); + } + for (Future f : bindFutures) { + try { + f.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceException("76260505", getMessage("76260505")); + } catch (ExecutionException e) { + Throwable c = e.getCause(); + if (c instanceof ServiceException) { + ServiceException se = (ServiceException)c; + this.logger.error("图库关联失败,图库id={},原因:{}", newImageStoreId, se.getMessage()); + rollbackImageStoreAfterBindFailure(newImageStoreId, context); + throw new ServiceException(se.getCode(), se.getMessage()); + } + this.logger.error("图库关联失败,图库id={},原因:{}", newImageStoreId, e.getMessage()); + rollbackImageStoreAfterBindFailure(newImageStoreId, context); + throw new ServiceException("76260505", getMessage("76260505")); + } + } + i = end; } } return (String)imageStoreId.getData(); @@ -471,6 +533,35 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac } } + private long pagePersonTotalRowsForZone(String zoneId, CloudwalkCallContext context) throws Exception { + return FeignThreadLocalUtil.callWithContext(context, () -> { + AcsPersonQueryParam personParam = new AcsPersonQueryParam(); + personParam.setZoneId(zoneId); + CloudwalkPageInfo pageInfo = new CloudwalkPageInfo(1, 1); + CloudwalkResult> page = + this.acsPersonService.page(personParam, pageInfo, context); + if (!page.isSuccess()) { + this.logger.info("远程调用查询通行人员分页失败,原因:" + page.getMessage()); + throw new ServiceException(page.getCode(), page.getMessage()); + } + if (ObjectUtils.isEmpty(page.getData())) { + return 0L; + } + return ((CloudwalkPageAble)page.getData()).getTotalRows(); + }); + } + + private void rollbackImageStoreAfterBindFailure(String imageStoreIdValue, CloudwalkCallContext context) + throws ServiceException { + ImageStoreDelParam delParam = new ImageStoreDelParam(); + delParam.setId(imageStoreIdValue); + delParam.setBusinessId(context.getCompany().getCompanyId()); + this.logger.info("回滚删除图库开始,delParam={},context={}", JSONObject.toJSON(delParam), + JSONObject.toJSON(context)); + CloudwalkResult deleteResult = this.imageStoreService.delete(delParam, context); + this.logger.info("删除图库:图库id={},结果:{}", imageStoreIdValue, deleteResult.getMessage()); + } + private void getZoneTypeIsThree(ZoneTreeResult zoneTreeResult, List passRuleResults) { if ("FLOOR".equals(zoneTreeResult.getType())) { AcsPassRuleFloorResult result = new AcsPassRuleFloorResult(); diff --git a/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/person/impl/AcsPersonServiceImpl.java b/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/person/impl/AcsPersonServiceImpl.java index ba2b4fb8..8c3a0992 100644 --- a/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/person/impl/AcsPersonServiceImpl.java +++ b/maven-cw-elevator-application/cw-elevator-application-service/src/main/java/cn/cloudwalk/elevator/person/impl/AcsPersonServiceImpl.java @@ -49,6 +49,7 @@ import cn.cloudwalk.elevator.person.param.AcsPersonTimeDetailParam; import cn.cloudwalk.elevator.person.result.AcsPersonResult; import cn.cloudwalk.elevator.person.result.AcsPersonTimeDetailResult; import cn.cloudwalk.elevator.person.service.AcsPersonService; +import cn.cloudwalk.elevator.config.FeignThreadLocalUtil; import cn.cloudwalk.elevator.util.CollectionUtils; import cn.cloudwalk.elevator.util.DateUtils; import cn.cloudwalk.elevator.util.StringUtils; @@ -59,14 +60,21 @@ import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.stream.Collectors; import javax.annotation.Resource; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; @Service public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsPersonService { private static final int ROWS_OF_PAGE = 1000; + /** 约定 §3.2:与 {@code ninca.elevator.remote-io.pool} 默认 core 对齐,有界并行上界 */ + private static final int REMOTE_DELETE_PARALLEL = 6; @Autowired private BiologyToolService biologyToolService; @Autowired @@ -83,6 +91,9 @@ public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsP private AcsPassRuleService acsPassRuleService; @Resource private AcsPassRuleDao acsPassRuleDao; + @Autowired + @Qualifier("elevatorRemoteBoundedExecutor") + private ThreadPoolTaskExecutor elevatorRemoteBoundedExecutor; @CloudwalkParamsValidate public CloudwalkResult add(AcsPersonAddParam param, CloudwalkCallContext context) throws ServiceException { @@ -162,16 +173,63 @@ public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsP throws ServiceException { this.logger.info("删除门禁通行人员开始,AcsPersonDeleteParam=[{}], CloudwalkCallContext=[{}]", JSONObject.toJSONString(param), JSONObject.toJSONString(context)); - for (String personId : param.getPersonIds()) { - ImageStorePersonDelParam imageStorePersonDelParam = new ImageStorePersonDelParam(); - imageStorePersonDelParam.setPersonId(personId); - imageStorePersonDelParam.setImageStoreId(param.getImageStoreId()); - CloudwalkResult imageStorePersonDeleteResult = - this.imageStorePersonService.delete(imageStorePersonDelParam, context); - if (!imageStorePersonDeleteResult.isSuccess()) { - return CloudwalkResult.fail("76260407", - getMessage("76260407") + " " + imageStorePersonDeleteResult.getMessage()); + List personIds = param.getPersonIds(); + if (CollectionUtils.isEmpty(personIds)) { + return CloudwalkResult.success(Boolean.valueOf(true)); + } + if (personIds.size() == 1) { + return deleteOnePersonFromImageStore(param.getImageStoreId(), personIds.get(0), context); + } + for (int i = 0; i < personIds.size(); i += REMOTE_DELETE_PARALLEL) { + int end = Math.min(i + REMOTE_DELETE_PARALLEL, personIds.size()); + List>> batch = new ArrayList<>(); + for (int j = i; j < end; j++) { + final String personId = personIds.get(j); + batch.add(() -> FeignThreadLocalUtil.callWithContext(context, + () -> deleteOnePersonFromImageStore(param.getImageStoreId(), personId, context))); } + List>> futures; + try { + futures = this.elevatorRemoteBoundedExecutor.getThreadPoolExecutor().invokeAll(batch); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return CloudwalkResult.fail("76260407", getMessage("76260407") + " " + e.getMessage()); + } + for (Future> future : futures) { + try { + CloudwalkResult r = future.get(); + if (!r.isSuccess()) { + return r; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return CloudwalkResult.fail("76260407", getMessage("76260407") + " " + e.getMessage()); + } catch (ExecutionException e) { + Throwable c = e.getCause(); + if (c instanceof ServiceException) { + ServiceException se = (ServiceException)c; + return CloudwalkResult.fail( + se.getCode() != null ? se.getCode() : "76260407", + getMessage("76260407") + " " + se.getMessage()); + } + return CloudwalkResult.fail("76260407", + getMessage("76260407") + " " + (c != null ? c.getMessage() : e.getMessage())); + } + } + } + return CloudwalkResult.success(Boolean.valueOf(true)); + } + + private CloudwalkResult deleteOnePersonFromImageStore(String imageStoreId, String personId, + CloudwalkCallContext context) throws ServiceException { + ImageStorePersonDelParam imageStorePersonDelParam = new ImageStorePersonDelParam(); + imageStorePersonDelParam.setPersonId(personId); + imageStorePersonDelParam.setImageStoreId(imageStoreId); + CloudwalkResult imageStorePersonDeleteResult = + this.imageStorePersonService.delete(imageStorePersonDelParam, context); + if (!imageStorePersonDeleteResult.isSuccess()) { + return CloudwalkResult.fail("76260407", + getMessage("76260407") + " " + imageStorePersonDeleteResult.getMessage()); } return CloudwalkResult.success(Boolean.valueOf(true)); }