mirror of
https://github.com/hpd840321/starRiverProperty.git
synced 2026-06-09 08:20:31 +08:00
fix(v0.11): 第七轮走查——过滤器异常透传、Kafka 生产者泛型与反射实例化
- CloudwalkContextParameterFilter:去掉对 chain.doFilter 的吞异常 catch,仅 finally 清理 Session,避免下游错误被静默吞掉。
- KafkaProducter:KafkaProducer/ProducerRecord 使用 String 泛型;send 遇 InterruptedException 时恢复中断标志。
- BeanCopyUtils、ServerIdStrategyBeanConfig:newInstance 改为 getDeclaredConstructor().newInstance()。
对应 docs/reviews 05 中 P1;maven-cloudwalk-legacy-public 已全量 compile 通过。
Made-with: Cursor
Former-commit-id: ef030407c0
This commit is contained in:
+5
-4
@@ -10,7 +10,6 @@ import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
|
||||
@@ -25,7 +24,7 @@ public class KafkaProducter
|
||||
implements MessageProducer
|
||||
{
|
||||
private KafkaProperties kafkaProperties;
|
||||
private Producer producer;
|
||||
private KafkaProducer<String, String> producer;
|
||||
|
||||
private KafkaProducter(KafkaProperties kafkaProperties) {
|
||||
Properties properties = new Properties();
|
||||
@@ -49,7 +48,7 @@ properties.put("buffer.memory", Long.valueOf(kafkaProperties.getBufferMemory()))
|
||||
properties.put("key.serializer", kafkaProperties.getKeySerializer());
|
||||
properties.put("value.serializer", kafkaProperties.getValueSerializer());
|
||||
this.kafkaProperties = kafkaProperties;
|
||||
this.producer = (Producer)new KafkaProducer(properties);
|
||||
this.producer = new KafkaProducer<>(properties);
|
||||
}
|
||||
|
||||
public static KafkaProducter getInstance(KafkaProperties kafkaProperties) {
|
||||
@@ -60,13 +59,15 @@ return new KafkaProducter(kafkaProperties);
|
||||
public void send(BaseEvent event, String key, String topic) {
|
||||
Integer deviceIdHashCode = Integer.valueOf((event.getDeviceId() != null) ? event.getDeviceId().hashCode() : 0);
|
||||
Integer partition = Integer.valueOf(deviceIdHashCode.intValue() % this.producer.partitionsFor(topic).size());
|
||||
Future<RecordMetadata> send = this.producer.send(new ProducerRecord(topic, partition, key, JSONObject.toJSONString(event)));
|
||||
Future<RecordMetadata> send =
|
||||
this.producer.send(new ProducerRecord<>(topic, partition, key, JSONObject.toJSONString(event)));
|
||||
try {
|
||||
send.get(1000L, TimeUnit.MILLISECONDS);
|
||||
if (!send.isDone()) {
|
||||
throw new SendFailedException("failed to send kafka");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new SendFailedException("failed to send kafka InterruptedException" + e.getMessage());
|
||||
} catch (ExecutionException e) {
|
||||
throw new SendFailedException("failed to send kafka ExecutionException" + e.getMessage());
|
||||
|
||||
Reference in New Issue
Block a user