package cn.cloudwalk.event;

import cn.cloudwalk.cwos.client.event.EventClient;
import cn.cloudwalk.cwos.client.event.event.BaseEvent;
import cn.cloudwalk.cwos.client.event.event.CustomEvent;
import cn.cloudwalk.cwos.client.event.event.EventType;
import cn.cloudwalk.cwos.client.event.handler.EventListener;
import cn.cloudwalk.event.annotation.ConsumerGroup;
import cn.cloudwalk.event.annotation.CustomTopic;
import cn.cloudwalk.event.annotation.EventTopicSuffix;
import cn.cloudwalk.event.autoconfig.EventProperties;
import cn.cloudwalk.event.handler.CustomEventHandler;
import cn.cloudwalk.event.handler.EventHandler;
import cn.cloudwalk.event.handler.EventHandlerMapping;
import cn.cloudwalk.event.handler.EventHandlerWorker;
import cn.cloudwalk.event.handler.NamedThreadFactory;
import cn.cloudwalk.event.listener.CloudwalkEventListener;
import cn.cloudwalk.event.listener.GroupEventListener;
import cn.cloudwalk.event.listener.GroupListnerClassMapping;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.core.env.Environment;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * Start-up runner that wires the event subsystem together.
 *
 * <p>{@link #init()} performs four steps in order: it creates one {@link EventClient} per
 * configured consumer group, builds the handler mapping from the injected {@link EventHandler}
 * beans, creates the handler worker pool, and finally subscribes every mapped event on the
 * client of its consumer group.
 *
 * <p>NOTE(review): the generic type arguments in this file were reconstructed from usage
 * (the source had them stripped); confirm them against the signatures of
 * {@code EventHandlerMapping}, {@code EventProperties} and {@code EventClient}.
 */
public class CloudwalkEventInitializing implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(CloudwalkEventInitializing.class);

    /** Matches a value that consists entirely of one <code>${...}</code> placeholder. */
    private static final Pattern pattern = Pattern.compile("^\\$\\{(.*?)\\}$");

    /** Map key under which handlers without a suffix / service code are stored. */
    private static final String DEFAULT_SUFFIX = "";

    /** All handler beans in the context; may be null or empty when none are defined. */
    @Autowired(required = false)
    private List<EventHandler> eventHandlers;

    @Autowired
    private CloudwalkEventManager cloudwalkEventManager;

    @Autowired
    private Environment environment;

    /** First consumer group of the configuration; used when a handler names no group. */
    private String defaultGroup;

    /** Client that belongs to {@link #defaultGroup}. */
    private EventClient defaultEventClient;

    /** Event clients keyed by consumer group id. */
    private final Map<String, EventClient> eventClientMap = new HashMap<>();

    private final EventProperties eventProperties;

    /** Set once at least one handler bean is present; gates creation of the worker pool. */
    private boolean handlerEnabled = false;

    /**
     * @param eventProperties configuration read for group ids, listener classes, bootstrap
     *                        servers and executor settings
     */
    public CloudwalkEventInitializing(EventProperties eventProperties) {
        this.eventProperties = eventProperties;
    }

    /**
     * Spring Boot start-up callback; delegates to {@link #init()}.
     *
     * @param args command line arguments (unused)
     * @throws Exception whatever {@link #init()} throws
     */
    @Override
    public void run(String... args) throws Exception {
        init();
    }

    /**
     * Runs the complete initialisation sequence. The order matters: clients must exist before
     * handlers are mapped to groups, and the mapping must exist before events are subscribed.
     *
     * @throws Exception if a listener class cannot be loaded, the configuration is invalid, or
     *                   a custom event class cannot be instantiated
     */
    public void init() throws Exception {
        initEventClient();
        initHandlerMapping();
        initHandlerExecutor();
        subscribeEvent();
    }

    /** Hands the application context to {@link GroupEventListener} through its static field. */
    @Autowired
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        GroupEventListener.applicationContext = applicationContext;
    }

    /**
     * Creates one event client per configured consumer group and publishes the group-to-listener
     * mapping and the default client to the event manager.
     *
     * <p>The first group is the default group and may omit its listener class, in which case
     * {@link CloudwalkEventListener} is used. Every other group must configure one.
     *
     * @throws ClassNotFoundException   if a configured listener class cannot be loaded
     * @throws IllegalArgumentException if no group id is configured, an entry is blank, or a
     *                                  non-default group has no listener class
     * @throws ClassCastException       if a configured listener class is not an {@link EventListener}
     */
    private void initEventClient() throws ClassNotFoundException {
        String configuredGroupIds = this.eventProperties.getGroupId();
        // "".split(",") yields [""] rather than an empty array, so the emptiness check has to
        // happen on the raw property as well as on the split result.
        if (StringUtils.isEmpty(configuredGroupIds)) {
            throw new IllegalArgumentException("消费组ID不能为空");
        }
        String[] groupIds = configuredGroupIds.split(",");
        if (groupIds.length == 0) {
            throw new IllegalArgumentException("消费组ID不能为空");
        }

        Map<String, String> listenerClassMap = this.eventProperties.getListenerClass();
        GroupListnerClassMapping groupListnerClassMapping = new GroupListnerClassMapping();

        for (int i = 0; i < groupIds.length; i++) {
            boolean isFirst = (i == 0);
            // Tolerate "a, b" style lists; the ids are later used as exact map keys.
            String groupId = groupIds[i].trim();
            if (groupId.isEmpty()) {
                throw new IllegalArgumentException("消费组ID不能为空");
            }

            String listenerClassName = (null == listenerClassMap) ? null : listenerClassMap.get(groupId);
            Class<? extends EventListener> listenerClass;
            if (StringUtils.isEmpty(listenerClassName)) {
                if (!isFirst) {
                    throw new IllegalArgumentException(
                            String.format("groupId[%s]缺少listener-class的配置", groupId));
                }
                listenerClass = CloudwalkEventListener.class;
            } else {
                // asSubclass rejects a misconfigured class here instead of at first use.
                listenerClass = Class.forName(listenerClassName).asSubclass(EventListener.class);
            }

            initEventClient(this.eventProperties.getBootstrapServers(), groupId, listenerClass, isFirst);
            groupListnerClassMapping.register(listenerClass, groupId);
        }

        this.cloudwalkEventManager.setGroupListnerClassMapping(groupListnerClassMapping);
        this.cloudwalkEventManager.setEventClient(this.defaultEventClient);
    }

    /**
     * Returns the consumer groups a handler belongs to.
     *
     * <p>Without a {@link ConsumerGroup} annotation the default group is returned. Annotation
     * values of the form <code>${key}</code> are replaced by the environment property
     * {@code key} when that property is non-empty; otherwise the value is kept verbatim.
     *
     * @param eventHandler handler whose class is inspected
     * @return a mutable list of group ids
     */
    private List<String> extractGroupIds(EventHandler eventHandler) {
        ConsumerGroup consumerGroup = eventHandler.getClass().getAnnotation(ConsumerGroup.class);
        if (null == consumerGroup) {
            return Lists.newArrayList(this.defaultGroup);
        }

        List<String> groupIds = new ArrayList<>();
        for (String declared : consumerGroup.groupIds()) {
            String resolved = declared;
            Matcher matcher = pattern.matcher(declared);
            if (matcher.find()) {
                String propertyValue = this.environment.getProperty(matcher.group(1).trim());
                if (!StringUtils.isEmpty(propertyValue)) {
                    resolved = propertyValue;
                }
            }
            groupIds.add(resolved);
        }
        return groupIds;
    }

    /**
     * Builds the handler mapping from the injected handler beans and installs it on the event
     * manager. Does nothing, and installs nothing, when there are no handler beans.
     *
     * <p>A handler annotated with {@link EventTopicSuffix} is registered for the event type of
     * its first generic interface argument. Otherwise a handler annotated with
     * {@link CustomTopic} that has a non-empty topic is registered as a custom handler.
     * Handlers with neither annotation are ignored.
     */
    private void initHandlerMapping() {
        if (CollectionUtils.isEmpty(this.eventHandlers)) {
            return;
        }
        this.handlerEnabled = true;

        EventHandlerMapping eventHandlerMapping = new EventHandlerMapping();
        for (EventHandler handler : this.eventHandlers) {
            Class<?> handlerClass = handler.getClass();

            EventTopicSuffix eventTopicSuffix = handlerClass.getAnnotation(EventTopicSuffix.class);
            if (null != eventTopicSuffix) {
                registerHandlerMapping(eventHandlerMapping, handler, resolveEventClass(handler),
                        eventTopicSuffix.value());
                continue;
            }

            CustomTopic customTopic = handlerClass.getAnnotation(CustomTopic.class);
            if (null != customTopic && !StringUtils.isEmpty(customTopic.topic())) {
                registerCustomHandlerMapping(eventHandlerMapping, handler, customTopic.topic(),
                        customTopic.suffix());
            }
        }
        this.cloudwalkEventManager.setEventHandlerMapping(eventHandlerMapping);
    }

    /**
     * Creates the handler worker and its thread pool, but only when handler beans exist.
     *
     * <p>The pool uses a {@link SynchronousQueue} with {@link ThreadPoolExecutor.CallerRunsPolicy},
     * so a task submitted while all threads are busy runs on the submitting thread.
     */
    private void initHandlerExecutor() {
        if (!this.handlerEnabled) {
            return;
        }
        EventProperties.HandlerExecutorConfig handlerExecutorConfig =
                this.eventProperties.getHandlerExecutorConfig();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                handlerExecutorConfig.getCorePoolSize().intValue(),
                handlerExecutorConfig.getMaximumPoolSize().intValue(),
                1000L,
                TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new NamedThreadFactory(this.eventProperties.getWorkerNamePrefix()),
                new ThreadPoolExecutor.CallerRunsPolicy());

        EventHandlerWorker eventHandlerWorker = new EventHandlerWorker();
        eventHandlerWorker.setPoolExecutor(poolExecutor);
        this.cloudwalkEventManager.setEventHandlerWorker(eventHandlerWorker);
    }

    /**
     * Finds the {@link EventType} whose event object has the given class.
     *
     * @param eventClass class to look up
     * @return the matching type, or {@code null} when none matches
     */
    private EventType getEventType(Class<?> eventClass) {
        for (EventType eventType : EventType.values()) {
            // NOTE(review): this compares the runtime class of whatever getEventClass() returns.
            // If getEventClass() returns a Class object the comparison can never succeed and
            // ".getClass()" should be dropped - confirm against EventType.
            if (eventType.getEventClass().getClass().equals(eventClass)) {
                return eventType;
            }
        }
        return null;
    }

    /**
     * Subscribes every event found in the handler mapping: first the typed events, then the
     * custom-topic events (once per registered custom handler).
     *
     * @throws IllegalAccessException if a custom event class cannot be instantiated
     * @throws InstantiationException if a custom event class cannot be instantiated
     */
    private void subscribeEvent() throws IllegalAccessException, InstantiationException {
        EventHandlerMapping eventHandlerMapping = this.cloudwalkEventManager.getEventHandlerMapping();
        if (null == eventHandlerMapping) {
            // initHandlerMapping() installs no mapping when there are no handler beans, so the
            // manager may have nothing to hand back; in that case there is nothing to subscribe.
            return;
        }

        // NOTE(review): both bookkeeping maps below are written but never read in this class.
        Map<String, Map<EventType, Set<String>>> subscribedMap = new HashMap<>();
        for (Map.Entry<String, Map<EventType, Map<String, List<EventHandler>>>> groupEntry
                : eventHandlerMapping.getHandlerMap().entrySet()) {
            for (Map.Entry<EventType, Map<String, List<EventHandler>>> typeEntry
                    : groupEntry.getValue().entrySet()) {
                for (String serviceCode : typeEntry.getValue().keySet()) {
                    subscribe(subscribedMap, groupEntry.getKey(), typeEntry.getKey(), serviceCode);
                }
            }
        }

        Map<String, Map<String, Set<String>>> customSubscribedMap = new HashMap<>();
        for (Map.Entry<String, Map<String, Map<String, List<CustomEventHandler>>>> groupEntry
                : eventHandlerMapping.getCustomHandlerMap().entrySet()) {
            for (Map.Entry<String, Map<String, List<CustomEventHandler>>> topicEntry
                    : groupEntry.getValue().entrySet()) {
                for (Map.Entry<String, List<CustomEventHandler>> codeEntry
                        : topicEntry.getValue().entrySet()) {
                    for (CustomEventHandler customEventHandler : codeEntry.getValue()) {
                        Class<? extends CustomEvent> eventClass =
                                resolveEventClass(customEventHandler).asSubclass(CustomEvent.class);
                        customSubscribe(customSubscribedMap, groupEntry.getKey(), topicEntry.getKey(),
                                codeEntry.getKey(), eventClass);
                    }
                }
            }
        }
    }

    /**
     * Records and performs the subscription of a typed event.
     *
     * @param subscribedMap bookkeeping of subscriptions: group id, then event type, then service codes
     * @param groupId       consumer group; the default group is used when empty
     * @param eventType     event type to subscribe
     * @param serviceCode   service code (topic suffix) to subscribe
     * @throws IllegalArgumentException if no client is configured for the group
     */
    private void subscribe(Map<String, Map<EventType, Set<String>>> subscribedMap, String groupId,
            EventType eventType, String serviceCode) {
        String effectiveGroupId = StringUtils.isEmpty(groupId) ? this.defaultGroup : groupId;
        recordSubscription(subscribedMap, effectiveGroupId, eventType, serviceCode);
        requireEventClient(effectiveGroupId).subEvent(eventType, serviceCode);
    }

    /**
     * Records and performs the subscription of a custom-topic event.
     *
     * @param customSubscribedMap bookkeeping of subscriptions: group id, then topic, then service codes
     * @param groupId             consumer group; the default group is used when empty
     * @param topic               custom topic to subscribe
     * @param serviceCode         service code (topic suffix) to subscribe
     * @param eventClass          event class; a new instance is passed to the client
     * @throws IllegalArgumentException if no client is configured for the group
     * @throws IllegalAccessException   if {@code eventClass} cannot be instantiated
     * @throws InstantiationException   if {@code eventClass} cannot be instantiated
     */
    private void customSubscribe(Map<String, Map<String, Set<String>>> customSubscribedMap,
            String groupId, String topic, String serviceCode, Class<? extends CustomEvent> eventClass)
            throws IllegalAccessException, InstantiationException {
        String effectiveGroupId = StringUtils.isEmpty(groupId) ? this.defaultGroup : groupId;
        recordSubscription(customSubscribedMap, effectiveGroupId, topic, serviceCode);
        EventClient eventClient = requireEventClient(effectiveGroupId);
        eventClient.subEvent(topic, serviceCode, eventClass.newInstance());
    }

    /**
     * Creates, initialises and caches the event client of a group unless one is already cached,
     * and remembers it as the default client when requested.
     *
     * @param bootstrapServers servers passed to {@code EventClient.getInstance}
     * @param groupId          consumer group id, also the cache key
     * @param listenerClass    listener class set on a newly created client
     * @param isDefault        whether this group becomes the default group
     */
    private void initEventClient(String bootstrapServers, String groupId,
            Class<? extends EventListener> listenerClass, boolean isDefault) {
        EventClient eventClient = this.eventClientMap.get(groupId);
        if (null == eventClient) {
            eventClient = EventClient.getInstance(bootstrapServers, groupId);
            eventClient.init();
            if (null != this.eventProperties.getFetchDataWorkerNumber()) {
                eventClient.setWorkNum(this.eventProperties.getFetchDataWorkerNumber());
            }
            eventClient.setListenerClass(listenerClass);
            this.eventClientMap.put(groupId, eventClient);
        }
        if (isDefault) {
            this.defaultGroup = groupId;
            this.defaultEventClient = eventClient;
        }
    }

    /**
     * Registers a handler for a typed event under each of its consumer groups and suffixes.
     *
     * @param eventHandlerMapping mapping to update
     * @param handler             handler to register
     * @param eventClass          event class the handler is parameterised with
     * @param suffixes            topic suffixes; null or empty means the default (empty) suffix
     */
    private void registerHandlerMapping(EventHandlerMapping eventHandlerMapping, EventHandler handler,
            Class<?> eventClass, String[] suffixes) {
        String[] effectiveSuffixes = (null == suffixes || suffixes.length == 0)
                ? new String[] { DEFAULT_SUFFIX }
                : suffixes;
        // NOTE(review): eventType is null when eventClass matches no EventType; it is still
        // registered under that null key - confirm whether that should be rejected instead.
        EventType eventType = getEventType(eventClass);

        for (String groupId : extractGroupIds(handler)) {
            Map<String, List<EventHandler>> handlers =
                    eventHandlerMapping.getServiceCodeHandlerListMap(groupId, eventType);
            boolean isNewMap = CollectionUtils.isEmpty(handlers);
            if (isNewMap) {
                handlers = new HashMap<>();
            }
            for (String suffix : effectiveSuffixes) {
                appendHandler(handlers, suffix, handler);
            }
            if (isNewMap) {
                eventHandlerMapping.registerHandlers(groupId, eventType, handlers);
            }
        }
    }

    /**
     * Registers a handler for a custom topic under each of its consumer groups.
     *
     * @param eventHandlerMapping mapping to update
     * @param handler             handler to register; must be a {@link CustomEventHandler}
     * @param topic               custom topic
     * @param serviceCode         service code; empty means the default (empty) key
     * @throws ClassCastException if {@code handler} is not a {@link CustomEventHandler}
     */
    private void registerCustomHandlerMapping(EventHandlerMapping eventHandlerMapping,
            EventHandler handler, String topic, String serviceCode) {
        // The key depends on the service code, both for a new and for an existing map.
        String key = StringUtils.isEmpty(serviceCode) ? DEFAULT_SUFFIX : serviceCode;

        for (String groupId : extractGroupIds(handler)) {
            Map<String, List<CustomEventHandler>> customHandlers =
                    eventHandlerMapping.getServiceCodeCustomHandlerListMap(groupId, topic);
            boolean isNewMap = (null == customHandlers);
            if (isNewMap) {
                customHandlers = new HashMap<>();
            }
            appendHandler(customHandlers, key, (CustomEventHandler) handler);
            if (isNewMap) {
                eventHandlerMapping.registerCustomHandlers(groupId, topic, customHandlers);
            }
        }
    }

    /** Returns the first type argument of the first generic interface of the handler's class. */
    private static Class<?> resolveEventClass(Object handler) {
        Type firstInterface = handler.getClass().getGenericInterfaces()[0];
        ParameterizedType parameterized = (ParameterizedType) firstInterface;
        return (Class<?>) parameterized.getActualTypeArguments()[0];
    }

    /** Adds the handler to the list stored under the key, creating the list and skipping duplicates. */
    private static <H> void appendHandler(Map<String, List<H>> handlersByKey, String key, H handler) {
        List<H> current = handlersByKey.get(key);
        if (null == current) {
            current = new ArrayList<>();
            handlersByKey.put(key, current);
        }
        if (!current.contains(handler)) {
            current.add(handler);
        }
    }

    /** Adds the service code to the set stored under group id and key, creating levels as needed. */
    private static <K> void recordSubscription(Map<String, Map<K, Set<String>>> registry,
            String groupId, K key, String serviceCode) {
        Map<K, Set<String>> byKey = registry.get(groupId);
        if (null == byKey) {
            byKey = new HashMap<>();
            registry.put(groupId, byKey);
        }
        Set<String> serviceCodes = byKey.get(key);
        if (null == serviceCodes) {
            serviceCodes = Sets.newHashSet();
            byKey.put(key, serviceCodes);
        }
        serviceCodes.add(serviceCode);
    }

    /** Returns the client of the group or throws {@link IllegalArgumentException} when none is configured. */
    private EventClient requireEventClient(String groupId) {
        EventClient eventClient = this.eventClientMap.get(groupId);
        if (null == eventClient) {
            throw new IllegalArgumentException(String.format(
                    "没有找到groupId[%s]对应的配置,请检查 ${cloudwalk.event.group-id} 配置", groupId));
        }
        return eventClient;
    }
}