/* */ package cn.cloudwalk.event; /* */ /* */ import cn.cloudwalk.cwos.client.event.EventClient; /* */ import cn.cloudwalk.cwos.client.event.event.BaseEvent; /* */ import cn.cloudwalk.cwos.client.event.event.CustomEvent; /* */ import cn.cloudwalk.cwos.client.event.event.EventType; /* */ import cn.cloudwalk.cwos.client.event.handler.EventListener; /* */ import cn.cloudwalk.event.annotation.ConsumerGroup; /* */ import cn.cloudwalk.event.annotation.CustomTopic; /* */ import cn.cloudwalk.event.annotation.EventTopicSuffix; /* */ import cn.cloudwalk.event.autoconfig.EventProperties; /* */ import cn.cloudwalk.event.handler.CustomEventHandler; /* */ import cn.cloudwalk.event.handler.EventHandler; /* */ import cn.cloudwalk.event.handler.EventHandlerMapping; /* */ import cn.cloudwalk.event.handler.EventHandlerWorker; /* */ import cn.cloudwalk.event.handler.NamedThreadFactory; /* */ import cn.cloudwalk.event.listener.CloudwalkEventListener; /* */ import cn.cloudwalk.event.listener.GroupEventListener; /* */ import cn.cloudwalk.event.listener.GroupListnerClassMapping; /* */ import com.google.common.collect.Lists; /* */ import com.google.common.collect.Sets; /* */ import java.lang.reflect.ParameterizedType; /* */ import java.lang.reflect.Type; /* */ import java.util.ArrayList; /* */ import java.util.HashMap; /* */ import java.util.LinkedList; /* */ import java.util.List; /* */ import java.util.Map; /* */ import java.util.Set; /* */ import java.util.concurrent.SynchronousQueue; /* */ import java.util.concurrent.ThreadFactory; /* */ import java.util.concurrent.ThreadPoolExecutor; /* */ import java.util.concurrent.TimeUnit; /* */ import java.util.regex.Matcher; /* */ import java.util.regex.Pattern; /* */ import org.slf4j.Logger; /* */ import org.slf4j.LoggerFactory; /* */ import org.springframework.beans.BeansException; /* */ import org.springframework.beans.factory.annotation.Autowired; /* */ import org.springframework.boot.CommandLineRunner; /* */ import org.springframework.context.ApplicationContext; /* */ import org.springframework.core.env.Environment; /* */ import org.springframework.util.CollectionUtils; /* */ import org.springframework.util.StringUtils; /* */ /* */ public class CloudwalkEventInitializing /* */ implements CommandLineRunner { /* 48 */ private static final Logger LOGGER = LoggerFactory.getLogger(CloudwalkEventInitializing.class); /* */ /* */ @Autowired(required = false) /* */ private List eventHandlers; /* */ /* */ @Autowired /* */ private CloudwalkEventManager cloudwalkEventManager; /* */ /* */ @Autowired /* */ private Environment environment; /* */ /* 59 */ private static final Pattern pattern = Pattern.compile("^\\$\\{(.*?)\\}$"); /* */ /* */ /* */ private static final String DEFAULT_SUFFIX = ""; /* */ /* */ /* */ private String defaultGroup; /* */ /* */ /* */ private EventClient defaultEventClient; /* */ /* 70 */ private Map eventClientMap = new HashMap<>(); /* */ /* */ private EventProperties eventProperties; /* */ /* */ private static boolean isHandlerEnable = false; /* */ /* */ /* */ public void run(String... args) throws Exception { /* 78 */ init(); /* */ } /* */ /* */ public void init() throws Exception { /* 82 */ initEventClient(); /* 83 */ initHandlerMapping(); /* 84 */ initHandlerExecutor(); /* 85 */ subscribeEvent(); /* */ } /* */ /* */ public CloudwalkEventInitializing(EventProperties eventProperties) { /* 89 */ this.eventProperties = eventProperties; /* */ } /* */ /* */ /* */ private void initEventClient() throws ClassNotFoundException { /* 94 */ String[] groupIds = this.eventProperties.getGroupId().split(","); /* */ /* 96 */ if (groupIds.length == 0) { /* 97 */ throw new IllegalArgumentException("消费组ID不能为空"); /* */ } /* */ /* 100 */ Map listenerClassMap = this.eventProperties.getListenerClass(); /* */ /* 102 */ GroupListnerClassMapping groupListnerClassMapping = new GroupListnerClassMapping(); /* */ /* 104 */ for (int i = 0; i < groupIds.length; i++) { /* 105 */ Class listenerClass; boolean isFirst = (i == 0); /* 106 */ String groupId = groupIds[i]; /* */ /* */ /* */ /* 110 */ if (null == listenerClassMap || StringUtils.isEmpty(listenerClassMap.get(groupId))) { /* 111 */ if (isFirst) { /* 112 */ Class clazz = CloudwalkEventListener.class; /* */ } else { /* 114 */ throw new IllegalArgumentException(String.format("groupId[%s]缺少listener-class的配置", new Object[] { groupId })); /* */ } /* */ } else { /* 117 */ listenerClass = (Class)Class.forName(listenerClassMap.get(groupId)); /* */ } /* */ /* 120 */ initEventClient(this.eventProperties.getBootstrapServers(), groupId, listenerClass, isFirst); /* 121 */ groupListnerClassMapping.register(listenerClass, groupId); /* */ } /* */ /* 124 */ this.cloudwalkEventManager.setGroupListnerClassMapping(groupListnerClassMapping); /* 125 */ this.cloudwalkEventManager.setEventClient(this.defaultEventClient); /* */ } /* */ /* */ @Autowired /* */ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { /* 130 */ GroupEventListener.applicationContext = applicationContext; /* */ } /* */ /* */ /* */ /* */ /* */ /* */ private List extractGroupIds(EventHandler eventHandler) { /* 138 */ if (eventHandler.getClass().isAnnotationPresent((Class)ConsumerGroup.class)) { /* 139 */ ConsumerGroup consumerGroup = eventHandler.getClass().getAnnotation(ConsumerGroup.class); /* */ /* 141 */ List groupIds = new LinkedList<>(); /* 142 */ for (String groupId : consumerGroup.groupIds()) { /* 143 */ Matcher matcher = pattern.matcher(groupId); /* 144 */ if (matcher.find()) { /* 145 */ String propertyKey = groupId.substring(2, groupId.length() - 1).trim(); /* 146 */ String propertyValue = this.environment.getProperty(propertyKey); /* 147 */ if (!StringUtils.isEmpty(propertyValue)) { /* 148 */ groupId = propertyValue; /* */ } /* */ } /* 151 */ groupIds.add(groupId); /* */ } /* */ /* 154 */ return groupIds; /* */ } /* 156 */ return Lists.newArrayList((Object[])new String[] { this.defaultGroup }); /* */ } /* */ /* */ /* */ /* */ /* */ private void initHandlerMapping() { /* 163 */ if (CollectionUtils.isEmpty(this.eventHandlers)) { /* */ return; /* */ } /* */ /* 167 */ isHandlerEnable = true; /* */ /* 169 */ EventHandlerMapping eventHandlerMapping = new EventHandlerMapping(); /* */ /* 171 */ for (EventHandler handler : this.eventHandlers) { /* 172 */ if (handler.getClass().isAnnotationPresent((Class)EventTopicSuffix.class)) { /* 173 */ Type type = handler.getClass().getGenericInterfaces()[0]; /* 174 */ ParameterizedType p = (ParameterizedType)type; /* 175 */ Class eventClass = (Class)p.getActualTypeArguments()[0]; /* 176 */ EventTopicSuffix eventTopicSuffix = handler.getClass().getAnnotation(EventTopicSuffix.class); /* */ /* 178 */ registerHandlerMapping(eventHandlerMapping, handler, eventClass, eventTopicSuffix.value()); continue; /* 179 */ } if (handler.getClass().isAnnotationPresent((Class)CustomTopic.class)) { /* 180 */ CustomTopic customTopic = handler.getClass().getAnnotation(CustomTopic.class); /* 181 */ String topic = customTopic.topic(); /* 182 */ if (StringUtils.isEmpty(topic)) { /* */ continue; /* */ } /* 185 */ registerCustomHandlerMapping(eventHandlerMapping, (EventHandler)handler, topic, customTopic.suffix()); /* */ } /* */ } /* 188 */ this.cloudwalkEventManager.setEventHandlerMapping(eventHandlerMapping); /* */ } /* */ /* */ /* */ /* */ /* */ private void initHandlerExecutor() { /* 195 */ if (isHandlerEnable) { /* 196 */ EventHandlerWorker eventHandlerWorker = new EventHandlerWorker(); /* 197 */ EventProperties.HandlerExecutorConfig handlerExecutorConfig = this.eventProperties.getHandlerExecutorConfig(); /* 198 */ eventHandlerWorker.setPoolExecutor(new ThreadPoolExecutor(handlerExecutorConfig.getCorePoolSize().intValue(), handlerExecutorConfig /* 199 */ .getMaximumPoolSize().intValue(), 1000L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), (ThreadFactory)new NamedThreadFactory(this.eventProperties /* 200 */ .getWorkerNamePrefix()), new ThreadPoolExecutor.CallerRunsPolicy())); /* 201 */ this.cloudwalkEventManager.setEventHandlerWorker(eventHandlerWorker); /* */ } /* */ } /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ private EventType getEventType(Class eventClass) { /* 212 */ for (EventType eventType : EventType.values()) { /* 213 */ if (eventType.getEventClass().getClass().equals(eventClass)) { /* 214 */ return eventType; /* */ } /* */ } /* 217 */ return null; /* */ } /* */ /* */ /* */ /* */ /* */ private void subscribeEvent() throws IllegalAccessException, InstantiationException { /* 224 */ EventHandlerMapping eventHandlerMapping = this.cloudwalkEventManager.getEventHandlerMapping(); /* */ /* 226 */ Map>> subscribedMap = new HashMap<>(); /* */ /* 228 */ for (Map.Entry>>> entry : (Iterable>>>>)eventHandlerMapping.getHandlerMap().entrySet()) { /* 229 */ for (Map.Entry>> eventTypeMapEntry : (Iterable>>>)((Map)entry.getValue()).entrySet()) { /* 230 */ for (String serviceCode : ((Map)eventTypeMapEntry.getValue()).keySet()) { /* 231 */ subscribe(subscribedMap, entry.getKey(), eventTypeMapEntry.getKey(), serviceCode); /* */ } /* */ } /* */ } /* */ /* 236 */ Map>> customSubscribedMap = new HashMap<>(); /* */ /* 238 */ for (Map.Entry>>> entry : (Iterable>>>>)eventHandlerMapping.getCustomHandlerMap().entrySet()) { /* 239 */ for (Map.Entry>> subEntry : (Iterable>>>)((Map)entry.getValue()).entrySet()) { /* 240 */ for (Map.Entry> handlerEntry : (Iterable>>)((Map)subEntry.getValue()).entrySet()) { /* 241 */ for (CustomEventHandler customEventHandler : handlerEntry.getValue()) { /* 242 */ Type type = customEventHandler.getClass().getGenericInterfaces()[0]; /* 243 */ ParameterizedType p = (ParameterizedType)type; /* 244 */ Class eventClass = (Class)p.getActualTypeArguments()[0]; /* 245 */ customSubscribe(customSubscribedMap, entry.getKey(), subEntry.getKey(), handlerEntry /* 246 */ .getKey(), eventClass); /* */ } /* */ } /* */ } /* */ } /* */ } /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ private void subscribe(Map>> subscribedMap, String groupId, EventType eventType, String serviceCode) { /* 265 */ if (StringUtils.isEmpty(groupId)) { /* 266 */ groupId = this.defaultGroup; /* */ } /* */ /* 269 */ Map> eventTypeServiceCodeMap = subscribedMap.get(groupId); /* 270 */ if (null != eventTypeServiceCodeMap) { /* 271 */ Set serviceCodesSet = eventTypeServiceCodeMap.get(eventType); /* 272 */ if (CollectionUtils.isEmpty(serviceCodesSet)) { /* 273 */ eventTypeServiceCodeMap.put(eventType, Sets.newHashSet((Object[])new String[] { serviceCode })); /* */ } else { /* 275 */ serviceCodesSet.add(serviceCode); /* */ } /* */ } else { /* 278 */ eventTypeServiceCodeMap = new HashMap<>(); /* 279 */ eventTypeServiceCodeMap.put(eventType, Sets.newHashSet((Object[])new String[] { serviceCode })); /* 280 */ subscribedMap.put(groupId, eventTypeServiceCodeMap); /* */ } /* 282 */ ((EventClient)this.eventClientMap.get(groupId)).subEvent(eventType, serviceCode); /* */ } /* */ /* */ /* */ /* */ private void customSubscribe(Map>> customSubscribedMap, String groupId, String topic, String serviceCode, Class eventClass) throws IllegalAccessException, InstantiationException { /* 288 */ if (StringUtils.isEmpty(groupId)) { /* 289 */ groupId = this.defaultGroup; /* */ } /* */ /* 292 */ Map> topicServiceCodeMap = customSubscribedMap.get(groupId); /* 293 */ if (null != topicServiceCodeMap) { /* 294 */ Set serviceCodesSet = topicServiceCodeMap.get(topic); /* 295 */ if (CollectionUtils.isEmpty(serviceCodesSet)) { /* 296 */ topicServiceCodeMap.put(topic, Sets.newHashSet((Object[])new String[] { serviceCode })); /* */ } else { /* 298 */ serviceCodesSet.add(serviceCode); /* */ } /* */ } else { /* 301 */ topicServiceCodeMap = new HashMap<>(); /* 302 */ topicServiceCodeMap.put(topic, Sets.newHashSet((Object[])new String[] { serviceCode })); /* 303 */ customSubscribedMap.put(groupId, topicServiceCodeMap); /* */ } /* 305 */ EventClient eventClient = this.eventClientMap.get(groupId); /* 306 */ if (null == eventClient) { /* 307 */ throw new IllegalArgumentException(String.format("没有找到groupId[%s]对应的配置,请检查 ${cloudwalk.event.group-id} 配置", new Object[] { groupId })); /* */ } /* */ /* 310 */ eventClient.subEvent(topic, serviceCode, eventClass.newInstance()); /* */ } /* */ /* */ /* */ /* */ private void initEventClient(String bootstrapServers, String groupId, Class listenerClass, boolean isDefault) { /* 316 */ EventClient eventClient = this.eventClientMap.get(groupId); /* */ /* 318 */ if (null == eventClient) { /* 319 */ eventClient = EventClient.getInstance(bootstrapServers, groupId); /* 320 */ eventClient.init(); /* 321 */ if (null != this.eventProperties.getFetchDataWorkerNumber()) { /* 322 */ eventClient.setWorkNum(this.eventProperties.getFetchDataWorkerNumber()); /* */ } /* 324 */ eventClient.setListenerClass(listenerClass); /* */ /* 326 */ this.eventClientMap.put(groupId, eventClient); /* */ } /* */ /* 329 */ if (isDefault) { /* 330 */ this.defaultGroup = groupId; /* 331 */ this.defaultEventClient = eventClient; /* */ } /* */ } /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ /* */ private void registerHandlerMapping(EventHandlerMapping eventHandlerMapping, EventHandler handler, Class eventClass, String[] suffixes) { /* 349 */ List groupIds = extractGroupIds(handler); /* 350 */ EventType eventType = getEventType(eventClass); /* */ /* 352 */ for (String groupId : groupIds) { /* 353 */ Map> handlers = eventHandlerMapping.getServiceCodeHandlerListMap(groupId, eventType); /* */ /* 355 */ if (CollectionUtils.isEmpty(handlers)) { /* 356 */ handlers = new HashMap<>(); /* 357 */ if (null == suffixes || suffixes.length == 0) { /* 358 */ handlers.put("", Lists.newArrayList((Object[])new EventHandler[] { handler })); /* */ } else { /* 360 */ for (String suffix : suffixes) { /* 361 */ handlers.put(suffix, Lists.newArrayList((Object[])new EventHandler[] { handler })); /* */ } /* */ } /* */ /* 365 */ eventHandlerMapping.registerHandlers(groupId, eventType, handlers); continue; /* */ } /* 367 */ if (null == suffixes || suffixes.length == 0) { /* 368 */ List currentHandlers = handlers.get(""); /* 369 */ if (CollectionUtils.isEmpty(currentHandlers)) { /* 370 */ currentHandlers = new ArrayList<>(); /* */ } /* 372 */ currentHandlers.add(handler); /* 373 */ handlers.put("", currentHandlers); continue; /* */ } /* 375 */ for (String suffix : suffixes) { /* 376 */ List currentHandlers = handlers.get(suffix); /* 377 */ if (CollectionUtils.isEmpty(currentHandlers)) { /* 378 */ currentHandlers = new ArrayList<>(); /* */ } /* 380 */ currentHandlers.add(handler); /* 381 */ handlers.put(suffix, currentHandlers); /* */ } /* */ } /* */ } /* */ /* */ /* */ /* */ /* */ /* */ /* */ private void registerCustomHandlerMapping(EventHandlerMapping eventHandlerMapping, EventHandler handler, String topic, String serviceCode) { /* 392 */ List groupIds = extractGroupIds(handler); /* */ /* 394 */ for (String groupId : groupIds) { /* */ /* */ /* 397 */ Map> customHandlers = eventHandlerMapping.getServiceCodeCustomHandlerListMap(groupId, topic); /* */ /* 399 */ if (null == customHandlers) { /* 400 */ customHandlers = new HashMap<>(); /* 401 */ if (StringUtils.isEmpty(topic)) { /* 402 */ customHandlers.put("", Lists.newArrayList((Object[])new CustomEventHandler[] { (CustomEventHandler)handler })); /* */ } else { /* 404 */ customHandlers.put(serviceCode, Lists.newArrayList((Object[])new CustomEventHandler[] { (CustomEventHandler)handler })); /* */ } /* 406 */ eventHandlerMapping.registerCustomHandlers(groupId, topic, customHandlers); continue; /* */ } /* 408 */ if (StringUtils.isEmpty(serviceCode)) { /* 409 */ List list = customHandlers.get(""); /* 410 */ if (CollectionUtils.isEmpty(list)) { /* 411 */ list = new ArrayList<>(); /* */ } /* 413 */ list.add((CustomEventHandler)handler); /* 414 */ customHandlers.put("", list); continue; /* */ } /* 416 */ List currentHandlers = customHandlers.get(serviceCode); /* 417 */ if (CollectionUtils.isEmpty(currentHandlers)) { /* 418 */ currentHandlers = new ArrayList<>(); /* */ } /* 420 */ currentHandlers.add((CustomEventHandler)handler); /* 421 */ customHandlers.put(serviceCode, currentHandlers); /* */ } /* */ } /* */ } /* Location: D:\星中心\cw-elevator-application-V1.0.0.20211103\cw-elevator-application-V1.0.0.20211103\lib\cloudwalk-common-event-3.7.2-Brussels-SRX.jar!\cn\cloudwalk\event\CloudwalkEventInitializing.class * Java compiler version: 7 (51.0) * JD-Core Version: 1.1.3 */