fix(v0.11): 第七轮走查——过滤器异常透传、Kafka 生产者泛型与反射实例化

- CloudwalkContextParameterFilter:去掉对 chain.doFilter 的吞异常 catch,仅 finally 清理 Session,避免下游错误被静默吞掉。
- KafkaProducter:KafkaProducer/ProducerRecord 使用 String 泛型;send 遇 InterruptedException 时恢复中断标志。
- BeanCopyUtils、ServerIdStrategyBeanConfig:newInstance 改为 getDeclaredConstructor().newInstance()。

对应 docs/reviews 05 中 P1;maven-cloudwalk-legacy-public 已全量 compile 通过。

Made-with: Cursor
This commit is contained in:
反编译工作区
2026-04-25 00:18:52 +08:00
parent 2d9fa1c9de
commit ef030407c0
4 changed files with 9 additions and 13 deletions
@@ -33,7 +33,7 @@ public class BeanCopyUtils
public static <T> T copyProperties(Object source, Class<T> targetClazz) { public static <T> T copyProperties(Object source, Class<T> targetClazz) {
/* 34 */ T target = null; /* 34 */ T target = null;
try { try {
/* 36 */ target = targetClazz.newInstance(); /* 36 */ target = targetClazz.getDeclaredConstructor().newInstance();
/* 37 */ BeanUtils.copyProperties(source, target); /* 37 */ BeanUtils.copyProperties(source, target);
/* 38 */ } catch (Exception e) { /* 38 */ } catch (Exception e) {
/* 39 */ throw new RuntimeException(e); /* 39 */ throw new RuntimeException(e);
@@ -101,7 +101,7 @@ if (!CollectionUtils.isEmpty(list)) {
for (V source : list) { for (V source : list) {
E target = null; E target = null;
try { try {
target = clazz.newInstance(); target = clazz.getDeclaredConstructor().newInstance();
BeanUtils.copyProperties(source, target); BeanUtils.copyProperties(source, target);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@@ -32,7 +32,7 @@ ServerIdStrategyEnum.getServerIdStrategyEnum(serialProperties.getServerIdStrateg
beanClassName = Class.forName(ServerIdStrategyEnum.getServerIdStrategyEnum(serialProperties.getServerIdStrategy().getCode()).getBeanName()); beanClassName = Class.forName(ServerIdStrategyEnum.getServerIdStrategyEnum(serialProperties.getServerIdStrategy().getCode()).getBeanName());
} }
} }
return (ServerIdStrategy)beanClassName.newInstance(); return (ServerIdStrategy)beanClassName.getDeclaredConstructor().newInstance();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@@ -9,8 +9,6 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse; import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@@ -34,7 +32,6 @@ import org.springframework.web.filter.OncePerRequestFilter;
public class CloudwalkContextParameterFilter public class CloudwalkContextParameterFilter
extends OncePerRequestFilter extends OncePerRequestFilter
{ {
private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -75,8 +72,6 @@ this.cloudwalkSessionContextHolder.putSession(sessionObject);
try { try {
chain.doFilter((ServletRequest)request, (ServletResponse)response); chain.doFilter((ServletRequest)request, (ServletResponse)response);
} catch (Exception e) {
this.logger.error("参数注册过滤器失败,原因:", e);
} finally { } finally {
this.cloudwalkSessionContextHolder.clearSession(); this.cloudwalkSessionContextHolder.clearSession();
} }
@@ -10,7 +10,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
@@ -25,7 +24,7 @@ public class KafkaProducter
implements MessageProducer implements MessageProducer
{ {
private KafkaProperties kafkaProperties; private KafkaProperties kafkaProperties;
private Producer producer; private KafkaProducer<String, String> producer;
private KafkaProducter(KafkaProperties kafkaProperties) { private KafkaProducter(KafkaProperties kafkaProperties) {
Properties properties = new Properties(); Properties properties = new Properties();
@@ -49,7 +48,7 @@ properties.put("buffer.memory", Long.valueOf(kafkaProperties.getBufferMemory()))
properties.put("key.serializer", kafkaProperties.getKeySerializer()); properties.put("key.serializer", kafkaProperties.getKeySerializer());
properties.put("value.serializer", kafkaProperties.getValueSerializer()); properties.put("value.serializer", kafkaProperties.getValueSerializer());
this.kafkaProperties = kafkaProperties; this.kafkaProperties = kafkaProperties;
this.producer = (Producer)new KafkaProducer(properties); this.producer = new KafkaProducer<>(properties);
} }
public static KafkaProducter getInstance(KafkaProperties kafkaProperties) { public static KafkaProducter getInstance(KafkaProperties kafkaProperties) {
@@ -60,13 +59,15 @@ return new KafkaProducter(kafkaProperties);
public void send(BaseEvent event, String key, String topic) { public void send(BaseEvent event, String key, String topic) {
Integer deviceIdHashCode = Integer.valueOf((event.getDeviceId() != null) ? event.getDeviceId().hashCode() : 0); Integer deviceIdHashCode = Integer.valueOf((event.getDeviceId() != null) ? event.getDeviceId().hashCode() : 0);
Integer partition = Integer.valueOf(deviceIdHashCode.intValue() % this.producer.partitionsFor(topic).size()); Integer partition = Integer.valueOf(deviceIdHashCode.intValue() % this.producer.partitionsFor(topic).size());
Future<RecordMetadata> send = this.producer.send(new ProducerRecord(topic, partition, key, JSONObject.toJSONString(event))); Future<RecordMetadata> send =
this.producer.send(new ProducerRecord<>(topic, partition, key, JSONObject.toJSONString(event)));
try { try {
send.get(1000L, TimeUnit.MILLISECONDS); send.get(1000L, TimeUnit.MILLISECONDS);
if (!send.isDone()) { if (!send.isDone()) {
throw new SendFailedException("failed to send kafka"); throw new SendFailedException("failed to send kafka");
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SendFailedException("failed to send kafka InterruptedException" + e.getMessage()); throw new SendFailedException("failed to send kafka InterruptedException" + e.getMessage());
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw new SendFailedException("failed to send kafka ExecutionException" + e.getMessage()); throw new SendFailedException("failed to send kafka ExecutionException" + e.getMessage());