elevator(P1): bounded parallel delete, listFloor page, bind; Feign callWithContext

Made-with: Cursor
This commit is contained in:
反编译工作区
2026-04-25 08:29:53 +08:00
parent 7bcb72dc53
commit fe571aaadc
6 changed files with 321 additions and 49 deletions
@@ -5,6 +5,7 @@ import cn.cloudwalk.cloud.session.extend.DefaultExtendContext;
import cn.cloudwalk.elevator.context.CloudWalkExtendContextValue;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;
@@ -76,4 +77,22 @@ public class FeignThreadLocalUtil {
public static void setRequestHeader(CloudwalkCallContext context) {
set(getDefaultRequestHeader(context));
}
/**
* 在有界线程池等子线程中调用 Feign 前,必须为当前线程设置与 {@code context} 一致的请求头 ThreadLocal
* 调用结束后恢复/清理,避免池化线程泄漏或串扰。
*/
public static <T> T callWithContext(CloudwalkCallContext context, Callable<T> action) throws Exception {
Map<String, String> previous = get();
try {
setRequestHeader(context);
return action.call();
} finally {
if (previous != null) {
set(previous);
} else {
remove();
}
}
}
}
@@ -0,0 +1,27 @@
package cn.cloudwalk.elevator.common;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ElevatorRemoteIoExecutorConfig {
@Autowired
private ElevatorRemoteIoPoolProperties elevatorRemoteIoPoolProperties;
@Bean(name = {"elevatorRemoteBoundedExecutor"})
public ThreadPoolTaskExecutor elevatorRemoteBoundedExecutor() {
ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor();
ex.setCorePoolSize(this.elevatorRemoteIoPoolProperties.getCorePoolSize());
ex.setMaxPoolSize(this.elevatorRemoteIoPoolProperties.getMaxPoolSize());
ex.setQueueCapacity(this.elevatorRemoteIoPoolProperties.getQueueCapacity());
ex.setKeepAliveSeconds(this.elevatorRemoteIoPoolProperties.getKeepAliveSeconds());
ex.setAllowCoreThreadTimeOut(this.elevatorRemoteIoPoolProperties.isAllowCoreThreadTimeOut());
ex.setThreadNamePrefix("elevator-remote-io-");
ex.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
ex.initialize();
return ex;
}
}
@@ -0,0 +1,55 @@
package cn.cloudwalk.elevator.common;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "ninca.elevator.remote-io.pool")
public class ElevatorRemoteIoPoolProperties {
/** 约定 §3.2 / §3.3 / §3.4:有界并行建议 4~8,默认 6 */
private int corePoolSize = 6;
private int maxPoolSize = 6;
private int queueCapacity = 512;
private int keepAliveSeconds = 60;
private boolean allowCoreThreadTimeOut = true;
public int getCorePoolSize() {
return this.corePoolSize;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public int getMaxPoolSize() {
return this.maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public int getQueueCapacity() {
return this.queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
public int getKeepAliveSeconds() {
return this.keepAliveSeconds;
}
public void setKeepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
public boolean isAllowCoreThreadTimeOut() {
return this.allowCoreThreadTimeOut;
}
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
}
@@ -18,6 +18,7 @@ import cn.cloudwalk.cloud.page.CloudwalkPageInfo;
import cn.cloudwalk.cloud.result.CloudwalkResult;
import cn.cloudwalk.cloud.utils.BeanCopyUtils;
import cn.cloudwalk.elevator.common.service.AcsApplicationService;
import cn.cloudwalk.elevator.config.FeignThreadLocalUtil;
import cn.cloudwalk.elevator.device.dao.AcsElevatorDeviceDao;
import cn.cloudwalk.elevator.device.dto.AcsElevatorDeviceListDto;
import cn.cloudwalk.elevator.device.dto.AcsElevatorDeviceResultDTO;
@@ -59,9 +60,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@@ -69,6 +75,8 @@ import org.springframework.util.ObjectUtils;
@Service
public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements AcsPassRuleService {
/** 约定 §3.3 / §3.4:与 {@code ninca.elevator.remote-io.pool} 默认 core 对齐 */
private static final int REMOTE_IO_PARALLEL = 6;
@Resource
private ImageStoreService imageStoreService;
@Resource
@@ -87,6 +95,9 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac
private AcsDeviceImageStoreAppBindService acsDeviceImageStoreAppBindService;
@Resource
private AcsElevatorDeviceDao acsElevatorDeviceDao;
@Autowired
@Qualifier("elevatorRemoteBoundedExecutor")
private ThreadPoolTaskExecutor elevatorRemoteBoundedExecutor;
public CloudwalkResult<List<AcsPassRuleFloorResult>> listFloor(AcsPassRuleFloorParam param,
CloudwalkCallContext context) throws ServiceException {
@@ -112,21 +123,45 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac
dto.setCurrentFloorId(passRuleResult.getId());
List<AcsElevatorDeviceResultDTO> deviceList = this.acsElevatorDeviceDao.listByZoneId(dto);
passRuleResult.setDeviceNumber(Integer.valueOf(deviceList.size()));
AcsPersonQueryParam personParam = new AcsPersonQueryParam();
personParam.setZoneId(passRuleResult.getId());
CloudwalkPageInfo pageInfo = new CloudwalkPageInfo(1, 1);
CloudwalkResult<CloudwalkPageAble<AcsPersonResult>> page =
this.acsPersonService.page(personParam, pageInfo, context);
if (!page.isSuccess()) {
this.logger.info("远程调用查询通行人员分页失败,原因:" + page.getMessage());
throw new ServiceException(page.getCode(), page.getMessage());
}
int floorCount = passRuleResults.size();
long[] personTotals = new long[floorCount];
for (int i = 0; i < floorCount; ) {
int end = Math.min(i + REMOTE_IO_PARALLEL, floorCount);
List<Callable<Void>> batch = new ArrayList<>();
for (int j = i; j < end; j++) {
final int idx = j;
final String zoneId = passRuleResults.get(j).getId();
batch.add(() -> {
personTotals[idx] = pagePersonTotalRowsForZone(zoneId, context);
return null;
});
}
if (ObjectUtils.isEmpty(page.getData())) {
passRuleResult.setPersonNumber(Long.valueOf(0L));
} else {
passRuleResult.setPersonNumber(
Long.valueOf(((CloudwalkPageAble)page.getData()).getTotalRows()));
List<Future<Void>> floorFutures;
try {
floorFutures = this.elevatorRemoteBoundedExecutor.getThreadPoolExecutor().invokeAll(batch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ServiceException("76260520", getMessage("76260520"));
}
for (Future<Void> fu : floorFutures) {
try {
fu.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ServiceException("76260520", getMessage("76260520"));
} catch (ExecutionException e) {
Throwable c = e.getCause();
if (c instanceof ServiceException) {
throw (ServiceException)c;
}
throw new ServiceException("76260520", getMessage("76260520"));
}
}
i = end;
}
for (int k = 0; k < floorCount; k++) {
passRuleResults.get(k).setPersonNumber(Long.valueOf(personTotals[k]));
}
return CloudwalkResult.success(passRuleResults);
} catch (DataAccessException e) {
@@ -201,24 +236,51 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac
appBindParam.setApplicationId(applicationId);
this.acsDeviceImageStoreAppBindService.bindAppImageStoreDevice(appBindParam, context);
if (!CollectionUtils.isEmpty(deviceList)) {
for (AcsElevatorDeviceResultDTO device : deviceList) {
try {
DeviceImageStoreAppBindParam bindParam = new DeviceImageStoreAppBindParam();
bindParam.setImageStoreId((String)imageStoreId.getData());
bindParam.setDeviceId(device.getDeviceId());
bindParam.setApplicationId(applicationId);
this.acsDeviceImageStoreAppBindService.bindDeviceAndImageStore(bindParam, context);
} catch (ServiceException e) {
this.logger.error("图库关联失败,图库id={},原因:{}", imageStoreId.getData(), e.getMessage());
ImageStoreDelParam delParam = new ImageStoreDelParam();
delParam.setId((String)imageStoreId.getData());
delParam.setBusinessId(context.getCompany().getCompanyId());
this.logger.info("回滚删除图库开始,delParam={},context={}", JSONObject.toJSON(delParam),
JSONObject.toJSON(context));
CloudwalkResult<Boolean> deleteResult = this.imageStoreService.delete(delParam, context);
this.logger.info("删除图库:图库id={},结果:{}", imageStoreId, deleteResult.getMessage());
throw new ServiceException(e.getCode(), e.getMessage());
final String newImageStoreId = (String)imageStoreId.getData();
for (int i = 0; i < deviceList.size(); ) {
int end = Math.min(i + REMOTE_IO_PARALLEL, deviceList.size());
List<Callable<Void>> bindBatch = new ArrayList<>();
for (int j = i; j < end; j++) {
final AcsElevatorDeviceResultDTO device = deviceList.get(j);
bindBatch.add(() -> {
FeignThreadLocalUtil.callWithContext(context, () -> {
DeviceImageStoreAppBindParam bindParam = new DeviceImageStoreAppBindParam();
bindParam.setImageStoreId(newImageStoreId);
bindParam.setDeviceId(device.getDeviceId());
bindParam.setApplicationId(applicationId);
this.acsDeviceImageStoreAppBindService.bindDeviceAndImageStore(bindParam, context);
return null;
});
return null;
});
}
List<Future<Void>> bindFutures;
try {
bindFutures = this.elevatorRemoteBoundedExecutor.getThreadPoolExecutor().invokeAll(bindBatch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ServiceException("76260505", getMessage("76260505"));
}
for (Future<Void> f : bindFutures) {
try {
f.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ServiceException("76260505", getMessage("76260505"));
} catch (ExecutionException e) {
Throwable c = e.getCause();
if (c instanceof ServiceException) {
ServiceException se = (ServiceException)c;
this.logger.error("图库关联失败,图库id={},原因:{}", newImageStoreId, se.getMessage());
rollbackImageStoreAfterBindFailure(newImageStoreId, context);
throw new ServiceException(se.getCode(), se.getMessage());
}
this.logger.error("图库关联失败,图库id={},原因:{}", newImageStoreId, e.getMessage());
rollbackImageStoreAfterBindFailure(newImageStoreId, context);
throw new ServiceException("76260505", getMessage("76260505"));
}
}
i = end;
}
}
return (String)imageStoreId.getData();
@@ -471,6 +533,35 @@ public class AcsPassRuleServiceImpl extends AbstractAcsPassService implements Ac
}
}
private long pagePersonTotalRowsForZone(String zoneId, CloudwalkCallContext context) throws Exception {
return FeignThreadLocalUtil.callWithContext(context, () -> {
AcsPersonQueryParam personParam = new AcsPersonQueryParam();
personParam.setZoneId(zoneId);
CloudwalkPageInfo pageInfo = new CloudwalkPageInfo(1, 1);
CloudwalkResult<CloudwalkPageAble<AcsPersonResult>> page =
this.acsPersonService.page(personParam, pageInfo, context);
if (!page.isSuccess()) {
this.logger.info("远程调用查询通行人员分页失败,原因:" + page.getMessage());
throw new ServiceException(page.getCode(), page.getMessage());
}
if (ObjectUtils.isEmpty(page.getData())) {
return 0L;
}
return ((CloudwalkPageAble)page.getData()).getTotalRows();
});
}
private void rollbackImageStoreAfterBindFailure(String imageStoreIdValue, CloudwalkCallContext context)
throws ServiceException {
ImageStoreDelParam delParam = new ImageStoreDelParam();
delParam.setId(imageStoreIdValue);
delParam.setBusinessId(context.getCompany().getCompanyId());
this.logger.info("回滚删除图库开始,delParam={},context={}", JSONObject.toJSON(delParam),
JSONObject.toJSON(context));
CloudwalkResult<Boolean> deleteResult = this.imageStoreService.delete(delParam, context);
this.logger.info("删除图库:图库id={},结果:{}", imageStoreIdValue, deleteResult.getMessage());
}
private void getZoneTypeIsThree(ZoneTreeResult zoneTreeResult, List<AcsPassRuleFloorResult> passRuleResults) {
if ("FLOOR".equals(zoneTreeResult.getType())) {
AcsPassRuleFloorResult result = new AcsPassRuleFloorResult();
@@ -49,6 +49,7 @@ import cn.cloudwalk.elevator.person.param.AcsPersonTimeDetailParam;
import cn.cloudwalk.elevator.person.result.AcsPersonResult;
import cn.cloudwalk.elevator.person.result.AcsPersonTimeDetailResult;
import cn.cloudwalk.elevator.person.service.AcsPersonService;
import cn.cloudwalk.elevator.config.FeignThreadLocalUtil;
import cn.cloudwalk.elevator.util.CollectionUtils;
import cn.cloudwalk.elevator.util.DateUtils;
import cn.cloudwalk.elevator.util.StringUtils;
@@ -59,14 +60,21 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
@Service
public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsPersonService {
private static final int ROWS_OF_PAGE = 1000;
/** 约定 §3.2:与 {@code ninca.elevator.remote-io.pool} 默认 core 对齐,有界并行上界 */
private static final int REMOTE_DELETE_PARALLEL = 6;
@Autowired
private BiologyToolService biologyToolService;
@Autowired
@@ -83,6 +91,9 @@ public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsP
private AcsPassRuleService acsPassRuleService;
@Resource
private AcsPassRuleDao acsPassRuleDao;
@Autowired
@Qualifier("elevatorRemoteBoundedExecutor")
private ThreadPoolTaskExecutor elevatorRemoteBoundedExecutor;
@CloudwalkParamsValidate
public CloudwalkResult<Boolean> add(AcsPersonAddParam param, CloudwalkCallContext context) throws ServiceException {
@@ -162,16 +173,63 @@ public class AcsPersonServiceImpl extends AbstractAcsPassService implements AcsP
throws ServiceException {
this.logger.info("删除门禁通行人员开始,AcsPersonDeleteParam=[{}], CloudwalkCallContext=[{}]",
JSONObject.toJSONString(param), JSONObject.toJSONString(context));
for (String personId : param.getPersonIds()) {
ImageStorePersonDelParam imageStorePersonDelParam = new ImageStorePersonDelParam();
imageStorePersonDelParam.setPersonId(personId);
imageStorePersonDelParam.setImageStoreId(param.getImageStoreId());
CloudwalkResult<Boolean> imageStorePersonDeleteResult =
this.imageStorePersonService.delete(imageStorePersonDelParam, context);
if (!imageStorePersonDeleteResult.isSuccess()) {
return CloudwalkResult.fail("76260407",
getMessage("76260407") + " " + imageStorePersonDeleteResult.getMessage());
List<String> personIds = param.getPersonIds();
if (CollectionUtils.isEmpty(personIds)) {
return CloudwalkResult.success(Boolean.valueOf(true));
}
if (personIds.size() == 1) {
return deleteOnePersonFromImageStore(param.getImageStoreId(), personIds.get(0), context);
}
for (int i = 0; i < personIds.size(); i += REMOTE_DELETE_PARALLEL) {
int end = Math.min(i + REMOTE_DELETE_PARALLEL, personIds.size());
List<Callable<CloudwalkResult<Boolean>>> batch = new ArrayList<>();
for (int j = i; j < end; j++) {
final String personId = personIds.get(j);
batch.add(() -> FeignThreadLocalUtil.callWithContext(context,
() -> deleteOnePersonFromImageStore(param.getImageStoreId(), personId, context)));
}
List<Future<CloudwalkResult<Boolean>>> futures;
try {
futures = this.elevatorRemoteBoundedExecutor.getThreadPoolExecutor().invokeAll(batch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CloudwalkResult.fail("76260407", getMessage("76260407") + " " + e.getMessage());
}
for (Future<CloudwalkResult<Boolean>> future : futures) {
try {
CloudwalkResult<Boolean> r = future.get();
if (!r.isSuccess()) {
return r;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CloudwalkResult.fail("76260407", getMessage("76260407") + " " + e.getMessage());
} catch (ExecutionException e) {
Throwable c = e.getCause();
if (c instanceof ServiceException) {
ServiceException se = (ServiceException)c;
return CloudwalkResult.fail(
se.getCode() != null ? se.getCode() : "76260407",
getMessage("76260407") + " " + se.getMessage());
}
return CloudwalkResult.fail("76260407",
getMessage("76260407") + " " + (c != null ? c.getMessage() : e.getMessage()));
}
}
}
return CloudwalkResult.success(Boolean.valueOf(true));
}
private CloudwalkResult<Boolean> deleteOnePersonFromImageStore(String imageStoreId, String personId,
CloudwalkCallContext context) throws ServiceException {
ImageStorePersonDelParam imageStorePersonDelParam = new ImageStorePersonDelParam();
imageStorePersonDelParam.setPersonId(personId);
imageStorePersonDelParam.setImageStoreId(imageStoreId);
CloudwalkResult<Boolean> imageStorePersonDeleteResult =
this.imageStorePersonService.delete(imageStorePersonDelParam, context);
if (!imageStorePersonDeleteResult.isSuccess()) {
return CloudwalkResult.fail("76260407",
getMessage("76260407") + " " + imageStorePersonDeleteResult.getMessage());
}
return CloudwalkResult.success(Boolean.valueOf(true));
}