mirror of
https://github.com/hpd840321/starRiverProperty.git
synced 2026-06-10 00:40:30 +08:00
dee355b4a7
- artifacts/decompiled 树与相关源码变更 - maven-cw-elevator-application 业务 docs 与 package-info - scripts 下 formatter 校验与辅助脚本 - 其他子工程/接口与发布线一并纳入版本控制 Made-with: Cursor Former-commit-id: e102e8cab64e575bcd23c9a66a598aa1892bb492
427 lines
14 KiB
Java
427 lines
14 KiB
Java
package cn.cloudwalk.event;
|
|
|
|
import cn.cloudwalk.cwos.client.event.EventClient;
|
|
import cn.cloudwalk.cwos.client.event.event.BaseEvent;
|
|
import cn.cloudwalk.cwos.client.event.event.CustomEvent;
|
|
import cn.cloudwalk.cwos.client.event.event.EventType;
|
|
import cn.cloudwalk.cwos.client.event.handler.EventListener;
|
|
import cn.cloudwalk.event.annotation.ConsumerGroup;
|
|
import cn.cloudwalk.event.annotation.CustomTopic;
|
|
import cn.cloudwalk.event.annotation.EventTopicSuffix;
|
|
import cn.cloudwalk.event.autoconfig.EventProperties;
|
|
import cn.cloudwalk.event.handler.CustomEventHandler;
|
|
import cn.cloudwalk.event.handler.EventHandler;
|
|
import cn.cloudwalk.event.handler.EventHandlerMapping;
|
|
import cn.cloudwalk.event.handler.EventHandlerWorker;
|
|
import cn.cloudwalk.event.handler.NamedThreadFactory;
|
|
import cn.cloudwalk.event.listener.CloudwalkEventListener;
|
|
import cn.cloudwalk.event.listener.GroupEventListener;
|
|
import cn.cloudwalk.event.listener.GroupListnerClassMapping;
|
|
import com.google.common.collect.Lists;
|
|
import com.google.common.collect.Sets;
|
|
import java.lang.reflect.ParameterizedType;
|
|
import java.lang.reflect.Type;
|
|
import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.concurrent.SynchronousQueue;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.regex.Matcher;
|
|
import java.util.regex.Pattern;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.BeansException;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.boot.CommandLineRunner;
|
|
import org.springframework.context.ApplicationContext;
|
|
import org.springframework.core.env.Environment;
|
|
import org.springframework.util.CollectionUtils;
|
|
import org.springframework.util.StringUtils;
|
|
|
|
/**
 * Startup component that wires the event infrastructure once the application is running.
 *
 * <p>As a {@link CommandLineRunner}, its {@code run} method delegates to {@code init()}, which
 * creates one {@link EventClient} per configured consumer group, builds the handler mapping from
 * the injected {@link EventHandler} beans, creates the handler thread pool and finally subscribes
 * every mapped event/topic on the matching client.
 */
public class CloudwalkEventInitializing
|
|
implements CommandLineRunner {
|
|
/** Logger for this class (not referenced by the code visible in this file). */
private static final Logger LOGGER = LoggerFactory.getLogger(CloudwalkEventInitializing.class);
|
|
|
|
/** All {@link EventHandler} beans; injection is optional, so this may be {@code null}. */
@Autowired(required = false)
|
|
private List<EventHandler> eventHandlers;
|
|
|
|
/** Receives the default client, the group/listener mapping, the handler mapping and the worker. */
@Autowired
|
|
private CloudwalkEventManager cloudwalkEventManager;
|
|
|
|
/** Used to resolve {@code ${...}} placeholders found in {@link ConsumerGroup} group ids. */
@Autowired
|
|
private Environment environment;
|
|
|
|
/** Matches a string that consists entirely of a single {@code ${...}} placeholder. */
private static final Pattern pattern = Pattern.compile("^\\$\\{(.*?)\\}$");
|
|
|
|
|
|
/** Empty suffix constant; the methods visible here use the literal {@code ""} instead. */
private static final String DEFAULT_SUFFIX = "";
|
|
|
|
|
|
/** Group id of the first configured consumer group; fallback for handlers without a group. */
private String defaultGroup;
|
|
|
|
|
|
/** Client of the default group; handed to the event manager. */
private EventClient defaultEventClient;
|
|
|
|
/** Created clients keyed by consumer group id. */
private Map<String, EventClient> eventClientMap = new HashMap<>();
|
|
|
|
/** Event configuration supplied through the constructor. */
private EventProperties eventProperties;
|
|
|
|
/** Set to {@code true} once at least one handler bean exists; gates thread-pool creation. */
private static boolean isHandlerEnable = false;
|
|
|
|
|
|
public void run(String... args) throws Exception {
|
|
init();
|
|
}
|
|
|
|
public void init() throws Exception {
|
|
initEventClient();
|
|
initHandlerMapping();
|
|
initHandlerExecutor();
|
|
subscribeEvent();
|
|
}
|
|
|
|
/**
 * Creates the initializer.
 *
 * @param eventProperties event configuration read later by the bootstrap phases (group ids,
 *     listener classes, bootstrap servers, executor settings)
 */
public CloudwalkEventInitializing(EventProperties eventProperties) {
    this.eventProperties = eventProperties;
}
|
|
|
|
|
|
private void initEventClient() throws ClassNotFoundException {
|
|
String[] groupIds = this.eventProperties.getGroupId().split(",");
|
|
|
|
if (groupIds.length == 0) {
|
|
throw new IllegalArgumentException("消费组ID不能为空");
|
|
}
|
|
|
|
Map<String, String> listenerClassMap = this.eventProperties.getListenerClass();
|
|
|
|
GroupListnerClassMapping groupListnerClassMapping = new GroupListnerClassMapping();
|
|
|
|
for (int i = 0; i < groupIds.length; i++) {
|
|
Class<? extends EventListener> listenerClass; boolean isFirst = (i == 0);
|
|
String groupId = groupIds[i];
|
|
|
|
|
|
|
|
if (null == listenerClassMap || StringUtils.isEmpty(listenerClassMap.get(groupId))) {
|
|
if (isFirst) {
|
|
Class<CloudwalkEventListener> clazz = CloudwalkEventListener.class;
|
|
} else {
|
|
throw new IllegalArgumentException(String.format("groupId[%s]缺少listener-class的配置", new Object[] { groupId }));
|
|
}
|
|
} else {
|
|
listenerClass = (Class)Class.forName(listenerClassMap.get(groupId));
|
|
}
|
|
|
|
initEventClient(this.eventProperties.getBootstrapServers(), groupId, listenerClass, isFirst);
|
|
groupListnerClassMapping.register(listenerClass, groupId);
|
|
}
|
|
|
|
this.cloudwalkEventManager.setGroupListnerClassMapping(groupListnerClassMapping);
|
|
this.cloudwalkEventManager.setEventClient(this.defaultEventClient);
|
|
}
|
|
|
|
@Autowired
|
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
GroupEventListener.applicationContext = applicationContext;
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private List<String> extractGroupIds(EventHandler eventHandler) {
|
|
if (eventHandler.getClass().isAnnotationPresent((Class)ConsumerGroup.class)) {
|
|
ConsumerGroup consumerGroup = eventHandler.getClass().<ConsumerGroup>getAnnotation(ConsumerGroup.class);
|
|
|
|
List<String> groupIds = new LinkedList<>();
|
|
for (String groupId : consumerGroup.groupIds()) {
|
|
Matcher matcher = pattern.matcher(groupId);
|
|
if (matcher.find()) {
|
|
String propertyKey = groupId.substring(2, groupId.length() - 1).trim();
|
|
String propertyValue = this.environment.getProperty(propertyKey);
|
|
if (!StringUtils.isEmpty(propertyValue)) {
|
|
groupId = propertyValue;
|
|
}
|
|
}
|
|
groupIds.add(groupId);
|
|
}
|
|
|
|
return groupIds;
|
|
}
|
|
return Lists.newArrayList((Object[])new String[] { this.defaultGroup });
|
|
}
|
|
|
|
|
|
|
|
|
|
private void initHandlerMapping() {
|
|
if (CollectionUtils.isEmpty(this.eventHandlers)) {
|
|
return;
|
|
}
|
|
|
|
isHandlerEnable = true;
|
|
|
|
EventHandlerMapping eventHandlerMapping = new EventHandlerMapping();
|
|
|
|
for (EventHandler<BaseEvent> handler : this.eventHandlers) {
|
|
if (handler.getClass().isAnnotationPresent((Class)EventTopicSuffix.class)) {
|
|
Type type = handler.getClass().getGenericInterfaces()[0];
|
|
ParameterizedType p = (ParameterizedType)type;
|
|
Class<BaseEvent> eventClass = (Class)p.getActualTypeArguments()[0];
|
|
EventTopicSuffix eventTopicSuffix = handler.getClass().<EventTopicSuffix>getAnnotation(EventTopicSuffix.class);
|
|
|
|
registerHandlerMapping(eventHandlerMapping, handler, eventClass, eventTopicSuffix.value()); continue;
|
|
} if (handler.getClass().isAnnotationPresent((Class)CustomTopic.class)) {
|
|
CustomTopic customTopic = handler.getClass().<CustomTopic>getAnnotation(CustomTopic.class);
|
|
String topic = customTopic.topic();
|
|
if (StringUtils.isEmpty(topic)) {
|
|
continue;
|
|
}
|
|
registerCustomHandlerMapping(eventHandlerMapping, (EventHandler)handler, topic, customTopic.suffix());
|
|
}
|
|
}
|
|
this.cloudwalkEventManager.setEventHandlerMapping(eventHandlerMapping);
|
|
}
|
|
|
|
|
|
|
|
|
|
private void initHandlerExecutor() {
|
|
if (isHandlerEnable) {
|
|
EventHandlerWorker eventHandlerWorker = new EventHandlerWorker();
|
|
EventProperties.HandlerExecutorConfig handlerExecutorConfig = this.eventProperties.getHandlerExecutorConfig();
|
|
eventHandlerWorker.setPoolExecutor(new ThreadPoolExecutor(handlerExecutorConfig.getCorePoolSize().intValue(), handlerExecutorConfig
|
|
.getMaximumPoolSize().intValue(), 1000L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), (ThreadFactory)new NamedThreadFactory(this.eventProperties
|
|
.getWorkerNamePrefix()), new ThreadPoolExecutor.CallerRunsPolicy()));
|
|
this.cloudwalkEventManager.setEventHandlerWorker(eventHandlerWorker);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private EventType getEventType(Class<? extends BaseEvent> eventClass) {
|
|
for (EventType eventType : EventType.values()) {
|
|
if (eventType.getEventClass().getClass().equals(eventClass)) {
|
|
return eventType;
|
|
}
|
|
}
|
|
return null;
|
|
}
|
|
|
|
|
|
|
|
|
|
private void subscribeEvent() throws IllegalAccessException, InstantiationException {
|
|
EventHandlerMapping eventHandlerMapping = this.cloudwalkEventManager.getEventHandlerMapping();
|
|
|
|
Map<String, Map<EventType, Set<String>>> subscribedMap = new HashMap<>();
|
|
|
|
for (Map.Entry<String, Map<EventType, Map<String, List<EventHandler>>>> entry : (Iterable<Map.Entry<String, Map<EventType, Map<String, List<EventHandler>>>>>)eventHandlerMapping.getHandlerMap().entrySet()) {
|
|
for (Map.Entry<EventType, Map<String, List<EventHandler>>> eventTypeMapEntry : (Iterable<Map.Entry<EventType, Map<String, List<EventHandler>>>>)((Map)entry.getValue()).entrySet()) {
|
|
for (String serviceCode : ((Map)eventTypeMapEntry.getValue()).keySet()) {
|
|
subscribe(subscribedMap, entry.getKey(), eventTypeMapEntry.getKey(), serviceCode);
|
|
}
|
|
}
|
|
}
|
|
|
|
Map<String, Map<String, Set<String>>> customSubscribedMap = new HashMap<>();
|
|
|
|
for (Map.Entry<String, Map<String, Map<String, List<CustomEventHandler>>>> entry : (Iterable<Map.Entry<String, Map<String, Map<String, List<CustomEventHandler>>>>>)eventHandlerMapping.getCustomHandlerMap().entrySet()) {
|
|
for (Map.Entry<String, Map<String, List<CustomEventHandler>>> subEntry : (Iterable<Map.Entry<String, Map<String, List<CustomEventHandler>>>>)((Map)entry.getValue()).entrySet()) {
|
|
for (Map.Entry<String, List<CustomEventHandler>> handlerEntry : (Iterable<Map.Entry<String, List<CustomEventHandler>>>)((Map)subEntry.getValue()).entrySet()) {
|
|
for (CustomEventHandler customEventHandler : handlerEntry.getValue()) {
|
|
Type type = customEventHandler.getClass().getGenericInterfaces()[0];
|
|
ParameterizedType p = (ParameterizedType)type;
|
|
Class<? extends CustomEvent> eventClass = (Class)p.getActualTypeArguments()[0];
|
|
customSubscribe(customSubscribedMap, entry.getKey(), subEntry.getKey(), handlerEntry
|
|
.getKey(), eventClass);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void subscribe(Map<String, Map<EventType, Set<String>>> subscribedMap, String groupId, EventType eventType, String serviceCode) {
|
|
if (StringUtils.isEmpty(groupId)) {
|
|
groupId = this.defaultGroup;
|
|
}
|
|
|
|
Map<EventType, Set<String>> eventTypeServiceCodeMap = subscribedMap.get(groupId);
|
|
if (null != eventTypeServiceCodeMap) {
|
|
Set<String> serviceCodesSet = eventTypeServiceCodeMap.get(eventType);
|
|
if (CollectionUtils.isEmpty(serviceCodesSet)) {
|
|
eventTypeServiceCodeMap.put(eventType, Sets.newHashSet((Object[])new String[] { serviceCode }));
|
|
} else {
|
|
serviceCodesSet.add(serviceCode);
|
|
}
|
|
} else {
|
|
eventTypeServiceCodeMap = new HashMap<>();
|
|
eventTypeServiceCodeMap.put(eventType, Sets.newHashSet((Object[])new String[] { serviceCode }));
|
|
subscribedMap.put(groupId, eventTypeServiceCodeMap);
|
|
}
|
|
((EventClient)this.eventClientMap.get(groupId)).subEvent(eventType, serviceCode);
|
|
}
|
|
|
|
|
|
|
|
private void customSubscribe(Map<String, Map<String, Set<String>>> customSubscribedMap, String groupId, String topic, String serviceCode, Class<? extends CustomEvent> eventClass) throws IllegalAccessException, InstantiationException {
|
|
if (StringUtils.isEmpty(groupId)) {
|
|
groupId = this.defaultGroup;
|
|
}
|
|
|
|
Map<String, Set<String>> topicServiceCodeMap = customSubscribedMap.get(groupId);
|
|
if (null != topicServiceCodeMap) {
|
|
Set<String> serviceCodesSet = topicServiceCodeMap.get(topic);
|
|
if (CollectionUtils.isEmpty(serviceCodesSet)) {
|
|
topicServiceCodeMap.put(topic, Sets.newHashSet((Object[])new String[] { serviceCode }));
|
|
} else {
|
|
serviceCodesSet.add(serviceCode);
|
|
}
|
|
} else {
|
|
topicServiceCodeMap = new HashMap<>();
|
|
topicServiceCodeMap.put(topic, Sets.newHashSet((Object[])new String[] { serviceCode }));
|
|
customSubscribedMap.put(groupId, topicServiceCodeMap);
|
|
}
|
|
EventClient eventClient = this.eventClientMap.get(groupId);
|
|
if (null == eventClient) {
|
|
throw new IllegalArgumentException(String.format("没有找到groupId[%s]对应的配置,请检查 ${cloudwalk.event.group-id} 配置", new Object[] { groupId }));
|
|
}
|
|
|
|
eventClient.subEvent(topic, serviceCode, eventClass.newInstance());
|
|
}
|
|
|
|
|
|
|
|
private void initEventClient(String bootstrapServers, String groupId, Class<? extends EventListener> listenerClass, boolean isDefault) {
|
|
EventClient eventClient = this.eventClientMap.get(groupId);
|
|
|
|
if (null == eventClient) {
|
|
eventClient = EventClient.getInstance(bootstrapServers, groupId);
|
|
eventClient.init();
|
|
if (null != this.eventProperties.getFetchDataWorkerNumber()) {
|
|
eventClient.setWorkNum(this.eventProperties.getFetchDataWorkerNumber());
|
|
}
|
|
eventClient.setListenerClass(listenerClass);
|
|
|
|
this.eventClientMap.put(groupId, eventClient);
|
|
}
|
|
|
|
if (isDefault) {
|
|
this.defaultGroup = groupId;
|
|
this.defaultEventClient = eventClient;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private <E extends BaseEvent> void registerHandlerMapping(EventHandlerMapping eventHandlerMapping, EventHandler<E> handler, Class<E> eventClass, String[] suffixes) {
|
|
List<String> groupIds = extractGroupIds(handler);
|
|
EventType eventType = getEventType(eventClass);
|
|
|
|
for (String groupId : groupIds) {
|
|
Map<String, List<EventHandler>> handlers = eventHandlerMapping.getServiceCodeHandlerListMap(groupId, eventType);
|
|
|
|
if (CollectionUtils.isEmpty(handlers)) {
|
|
handlers = new HashMap<>();
|
|
if (null == suffixes || suffixes.length == 0) {
|
|
handlers.put("", Lists.newArrayList((Object[])new EventHandler[] { handler }));
|
|
} else {
|
|
for (String suffix : suffixes) {
|
|
handlers.put(suffix, Lists.newArrayList((Object[])new EventHandler[] { handler }));
|
|
}
|
|
}
|
|
|
|
eventHandlerMapping.registerHandlers(groupId, eventType, handlers); continue;
|
|
}
|
|
if (null == suffixes || suffixes.length == 0) {
|
|
List<EventHandler> currentHandlers = handlers.get("");
|
|
if (CollectionUtils.isEmpty(currentHandlers)) {
|
|
currentHandlers = new ArrayList<>();
|
|
}
|
|
currentHandlers.add(handler);
|
|
handlers.put("", currentHandlers); continue;
|
|
}
|
|
for (String suffix : suffixes) {
|
|
List<EventHandler> currentHandlers = handlers.get(suffix);
|
|
if (CollectionUtils.isEmpty(currentHandlers)) {
|
|
currentHandlers = new ArrayList<>();
|
|
}
|
|
currentHandlers.add(handler);
|
|
handlers.put(suffix, currentHandlers);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void registerCustomHandlerMapping(EventHandlerMapping eventHandlerMapping, EventHandler<? extends CustomEvent> handler, String topic, String serviceCode) {
|
|
List<String> groupIds = extractGroupIds(handler);
|
|
|
|
for (String groupId : groupIds) {
|
|
|
|
|
|
Map<String, List<CustomEventHandler>> customHandlers = eventHandlerMapping.getServiceCodeCustomHandlerListMap(groupId, topic);
|
|
|
|
if (null == customHandlers) {
|
|
customHandlers = new HashMap<>();
|
|
if (StringUtils.isEmpty(topic)) {
|
|
customHandlers.put("", Lists.newArrayList((Object[])new CustomEventHandler[] { (CustomEventHandler)handler }));
|
|
} else {
|
|
customHandlers.put(serviceCode, Lists.newArrayList((Object[])new CustomEventHandler[] { (CustomEventHandler)handler }));
|
|
}
|
|
eventHandlerMapping.registerCustomHandlers(groupId, topic, customHandlers); continue;
|
|
}
|
|
if (StringUtils.isEmpty(serviceCode)) {
|
|
List<CustomEventHandler> list = customHandlers.get("");
|
|
if (CollectionUtils.isEmpty(list)) {
|
|
list = new ArrayList<>();
|
|
}
|
|
list.add((CustomEventHandler)handler);
|
|
customHandlers.put("", list); continue;
|
|
}
|
|
List<CustomEventHandler> currentHandlers = customHandlers.get(serviceCode);
|
|
if (CollectionUtils.isEmpty(currentHandlers)) {
|
|
currentHandlers = new ArrayList<>();
|
|
}
|
|
currentHandlers.add((CustomEventHandler)handler);
|
|
customHandlers.put(serviceCode, currentHandlers);
|
|
}
|
|
}
|
|
}
|
|
|
|
|