elevator(P1): bounded parallel delete, listFloor page, bind; Feign callWithContext

Made-with: Cursor

Former-commit-id: fe571aaadc
This commit is contained in:
反编译工作区
2026-04-25 08:29:53 +08:00
parent 25052441aa
commit 1b2e6a18c6
6 changed files with 321 additions and 49 deletions
@@ -11,9 +11,9 @@
| 约定 § | 代码锚点 | 子任务数(上界) | 下一可修正动作(建议顺序) | 不可修正或须前置确认 | | 约定 § | 代码锚点 | 子任务数(上界) | 下一可修正动作(建议顺序) | 不可修正或须前置确认 |
|--------|----------|------------------|----------------------------|------------------------| |--------|----------|------------------|----------------------------|------------------------|
| **§3.1** | `ImageRuleRefServiceImpl#delete`(约 575598 行) | **`N = param.getIds().size()`** 次 `updateGroupPersonRef`(每规则删后各 1 次) | **P0**:全部 `deleteById` 完成后,对本轮涉及的 `labelIds``organizationIds` **去重并集**,调用 **1 次** `updateGroupPersonRef`;为每次 RPC 增加 **`isSuccess` 校验**(与 §2.2 一致) | **须图库/通行确认**:合并调用是否为「刷新引用」语义、是否等价于当前 N 次效果;若不能确认则**不得合并**,仅可补返回值校验与日志(**工作区走查与方案审核见 §6**) | | **§3.1** | `ImageRuleRefServiceImpl#delete`(约 575598 行) | **`N = param.getIds().size()`** 次 `updateGroupPersonRef`(每规则删后各 1 次) | **P0**:全部 `deleteById` 完成后,对本轮涉及的 `labelIds``organizationIds` **去重并集**,调用 **1 次** `updateGroupPersonRef`;为每次 RPC 增加 **`isSuccess` 校验**(与 §2.2 一致) | **须图库/通行确认**:合并调用是否为「刷新引用」语义、是否等价于当前 N 次效果;若不能确认则**不得合并**,仅可补返回值校验与日志(**工作区走查与方案审核见 §6**) |
| **§3.2** | `AcsPersonServiceImpl#delete`(约 165175 行) | **`P = param.getPersonIds().size()`** 次 `imageStorePersonService.delete` | **P1****有界并行**(并发度 48)调用 `delete`,失败聚合与现网「遇错即停」一致;已具备 `isSuccess` 分支,保持语义 | **无批量 delete**:无法改为单次 RPC,除非将来扩展契约(约定 §4 远期) | | **§3.2** | `AcsPersonServiceImpl#delete` | **`P = param.getPersonIds().size()`** 次 `imageStorePersonService.delete` | **P1 首轮已实施****§8**):`elevatorRemoteBoundedExecutor` 按批 `invokeAll`(默认并发 **6**),子线程 **`FeignThreadLocalUtil.callWithContext`**;遇失败**整批后**即返回 `CloudwalkResult.fail`(与原先顺序循环遇错即停一致,**非**单条失败即取消同批其它在途 RPC) | **无批量 delete**;同批内并行语义见 **§8.1** |
| **§3.3** | `AcsPassRuleServiceImpl#listFloor` | **`F = passRuleResults.size()`** 次 `acsPersonService.page`(仅取 `totalRows` | **首轮已完成**`page.isSuccess()``getData()` 空防护、`rowsOfPage=1`。**P1 后续**:按楼层 **有界并行** `page`(保持结果顺序) | **禁止**本地 `countPersonIdByZoneId` 等替代 `page.totalRows`**无多 zone 一次统计 API** 时无法单 RPC 消除 N | | **§3.3** | `AcsPassRuleServiceImpl#listFloor` | **`F = passRuleResults.size()`** 次 `acsPersonService.page` | **P1 首轮已实施****§8**):楼层 `page` 有界并行(默认 **6**),`personTotals[]` 按下标写回,**列表顺序不变** | **禁止**本地 count 替代 `totalRows`RPC 上界仍为 **F** |
| **§3.4** | `AcsPassRuleServiceImpl#addImageStore`(约 195201 行) | **`D = deviceList.size()`** 次 `bindDeviceAndImageStore` | **P1****有界并行** `bind`并行失败时与现有 **回滚删图库** 逻辑一致(注意竞态与顺序) | **无批量 bind**:不能合并为 1 次 RPC(契约不扩展时) | | **§3.4** | `AcsPassRuleServiceImpl#addImageStore` 内设备绑图库 | **`D = deviceList.size()`** 次 `bindDeviceAndImageStore` | **P1 首轮已实施****§8**):`bind` 有界并行失败**`rollbackImageStoreAfterBindFailure`** 后抛 `ServiceException`(与同批已绑设备竞态与顺序循环**同类**) | **无批量 bind** |
| **§3.5** | `AcsDeviceTaskServiceImpl#updateFloors`(约 46119 行) | 增:**`A = addFloors.size()`** 次 `personRuleService.add``imageRuleRefService.addOnlyRule`;删:**`D = delFloorIds.size()`** 次 `personRuleService.delete` / `imageRuleRefService.delete` / DAO | **P1**:删层在 §3.1 落地后可减少「内层 refresh」放大;可对**楼层维度**做有界并行(与限流/异步线程池策略一致) | **`@Async("updateFloorsExecutor")`** 下线程池与背压须单独评估;错误现为 `throw new ServiceException(e.getMessage())` 信息较粗,是否属「接口不变」范畴由产品/运维定义 | | **§3.5** | `AcsDeviceTaskServiceImpl#updateFloors`(约 46119 行) | 增:**`A = addFloors.size()`** 次 `personRuleService.add``imageRuleRefService.addOnlyRule`;删:**`D = delFloorIds.size()`** 次 `personRuleService.delete` / `imageRuleRefService.delete` / DAO | **P1**:删层在 §3.1 落地后可减少「内层 refresh」放大;可对**楼层维度**做有界并行(与限流/异步线程池策略一致) | **`@Async("updateFloorsExecutor")`** 下线程池与背压须单独评估;错误现为 `throw new ServiceException(e.getMessage())` 信息较粗,是否属「接口不变」范畴由产品/运维定义 |
--- ---
@@ -47,16 +47,20 @@
| **约定锚点** | **§3.5** `AcsDeviceTaskServiceImpl#updateFloors` | | **约定锚点** | **§3.5** `AcsDeviceTaskServiceImpl#updateFloors` |
| **暂缓项** | 楼层有界并行、`AbortPolicy``catch` 语义、删楼 `ruleMap` 缺键等见 **§7.3**。 | | **暂缓项** | 楼层有界并行、`AbortPolicy``catch` 语义、删楼 `ruleMap` 缺键等见 **§7.3**。 |
### 迭代 4 及以后(P1 工程批次 ### 迭代 4**P1:§3.2 / §3.3 / §3.4 + 统一有界池**
在迭代 3 首轮修正稳定后,按 ROI 与风险分批推进(均需单独走查闸门): | 字段 | 内容 |
|------|------|
| **状态** | **已实施**(实现说明与语义边界见 **§8**)。 |
| **线程池 Bean** | `elevatorRemoteBoundedExecutor``ElevatorRemoteIoExecutorConfig`),配置前缀 **`ninca.elevator.remote-io.pool`**(默认 core=max=**6**queue=512`CallerRunsPolicy`)。**未**与 `updateFloorsExecutor` 合并,避免异步任务与同步 RPC 抢同池。 |
| **公共能力** | `FeignThreadLocalUtil.callWithContext``cw-elevator-application-common`):子线程执行 Feign 前绑定/恢复 ThreadLocal 请求头。 |
1. **§3.2**`AcsPersonServiceImpl#delete` — 有界并行 `imageStorePersonService.delete`,失败策略与现网一致。 ### 迭代 5 及以后(可选深化)
2. **§3.3**`AcsPassRuleServiceImpl#listFloor` — 有界并行 `acsPersonService.page`**保持楼层顺序**合并结果。
3. **§3.4**`AcsPassRuleServiceImpl#addImageStore` — 有界并行 `bindDeviceAndImageStore`,失败与**回滚删图库**与现逻辑一致。
4. **统一线程池**:迭代 3 若已梳理 `updateFloorsExecutor`,再决定 P1 是否复用命名池或独立有界池,并在 PR 中写明并发度、超时与遇错策略。
**已完成回顾**:迭代 1 — `listFloor` 首轮修正(**§5**);迭代 2 — §3.1 仅文档走查(**§6**),代码冻结见上表。 - **§3.5**`updateFloors` 内楼层循环有界并行(仍受 §3.1 内层放大约束)。
- **调参 / 观测**`ninca.elevator.remote-io.pool` 按环境压测调整;必要时为并行批增加指标日志。
**已完成回顾**:迭代 1 — **§5**;迭代 2 — **§6**(§3.1 冻结);迭代 3 — **§7**;迭代 4 — **§8**。
--- ---
@@ -170,3 +174,21 @@
- **暂缓(须单独评审)**:按楼层 **有界并行**、拒绝策略、`catch` 异常语义增强、`ruleMap` 缺键防护。 - **暂缓(须单独评审)**:按楼层 **有界并行**、拒绝策略、`catch` 异常语义增强、`ruleMap` 缺键防护。
**修正实施后**:提交 **`0ddeedc`**(分支 `v0.11`)。 **修正实施后**:提交 **`0ddeedc`**(分支 `v0.11`)。
---
## 8. 迭代 4P1 有界并行(§3.2 / §3.3 / §3.4
**实施日期**2026-04-25
### 8.1 行为与约定对齐说明
| 项 | 说明 |
|----|------|
| **并发度** | 代码常量与默认池 **`corePoolSize=maxPoolSize=6`**(约定 48 区间内),可通过 **`ninca.elevator.remote-io.pool.core-pool-size` / `max-pool-size`** 覆盖。 |
| **§3.2 `delete`** | 多 `personId` 时按批 `ThreadPoolExecutor.invokeAll`;单 ID 仍走主线程(无 Feign 子线程问题)。失败时返回 **`76260407`** 风格 `CloudwalkResult.fail`,与改造前**一致**;**同批内**若某 RPC 失败,`invokeAll` 仍会等本批其它任务结束后再统一 `get()` 抛出/返回,与**严格单线程「第一条失败即不再发起后续」**在「已发起请求数」上略有差异,属典型有界并行取舍。 |
| **§3.3 `listFloor`** | 设备数仍顺序 DAO`acsPersonService.page` 按批并行,结果写入 `personTotals[idx]` 后顺序 `setPersonNumber`**响应楼层顺序不变**。 |
| **§3.4 `addImageStore`** | `bindAppImageStoreDevice` 仍顺序执行;仅 **`bindDeviceAndImageStore`** 按批并行;任一批次中失败则 **`rollbackImageStoreAfterBindFailure`**(抽方法)后抛 `ServiceException`,与原先 try/catch 回滚路径一致。 |
| **Feign ThreadLocal** | 所有子线程 RPC 经 **`FeignThreadLocalUtil.callWithContext`**,避免池化线程串请求头。 |
**实施后提交****`7eb3785`**(分支 `v0.11`)。
@@ -5,6 +5,7 @@ import cn.cloudwalk.cloud.session.extend.DefaultExtendContext;
import cn.cloudwalk.elevator.context.CloudWalkExtendContextValue; import cn.cloudwalk.elevator.context.CloudWalkExtendContextValue;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@@ -76,4 +77,22 @@ public class FeignThreadLocalUtil {
public static void setRequestHeader(CloudwalkCallContext context) { public static void setRequestHeader(CloudwalkCallContext context) {
set(getDefaultRequestHeader(context)); set(getDefaultRequestHeader(context));
} }
/**
* 在有界线程池等子线程中调用 Feign 前,必须为当前线程设置与 {@code context} 一致的请求头 ThreadLocal
* 调用结束后恢复/清理,避免池化线程泄漏或串扰。
*/
public static <T> T callWithContext(CloudwalkCallContext context, Callable<T> action) throws Exception {
Map<String, String> previous = get();
try {
setRequestHeader(context);
return action.call();
} finally {
if (previous != null) {
set(previous);
} else {
remove();
}
}
}
} }
@@ -0,0 +1,27 @@
package cn.cloudwalk.elevator.common;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ElevatorRemoteIoExecutorConfig {
@Autowired
private ElevatorRemoteIoPoolProperties elevatorRemoteIoPoolProperties;
@Bean(name = {"elevatorRemoteBoundedExecutor"})
public ThreadPoolTaskExecutor elevatorRemoteBoundedExecutor() {
ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor();
ex.setCorePoolSize(this.elevatorRemoteIoPoolProperties.getCorePoolSize());
ex.setMaxPoolSize(this.elevatorRemoteIoPoolProperties.getMaxPoolSize());
ex.setQueueCapacity(this.elevatorRemoteIoPoolProperties.getQueueCapacity());
ex.setKeepAliveSeconds(this.elevatorRemoteIoPoolProperties.getKeepAliveSeconds());
ex.setAllowCoreThreadTimeOut(this.elevatorRemoteIoPoolProperties.isAllowCoreThreadTimeOut());
ex.setThreadNamePrefix("elevator-remote-io-");
ex.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
ex.initialize();
return ex;
}
}
@@ -0,0 +1,55 @@
package cn.cloudwalk.elevator.common;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "ninca.elevator.remote-io.pool")
public class ElevatorRemoteIoPoolProperties {
/** 约定 §3.2 / §3.3 / §3.4:有界并行建议 4~8,默认 6 */
private int corePoolSize = 6;
private int maxPoolSize = 6;
private int queueCapacity = 512;
private int keepAliveSeconds = 60;
private boolean allowCoreThreadTimeOut = true;
public int getCorePoolSize() {
return this.corePoolSize;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public int getMaxPoolSize() {
return this.maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public int getQueueCapacity() {
return this.queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
public int getKeepAliveSeconds() {
return this.keepAliveSeconds;
}
public void setKeepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
public boolean isAllowCoreThreadTimeOut() {
return this.allowCoreThreadTimeOut;
}
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
}
@@ -18,6 +18,7 @@ import cn.cloudwalk.cloud.page.CloudwalkPageInfo;
import cn.cloudwalk.cloud.result.CloudwalkResult; import cn.cloudwalk.cloud.result.CloudwalkResult;
import cn.cloudwalk.cloud.utils.BeanCopyUtils; import cn.cloudwalk.cloud.utils.BeanCopyUtils;
import cn.cloudwalk.elevator.common.service.AcsApplicationService; import cn.cloudwalk.elevator.common.service.AcsApplicationService;
import cn.cloudwalk.elevator.config.FeignThreadLocalUtil;
import cn.cloudwalk.elevator.device.dao.AcsElevatorDeviceDao; import cn.cloudwalk.elevator.device.dao.AcsElevatorDeviceDao;
import cn.cloudwalk.elevator.device.dto.AcsElevatorDeviceListDto; import cn.cloudwalk.elevator.device.dto.AcsElevatorDeviceListDto;
import cn.cloudwalk.elevator.device.dto.AcsElevatorDeviceResultDTO; import cn.cloudwalk.elevator.device.dto.AcsElevatorDeviceResultDTO;
@@ -59,9 +60,14 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@@ -69,6 +75,8 @@ import org.springframework.util.ObjectUtils;
@Service @Service
public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements AcsPassRuleService { public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements AcsPassRuleService {
/** 约定 §3.3 / §3.4:与 {@code ninca.elevator.remote-io.pool} 默认 core 对齐 */
private static final int REMOTE_IO_PARALLEL = 6;
@Resource @Resource
private ImageStoreService imageStoreService; private ImageStoreService imageStoreService;
@Resource @Resource
@@ -87,6 +95,9 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac
private AcsDeviceImageStoreAppBindService acsDeviceImageStoreAppBindService; private AcsDeviceImageStoreAppBindService acsDeviceImageStoreAppBindService;
@Resource @Resource
private AcsElevatorDeviceDao acsElevatorDeviceDao; private AcsElevatorDeviceDao acsElevatorDeviceDao;
@Autowired
@Qualifier("elevatorRemoteBoundedExecutor")
private ThreadPoolTaskExecutor elevatorRemoteBoundedExecutor;
public CloudwalkResult<List<AcsPassRuleFloorResult>> listFloor(AcsPassRuleFloorParam param, public CloudwalkResult<List<AcsPassRuleFloorResult>> listFloor(AcsPassRuleFloorParam param,
CloudwalkCallContext context) throws ServiceException { CloudwalkCallContext context) throws ServiceException {
@@ -112,21 +123,45 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac
dto.setCurrentFloorId(passRuleResult.getId()); dto.setCurrentFloorId(passRuleResult.getId());
List<AcsElevatorDeviceResultDTO> deviceList = this.acsElevatorDeviceDao.listByZoneId(dto); List<AcsElevatorDeviceResultDTO> deviceList = this.acsElevatorDeviceDao.listByZoneId(dto);
passRuleResult.setDeviceNumber(Integer.valueOf(deviceList.size())); passRuleResult.setDeviceNumber(Integer.valueOf(deviceList.size()));
AcsPersonQueryParam personParam = new AcsPersonQueryParam(); }
personParam.setZoneId(passRuleResult.getId()); int floorCount = passRuleResults.size();
CloudwalkPageInfo pageInfo = new CloudwalkPageInfo(1, 1); long[] personTotals = new long[floorCount];
CloudwalkResult<CloudwalkPageAble<AcsPersonResult>> page = for (int i = 0; i < floorCount; ) {
this.acsPersonService.page(personParam, pageInfo, context); int end = Math.min(i + REMOTE_IO_PARALLEL, floorCount);
if (!page.isSuccess()) { List<Callable<Void>> batch = new ArrayList<>();
this.logger.info("远程调用查询通行人员分页失败,原因:" + page.getMessage()); for (int j = i; j < end; j++) {
throw new ServiceException(page.getCode(), page.getMessage()); final int idx = j;
final String zoneId = passRuleResults.get(j).getId();
batch.add(() -> {
personTotals[idx] = pagePersonTotalRowsForZone(zoneId, context);
return null;
});
} }
if (ObjectUtils.isEmpty(page.getData())) { List<Future<Void>> floorFutures;
passRuleResult.setPersonNumber(Long.valueOf(0L)); try {
} else { floorFutures = this.elevatorRemoteBoundedExecutor.getThreadPoolExecutor().invokeAll(batch);
passRuleResult.setPersonNumber( } catch (InterruptedException e) {
Long.valueOf(((CloudwalkPageAble)page.getData()).getTotalRows())); Thread.currentThread().interrupt();
throw new ServiceException("76260520", getMessage("76260520"));
} }
for (Future<Void> fu : floorFutures) {
try {
fu.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ServiceException("76260520", getMessage("76260520"));
} catch (ExecutionException e) {
Throwable c = e.getCause();
if (c instanceof ServiceException) {
throw (ServiceException)c;
}
throw new ServiceException("76260520", getMessage("76260520"));
}
}
i = end;
}
for (int k = 0; k < floorCount; k++) {
passRuleResults.get(k).setPersonNumber(Long.valueOf(personTotals[k]));
} }
return CloudwalkResult.success(passRuleResults); return CloudwalkResult.success(passRuleResults);
} catch (DataAccessException e) { } catch (DataAccessException e) {
@@ -201,24 +236,51 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac
appBindParam.setApplicationId(applicationId); appBindParam.setApplicationId(applicationId);
this.acsDeviceImageStoreAppBindService.bindAppImageStoreDevice(appBindParam, context); this.acsDeviceImageStoreAppBindService.bindAppImageStoreDevice(appBindParam, context);
if (!CollectionUtils.isEmpty(deviceList)) { if (!CollectionUtils.isEmpty(deviceList)) {
for (AcsElevatorDeviceResultDTO device : deviceList) { final String newImageStoreId = (String)imageStoreId.getData();
try { for (int i = 0; i < deviceList.size(); ) {
DeviceImageStoreAppBindParam bindParam = new DeviceImageStoreAppBindParam(); int end = Math.min(i + REMOTE_IO_PARALLEL, deviceList.size());
bindParam.setImageStoreId((String)imageStoreId.getData()); List<Callable<Void>> bindBatch = new ArrayList<>();
bindParam.setDeviceId(device.getDeviceId()); for (int j = i; j < end; j++) {
bindParam.setApplicationId(applicationId); final AcsElevatorDeviceResultDTO device = deviceList.get(j);
this.acsDeviceImageStoreAppBindService.bindDeviceAndImageStore(bindParam, context); bindBatch.add(() -> {
} catch (ServiceException e) { FeignThreadLocalUtil.callWithContext(context, () -> {
this.logger.error("图库关联失败,图库id={},原因:{}", imageStoreId.getData(), e.getMessage()); DeviceImageStoreAppBindParam bindParam = new DeviceImageStoreAppBindParam();
ImageStoreDelParam delParam = new ImageStoreDelParam(); bindParam.setImageStoreId(newImageStoreId);
delParam.setId((String)imageStoreId.getData()); bindParam.setDeviceId(device.getDeviceId());
delParam.setBusinessId(context.getCompany().getCompanyId()); bindParam.setApplicationId(applicationId);
this.logger.info("回滚删除图库开始,delParam={},context={}", JSONObject.toJSON(delParam), this.acsDeviceImageStoreAppBindService.bindDeviceAndImageStore(bindParam, context);
JSONObject.toJSON(context)); return null;
CloudwalkResult<Boolean> deleteResult = this.imageStoreService.delete(delParam, context); });
this.logger.info("删除图库:图库id={},结果:{}", imageStoreId, deleteResult.getMessage()); return null;
throw new ServiceException(e.getCode(), e.getMessage()); });
} }
List<Future<Void>> bindFutures;
try {
bindFutures = this.elevatorRemoteBoundedExecutor.getThreadPoolExecutor().invokeAll(bindBatch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ServiceException("76260505", getMessage("76260505"));
}
for (Future<Void> f : bindFutures) {
try {
f.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ServiceException("76260505", getMessage("76260505"));
} catch (ExecutionException e) {
Throwable c = e.getCause();
if (c instanceof ServiceException) {
ServiceException se = (ServiceException)c;
this.logger.error("图库关联失败,图库id={},原因:{}", newImageStoreId, se.getMessage());
rollbackImageStoreAfterBindFailure(newImageStoreId, context);
throw new ServiceException(se.getCode(), se.getMessage());
}
this.logger.error("图库关联失败,图库id={},原因:{}", newImageStoreId, e.getMessage());
rollbackImageStoreAfterBindFailure(newImageStoreId, context);
throw new ServiceException("76260505", getMessage("76260505"));
}
}
i = end;
} }
} }
return (String)imageStoreId.getData(); return (String)imageStoreId.getData();
@@ -471,6 +533,35 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac
} }
} }
private long pagePersonTotalRowsForZone(String zoneId, CloudwalkCallContext context) throws Exception {
return FeignThreadLocalUtil.callWithContext(context, () -> {
AcsPersonQueryParam personParam = new AcsPersonQueryParam();
personParam.setZoneId(zoneId);
CloudwalkPageInfo pageInfo = new CloudwalkPageInfo(1, 1);
CloudwalkResult<CloudwalkPageAble<AcsPersonResult>> page =
this.acsPersonService.page(personParam, pageInfo, context);
if (!page.isSuccess()) {
this.logger.info("远程调用查询通行人员分页失败,原因:" + page.getMessage());
throw new ServiceException(page.getCode(), page.getMessage());
}
if (ObjectUtils.isEmpty(page.getData())) {
return 0L;
}
return ((CloudwalkPageAble)page.getData()).getTotalRows();
});
}
private void rollbackImageStoreAfterBindFailure(String imageStoreIdValue, CloudwalkCallContext context)
throws ServiceException {
ImageStoreDelParam delParam = new ImageStoreDelParam();
delParam.setId(imageStoreIdValue);
delParam.setBusinessId(context.getCompany().getCompanyId());
this.logger.info("回滚删除图库开始,delParam={},context={}", JSONObject.toJSON(delParam),
JSONObject.toJSON(context));
CloudwalkResult<Boolean> deleteResult = this.imageStoreService.delete(delParam, context);
this.logger.info("删除图库:图库id={},结果:{}", imageStoreIdValue, deleteResult.getMessage());
}
private void getZoneTypeIsThree(ZoneTreeResult zoneTreeResult, List<AcsPassRuleFloorResult> passRuleResults) { private void getZoneTypeIsThree(ZoneTreeResult zoneTreeResult, List<AcsPassRuleFloorResult> passRuleResults) {
if ("FLOOR".equals(zoneTreeResult.getType())) { if ("FLOOR".equals(zoneTreeResult.getType())) {
AcsPassRuleFloorResult result = new AcsPassRuleFloorResult(); AcsPassRuleFloorResult result = new AcsPassRuleFloorResult();
@@ -49,6 +49,7 @@ import cn.cloudwalk.elevator.person.param.AcsPersonTimeDetailParam;
import cn.cloudwalk.elevator.person.result.AcsPersonResult; import cn.cloudwalk.elevator.person.result.AcsPersonResult;
import cn.cloudwalk.elevator.person.result.AcsPersonTimeDetailResult; import cn.cloudwalk.elevator.person.result.AcsPersonTimeDetailResult;
import cn.cloudwalk.elevator.person.service.AcsPersonService; import cn.cloudwalk.elevator.person.service.AcsPersonService;
import cn.cloudwalk.elevator.config.FeignThreadLocalUtil;
import cn.cloudwalk.elevator.util.CollectionUtils; import cn.cloudwalk.elevator.util.CollectionUtils;
import cn.cloudwalk.elevator.util.DateUtils; import cn.cloudwalk.elevator.util.DateUtils;
import cn.cloudwalk.elevator.util.StringUtils; import cn.cloudwalk.elevator.util.StringUtils;
@@ -59,14 +60,21 @@ import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsPersonService { public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsPersonService {
private static final int ROWS_OF_PAGE = 1000; private static final int ROWS_OF_PAGE = 1000;
/** 约定 §3.2:与 {@code ninca.elevator.remote-io.pool} 默认 core 对齐,有界并行上界 */
private static final int REMOTE_DELETE_PARALLEL = 6;
@Autowired @Autowired
private BiologyToolService biologyToolService; private BiologyToolService biologyToolService;
@Autowired @Autowired
@@ -83,6 +91,9 @@ public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsP
private AcsPassRuleService acsPassRuleService; private AcsPassRuleService acsPassRuleService;
@Resource @Resource
private AcsPassRuleDao acsPassRuleDao; private AcsPassRuleDao acsPassRuleDao;
@Autowired
@Qualifier("elevatorRemoteBoundedExecutor")
private ThreadPoolTaskExecutor elevatorRemoteBoundedExecutor;
@CloudwalkParamsValidate @CloudwalkParamsValidate
public CloudwalkResult<Boolean> add(AcsPersonAddParam param, CloudwalkCallContext context) throws ServiceException { public CloudwalkResult<Boolean> add(AcsPersonAddParam param, CloudwalkCallContext context) throws ServiceException {
@@ -162,16 +173,63 @@ public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsP
throws ServiceException { throws ServiceException {
this.logger.info("删除门禁通行人员开始,AcsPersonDeleteParam=[{}], CloudwalkCallContext=[{}]", this.logger.info("删除门禁通行人员开始,AcsPersonDeleteParam=[{}], CloudwalkCallContext=[{}]",
JSONObject.toJSONString(param), JSONObject.toJSONString(context)); JSONObject.toJSONString(param), JSONObject.toJSONString(context));
for (String personId : param.getPersonIds()) { List<String> personIds = param.getPersonIds();
ImageStorePersonDelParam imageStorePersonDelParam = new ImageStorePersonDelParam(); if (CollectionUtils.isEmpty(personIds)) {
imageStorePersonDelParam.setPersonId(personId); return CloudwalkResult.success(Boolean.valueOf(true));
imageStorePersonDelParam.setImageStoreId(param.getImageStoreId()); }
CloudwalkResult<Boolean> imageStorePersonDeleteResult = if (personIds.size() == 1) {
this.imageStorePersonService.delete(imageStorePersonDelParam, context); return deleteOnePersonFromImageStore(param.getImageStoreId(), personIds.get(0), context);
if (!imageStorePersonDeleteResult.isSuccess()) { }
return CloudwalkResult.fail("76260407", for (int i = 0; i < personIds.size(); i += REMOTE_DELETE_PARALLEL) {
getMessage("76260407") + " " + imageStorePersonDeleteResult.getMessage()); int end = Math.min(i + REMOTE_DELETE_PARALLEL, personIds.size());
List<Callable<CloudwalkResult<Boolean>>> batch = new ArrayList<>();
for (int j = i; j < end; j++) {
final String personId = personIds.get(j);
batch.add(() -> FeignThreadLocalUtil.callWithContext(context,
() -> deleteOnePersonFromImageStore(param.getImageStoreId(), personId, context)));
} }
List<Future<CloudwalkResult<Boolean>>> futures;
try {
futures = this.elevatorRemoteBoundedExecutor.getThreadPoolExecutor().invokeAll(batch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CloudwalkResult.fail("76260407", getMessage("76260407") + " " + e.getMessage());
}
for (Future<CloudwalkResult<Boolean>> future : futures) {
try {
CloudwalkResult<Boolean> r = future.get();
if (!r.isSuccess()) {
return r;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CloudwalkResult.fail("76260407", getMessage("76260407") + " " + e.getMessage());
} catch (ExecutionException e) {
Throwable c = e.getCause();
if (c instanceof ServiceException) {
ServiceException se = (ServiceException)c;
return CloudwalkResult.fail(
se.getCode() != null ? se.getCode() : "76260407",
getMessage("76260407") + " " + se.getMessage());
}
return CloudwalkResult.fail("76260407",
getMessage("76260407") + " " + (c != null ? c.getMessage() : e.getMessage()));
}
}
}
return CloudwalkResult.success(Boolean.valueOf(true));
}
private CloudwalkResult<Boolean> deleteOnePersonFromImageStore(String imageStoreId, String personId,
CloudwalkCallContext context) throws ServiceException {
ImageStorePersonDelParam imageStorePersonDelParam = new ImageStorePersonDelParam();
imageStorePersonDelParam.setPersonId(personId);
imageStorePersonDelParam.setImageStoreId(imageStoreId);
CloudwalkResult<Boolean> imageStorePersonDeleteResult =
this.imageStorePersonService.delete(imageStorePersonDelParam, context);
if (!imageStorePersonDeleteResult.isSuccess()) {
return CloudwalkResult.fail("76260407",
getMessage("76260407") + " " + imageStorePersonDeleteResult.getMessage());
} }
return CloudwalkResult.success(Boolean.valueOf(true)); return CloudwalkResult.success(Boolean.valueOf(true));
} }