Author: 包子也沉默

Source Code Analysis of Producer Startup in RocketMQ

Published 2019-07-31

In RocketMQ, a Producer is created through DefaultMQProducer.

DefaultMQProducer is defined as follows:

public class DefaultMQProducer extends ClientConfig implements MQProducer {
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

    // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC has the value "TBW102"
    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

    private volatile int defaultTopicQueueNums = 4;

    private int sendMsgTimeout = 3000;

    private int compressMsgBodyOverHowmuch = 1024 * 4;

    private int retryTimesWhenSendFailed = 2;

    private int retryTimesWhenSendAsyncFailed = 2;

    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    private int maxMessageSize = 1024 * 1024 * 4; // 4M
}

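The defaultMQProducerImpl member is the actual Producer implementation; the remaining members are configuration parameters:
createTopicKey: a topic name used when a topic is created automatically (explained later)
defaultTopicQueueNums: the default number of queues per topic
sendMsgTimeout: the timeout for sending a message, in milliseconds
compressMsgBodyOverHowmuch: the body-size threshold; message bodies larger than this are compressed
retryTimesWhenSendFailed: the number of retries allowed when a synchronous send fails
retryTimesWhenSendAsyncFailed: the number of retries allowed when an asynchronous send fails
retryAnotherBrokerWhenNotStoreOK: whether to retry on another Broker after a send to one Broker fails
maxMessageSize: the maximum message size
All of these can be read and changed through the getters and setters that DefaultMQProducer provides.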

The commonly used constructors are:

public DefaultMQProducer() {
    this(MixAll.DEFAULT_PRODUCER_GROUP, null);
}

public DefaultMQProducer(final String producerGroup) {
    this(producerGroup, null);
}

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}


DefaultMQProducer extends ClientConfig, so the lower-level parameters provided by ClientConfig are set first:

public class ClientConfig {
    public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";

    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));

    private String clientIP = RemotingUtil.getLocalAddress();

    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();

    private int pollNameServerInterval = 1000 * 30;

    private int heartbeatBrokerInterval = 1000 * 30;

    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
}

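namesrvAddr is a very important member: it holds the address of the Name Server. At construction time it is read from a system property, falling back to an environment variable; if neither is set it is null and must be supplied later through its setter.
clientIP: the local IP address of the Producer
instanceName: the instance name of the Producer
pollNameServerInterval: the interval for polling the Name Server
heartbeatBrokerInterval: the interval for sending heartbeats to the Broker
SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY and vipChannelEnabled: decide whether the VIP (high-priority) channel is used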
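
The lookup order for namesrvAddr can be illustrated with a small standalone sketch. It is not RocketMQ code: the class name is made up, and the property and environment variable names are what I believe MixAll.NAMESRV_ADDR_PROPERTY and MixAll.NAMESRV_ADDR_ENV resolve to, so treat them as assumptions and check your version.

```java
import java.util.Optional;

// Illustrative sketch only; NamesrvAddrLookup is a hypothetical helper.
final class NamesrvAddrLookup {
    // Assumed values of MixAll.NAMESRV_ADDR_PROPERTY and MixAll.NAMESRV_ADDR_ENV.
    static final String ADDR_PROPERTY = "rocketmq.namesrv.addr";
    static final String ADDR_ENV = "NAMESRV_ADDR";

    /** System property first, then environment variable, otherwise empty. */
    static Optional<String> resolve() {
        String addr = System.getProperty(ADDR_PROPERTY, System.getenv(ADDR_ENV));
        return Optional.ofNullable(addr);
    }

    public static void main(String[] args) {
        // When nothing is configured, the address has to be set explicitly before start.
        System.out.println(resolve().orElse("<unset: call setNamesrvAddr before start>"));
    }
}
```

The second argument of System.getProperty is only used when the property is missing, which is why the environment variable acts as a fallback rather than an override.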

Back in the DefaultMQProducer constructor, a DefaultMQProducerImpl instance is created:

private final Random random = new Random();
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final RPCHook rpcHook;
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;

    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
}

The constructor creates a thread pool that handles asynchronous message sending.
The topicPublishInfoTable member is important: it is a map from each topic to its message queues, and is described in detail later.


Once the DefaultMQProducer has been created, look at its start method:

public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

It first delegates to the start method of defaultMQProducerImpl.

The start method of DefaultMQProducerImpl:

public void start() throws MQClientException {
    this.start(true);
}

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

When DefaultMQProducerImpl is instantiated, serviceState is initialized to CREATE_JUST. It is an enum with the following states:

public enum ServiceState {
    CREATE_JUST,
    RUNNING,
    SHUTDOWN_ALREADY,
    START_FAILED;

    private ServiceState() {
    }
}

These states are self-explanatory; they are used again later in MQClientInstance.

Back in the start method, the switch on serviceState lets startup proceed only from CREATE_JUST, which prevents start from being called by mistake in any other state.

Look at the CREATE_JUST case directly:

this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
    this.defaultMQProducer.changeInstanceNameToPID();
}

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {
    mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
    this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;

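serviceState is first set to START_FAILED, so that if startup fails midway the producer is left in a failed state rather than looking freshly created.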
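
The "mark as failed first, mark as running last" pattern can be shown in isolation. The following is a minimal sketch under my own naming (GuardedService and its init hook are hypothetical, not RocketMQ classes); it only mirrors the state transitions discussed here.

```java
// Illustrative sketch; GuardedService and the init hook are hypothetical names.
final class GuardedService {
    enum State { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private State state = State.CREATE_JUST;

    State state() {
        return state;
    }

    /** Runs init once; an exception thrown by init leaves the service in START_FAILED. */
    void start(Runnable init) {
        if (state != State.CREATE_JUST) {
            throw new IllegalStateException("service state not OK: " + state);
        }
        state = State.START_FAILED; // pessimistic: assume failure until init completes
        init.run();                 // if this throws, the state stays START_FAILED
        state = State.RUNNING;
    }

    public static void main(String[] args) {
        GuardedService service = new GuardedService();
        service.start(() -> { });
        System.out.println(service.state());
    }
}
```

Because the failure state is written before any work is done, no try/finally is needed to record a failed startup, and a second call to start is rejected whether the first one succeeded or not.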

The checkConfig method validates the ProducerGroup name:

private void checkConfig() throws MQClientException {
    Validators.checkGroup(this.defaultMQProducer.getProducerGroup());

    if (null == this.defaultMQProducer.getProducerGroup()) {
        throw new MQClientException("producerGroup is null", null);
    }

    if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
        throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
            null);
    }
}

It mainly checks that the name is legal and that it does not clash with the default producer group name, DEFAULT_PRODUCER_GROUP:

public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
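
To make the kind of check concrete, here is a rough standalone sketch. The body of Validators.checkGroup is not shown in this article, so the character pattern and the 255-character limit below are assumptions for illustration; GroupNameCheck is a hypothetical class.

```java
import java.util.regex.Pattern;

// Illustrative sketch; the pattern and the length limit are assumptions, not taken from the article.
final class GroupNameCheck {
    private static final Pattern VALID = Pattern.compile("^[%|a-zA-Z0-9_-]+$");
    private static final int MAX_LENGTH = 255;
    private static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";

    /** Returns null when the name is acceptable, otherwise the reason it is rejected. */
    static String reject(String group) {
        if (group == null || group.trim().isEmpty()) {
            return "group is blank";
        }
        if (!VALID.matcher(group).matches()) {
            return "group contains illegal characters";
        }
        if (group.length() > MAX_LENGTH) {
            return "group is too long";
        }
        if (group.equals(DEFAULT_PRODUCER_GROUP)) {
            return "group must not equal the default producer group";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(reject("order_producer"));
        System.out.println(reject("DEFAULT_PRODUCER"));
    }
}
```

One practical consequence of the last rule: a producer built with the no-argument constructor uses the default group name and therefore has to be given its own group before start is called.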


Next, mQClientFactory is instantiated; this is actually the producer's client instance. MQClientManager is a singleton: getInstance returns the MQClientManager singleton, and getAndCreateMQClientInstance creates a client instance configured according to the given ClientConfig.

MQClientManager:

public class MQClientManager {
    private final static InternalLogger log = ClientLogger.getLog();
    private static MQClientManager instance = new MQClientManager();
    private AtomicInteger factoryIndexGenerator = new AtomicInteger();
    private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
        new ConcurrentHashMap<String, MQClientInstance>();

    private MQClientManager() {
    }

    public static MQClientManager getInstance() {
        return instance;
    }
}

factoryTable is a map that caches all client instances, and factoryIndexGenerator supplies a sequence number for each client instance created.

The getAndCreateMQClientInstance method:

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

First, a clientId is built with the buildMQClientId method:

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());

    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }

    return sb.toString();
}

The clientId consists of the client's IP address and instance name, with unitName appended when it is set.
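
A quick way to see what such an id looks like is to reproduce the concatenation outside RocketMQ. This is only a sketch: ClientIds is a hypothetical helper, and the sample IP, instance name and unit name are made-up values.

```java
// Illustrative sketch; ClientIds is a hypothetical helper and the sample values are invented.
final class ClientIds {
    /** Joins ip and instance name with '@', appending the unit name only when it is non-blank. */
    static String build(String clientIp, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder(clientIp).append('@').append(instanceName);
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append('@').append(unitName);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("192.168.0.10", "12345", null));
        System.out.println(build("192.168.0.10", "12345", "unitA"));
    }
}
```

Since the id depends only on these fields, two producers that share the same IP, instance name and unit name map to the same clientId and therefore to the same cached client instance.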

With the generated clientId, factoryTable is checked first to see whether a client instance has already been created.
If none is found, a new MQClientInstance has to be instantiated.
Note that clientConfig is not passed in directly when the MQClientInstance is created; a copy is made with cloneClientConfig for safety:

public ClientConfig cloneClientConfig() {
    ClientConfig cc = new ClientConfig();
    cc.namesrvAddr = namesrvAddr;
    cc.clientIP = clientIP;
    cc.instanceName = instanceName;
    cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
    cc.pollNameServerInterval = pollNameServerInterval;
    cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
    cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
    cc.unitMode = unitMode;
    cc.unitName = unitName;
    cc.vipChannelEnabled = vipChannelEnabled;
    cc.useTLS = useTLS;
    cc.language = language;
    return cc;
}


Creating the MQClientInstance:

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }

    this.clientId = clientId;

    this.mQAdminImpl = new MQAdminImpl(this);

    this.pullMessageService = new PullMessageService(this);

    this.rebalanceService = new RebalanceService(this);

    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);

    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}

The MQClientInstance constructor creates many things; only the important ones are covered here.
nettyClientConfig makes it clear that RocketMQ uses Netty for network I/O; it holds some of the Netty configuration.
clientRemotingProcessor is used to process messages.

mQClientAPIImpl is a very important part; an MQClientAPIImpl object is instantiated directly:

public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
    final ClientRemotingProcessor clientRemotingProcessor,
    RPCHook rpcHook, final ClientConfig clientConfig) {
    this.clientConfig = clientConfig;
    topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
    this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
    this.clientRemotingProcessor = clientRemotingProcessor;

    this.remotingClient.registerRPCHook(rpcHook);
    this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
}

In this constructor, a TopAddressing is created first; it is used later to look up the name service address. Its default address is:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

This address can only be changed through system properties.
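
MixAll.getWSAddr() itself is not listed in this article, so the following is only a sketch of how such a property-driven address could be assembled. The two property names and the port handling are my assumptions, modelled on what I recall of that method; WsAddr is a hypothetical class, and the names should be verified against the RocketMQ version in use.

```java
// Illustrative sketch; property names and port handling are assumptions, WsAddr is hypothetical.
final class WsAddr {
    static String resolve() {
        String domain = System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net");
        String subgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
        // A domain that already carries a port is used as given; otherwise port 8080 is added.
        String hostPort = domain.indexOf(':') > 0 ? domain : domain + ":8080";
        return "http://" + hostPort + "/rocketmq/" + subgroup;
    }

    public static void main(String[] args) {
        System.out.println(resolve());
    }
}
```

With neither property set, the sketch yields the default address quoted above; in practice the properties would be passed as -D options on the JVM command line.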

Next, a NettyRemotingClient is created; this is the actual Netty client:

private final Bootstrap bootstrap = new Bootstrap();
// name server address list
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();

public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    this.nettyClientConfig = nettyClientConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

    if (nettyClientConfig.isUseTLS()) {
        try {
            sslContext = TlsHelper.buildSslContext(true);
            log.info("SSL enabled for client");
        } catch (IOException e) {
            log.error("Failed to create SSLContext", e);
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext", e);
            throw new RuntimeException("Failed to create SSLContext", e);
        }
    }
}

At this point the Netty client has only initialized the Bootstrap and set up the NioEventLoopGroup.

Back in the MQClientInstance constructor: once the MQClientAPIImpl has been created, getNamesrvAddr on clientConfig is checked to see whether a name server address (namesrvAddr) has been set. If it has, the address list is updated through the updateNameServerAddressList method of mQClientAPIImpl.

The updateNameServerAddressList method of MQClientAPIImpl:

public void updateNameServerAddressList(final String addrs) {
    String[] addrArray = addrs.split(";");
    List<String> list = Arrays.asList(addrArray);
    this.remotingClient.updateNameServerAddressList(list);
}

Because the name service can run as a cluster, the string is split on ";" to obtain all name server addresses, which are then handed to remotingClient for the update. The remotingClient here is the NettyRemotingClient created a moment ago.
The updateNameServerAddressList method of NettyRemotingClient:

public void updateNameServerAddressList(List<String> addrs) {
    List<String> old = this.namesrvAddrList.get();
    boolean update = false;

    if (!addrs.isEmpty()) {
        if (null == old) {
            update = true;
        } else if (addrs.size() != old.size()) {
            update = true;
        } else {
            for (int i = 0; i < addrs.size() && !update; i++) {
                if (!old.contains(addrs.get(i))) {
                    update = true;
                }
            }
        }

        if (update) {
            Collections.shuffle(addrs);
            log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
            this.namesrvAddrList.set(addrs);
        }
    }
}

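The logic here is simple: when the new list is non-empty and differs from the old one, it is shuffled and stored as the current name server list.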
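
The split, compare and shuffle steps can be tried out in a compact standalone form. This is a simplified sketch rather than the RocketMQ implementation: NameServerList is a hypothetical class, it merges the two update methods into one, and it assumes a non-null address string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch; NameServerList is a hypothetical class.
final class NameServerList {
    private final AtomicReference<List<String>> current = new AtomicReference<>();

    /** Parses a ';'-separated address string; returns true when the stored list was replaced. */
    boolean update(String addrs) {
        List<String> fresh = new ArrayList<>(Arrays.asList(addrs.split(";")));
        List<String> old = current.get();
        // Replace when there is no old list, the sizes differ, or a new address is unknown.
        boolean changed = old == null || old.size() != fresh.size() || !old.containsAll(fresh);
        if (changed) {
            Collections.shuffle(fresh); // randomize the order in which servers are tried
            current.set(fresh);
        }
        return changed;
    }

    List<String> get() {
        return current.get();
    }

    public static void main(String[] args) {
        NameServerList list = new NameServerList();
        System.out.println(list.update("10.0.0.1:9876;10.0.0.2:9876"));
        System.out.println(list.update("10.0.0.2:9876;10.0.0.1:9876"));
    }
}
```

Note that the comparison ignores order, so passing the same addresses in a different order does not trigger an update. Shuffling presumably spreads clients across the name servers instead of having every client start with the first address.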

Back in the MQClientInstance constructor: after the steps above, it goes on to create MQAdminImpl, PullMessageService, RebalanceService, ConsumerStatsManager and a new DefaultMQProducer. These are introduced when they come up later.

Back in the getAndCreateMQClientInstance method of MQClientManager: once the MQClientInstance has been created, it is put into the cache.
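
The get-or-create idiom used for this cache is worth isolating, because putIfAbsent is what keeps it correct when two threads race to create the same instance. A minimal sketch follows; InstanceCache and Instance are hypothetical stand-ins for MQClientManager and MQClientInstance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch; InstanceCache and Instance are hypothetical stand-ins.
final class InstanceCache {
    static final class Instance {
        final String clientId;
        final int index;

        Instance(String clientId, int index) {
            this.clientId = clientId;
            this.index = index;
        }
    }

    private final ConcurrentMap<String, Instance> table = new ConcurrentHashMap<>();
    private final AtomicInteger indexGenerator = new AtomicInteger();

    Instance getOrCreate(String clientId) {
        Instance instance = table.get(clientId);
        if (instance == null) {
            instance = new Instance(clientId, indexGenerator.getAndIncrement());
            // putIfAbsent returns the existing value if another thread stored one first.
            Instance prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev; // lost the race: discard ours and use the winner's
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        InstanceCache cache = new InstanceCache();
        System.out.println(cache.getOrCreate("a") == cache.getOrCreate("a"));
    }
}
```

The losing thread's instance is simply dropped, so construction may occasionally happen twice, but callers always end up sharing a single cached object per clientId.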

Back again in the start method of DefaultMQProducerImpl: after the MQClientInstance has been created, registerProducer is called.
The registerProducer method of MQClientInstance:

public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (null == group || null == producer) {
        return false;
    }

    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }

    return true;
}

When MQClientInstance is initialized, it creates several important maps: producerTable, consumerTable, topicRouteTable and brokerAddrTable.

private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();

MQProducerInner is an interface and DefaultMQProducerImpl is its implementation; producerTable associates each DefaultMQProducerImpl with its group name as the key.
In other words, DefaultMQProducerImpl instances are cached by group; the same goes for MQConsumerInner.
topicRouteTable records the Broker and message queue information for each Topic.
brokerAddrTable records the list of Broker addresses for each Broker Name.

Back in the start method: after registerProducer returns, the return value registerOK decides what happens next.
On failure, serviceState is reset to CREATE_JUST and an exception is thrown, so that a later start can proceed normally.

On success, a TopicPublishInfo record is first added to topicPublishInfoTable under the key createTopicKey ("TBW102").
TopicPublishInfo:

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
}

messageQueueList stores the MessageQueue entries, and sendWhichQueue is used to obtain an index into messageQueueList, that is, the specific message queue to send to next.

MessageQueue:

public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic;
    private String brokerName;
    private int queueId;

    public MessageQueue() {
    }

    public MessageQueue(String topic, String brokerName, int queueId) {
        this.topic = topic;
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

As you can see, this is a simple POJO that wraps topic, brokerName and queueId.

ThreadLocalIndex:

public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }

        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            "}";
    }
}

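Through a ThreadLocal, each thread is given a random starting value. Later, this value is taken modulo the length of messageQueueList to select a MessageQueue, which is the actual queue the message is sent to.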
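
Here is a small sketch of that selection scheme: a per-thread counter with a random start, reduced modulo the queue count. QueuePicker is a hypothetical class, and the wrap-around handling is my own simplification rather than a copy of ThreadLocalIndex.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Illustrative sketch; QueuePicker is a hypothetical class, not part of RocketMQ.
final class QueuePicker {
    private final ThreadLocal<Integer> index = new ThreadLocal<>();
    private final Random random = new Random();

    /** Returns the next non-negative counter value for the calling thread. */
    int nextIndex() {
        Integer stored = index.get();
        int i = (stored == null) ? random.nextInt(Integer.MAX_VALUE) : stored;
        i = (i == Integer.MAX_VALUE) ? 0 : i + 1; // wrap to 0 instead of overflowing
        index.set(i);
        return i;
    }

    /** Picks the next queue in round-robin order for the calling thread. */
    <T> T pick(List<T> queues) {
        return queues.get(nextIndex() % queues.size());
    }

    public static void main(String[] args) {
        QueuePicker picker = new QueuePicker();
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        for (int n = 0; n < 4; n++) {
            System.out.println(picker.pick(queues));
        }
    }
}
```

The random start means different threads begin at different queues, while each thread on its own walks the queues in order, which spreads load without any shared counter or locking.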

Back once more in the start method of DefaultMQProducerImpl: after the record for the createTopicKey topic has been added, startFactory decides whether the start method of mQClientFactory is called. startFactory is true by default here, so mQClientFactory's start method is called.

The start method of MQClientInstance:

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

The serviceState of an MQClientInstance is also CREATE_JUST when it is created.

The method first checks whether a name server address has been set. If not, it tries to obtain one automatically through the fetchNameServerAddr method of MQClientAPIImpl.
The fetchNameServerAddr method of MQClientAPIImpl:

public String fetchNameServerAddr() {
    try {
        String addrs = this.topAddressing.fetchNSAddr();
        if (addrs != null) {
            if (!addrs.equals(this.nameSrvAddr)) {
                log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
                this.updateNameServerAddressList(addrs);
                this.nameSrvAddr = addrs;
                return nameSrvAddr;
            }
        }
    } catch (Exception e) {
        log.error("fetchNameServerAddr Exception", e);
    }
    return nameSrvAddr;
}

It first obtains the name server address through the fetchNSAddr method of topAddressing. If an address is returned, it checks whether the name server list and the stored nameSrvAddr need to be updated.

As mentioned earlier, topAddressing is the TopAddressing instance created in the MQClientAPIImpl constructor.
The fetchNSAddr method of TopAddressing:

public final String fetchNSAddr() {
    return fetchNSAddr(true, 3000);
}

public final String fetchNSAddr(boolean verbose, long timeoutMills) {
    String url = this.wsAddr;
    try {
        if (!UtilAll.isBlank(this.unitName)) {
            url = url + "-" + this.unitName + "?nofix=1";
        }
        HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
        if (200 == result.code) {
            String responseStr = result.content;
            if (responseStr != null) {
                return clearNewLine(responseStr);
            } else {
                log.error("fetch nameserver address is null");
            }
        } else {
            log.error("fetch nameserver address failed. statusCode=" + result.code);
        }
    } catch (IOException e) {
        if (verbose) {
            log.error("fetch name server address exception", e);
        }
    }

    if (verbose) {
        String errorMsg =
            "connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts";
        errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);

        log.warn(errorMsg);
    }
    return null;
}

The URL is first built from wsAddr and unitName. As mentioned earlier, wsAddr defaults to http://jmenv.tbsite.net:8080/rocketmq/nsaddr and has to be changed through system properties.
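The URL construction can be tried out on its own. The following is a minimal sketch that only mirrors the string concatenation visible in fetchNSAddr; the class name NameServerUrlSketch and the blank check are assumptions made for illustration and are not part of RocketMQ.

```java
final class NameServerUrlSketch {
    // Mirrors the concatenation in fetchNSAddr: when a unit name is present,
    // it is appended with a "-" separator followed by the "?nofix=1" query.
    // The blank check below is a simplified stand-in for UtilAll.isBlank.
    static String buildUrl(String wsAddr, String unitName) {
        if (unitName == null || unitName.trim().isEmpty()) {
            return wsAddr;
        }
        return wsAddr + "-" + unitName + "?nofix=1";
    }

    public static void main(String[] args) {
        String base = "http://jmenv.tbsite.net:8080/rocketmq/nsaddr";
        System.out.println(buildUrl(base, null));
        System.out.println(buildUrl(base, "unitA"));
    }
}
```

Without a unit name the base address is used unchanged, so the default lookup always goes to the plain wsAddr.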

Then HttpTinyClient's httpGet method opens a connection and issues a GET request to fetch the name server address.
The httpGet method of HttpTinyClient:

static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues,
    String encoding, long readTimeoutMs) throws IOException {
    String encodedContent = encodingParams(paramValues, encoding);
    url += (null == encodedContent) ? "" : ("?" + encodedContent);

    HttpURLConnection conn = null;
    try {
        conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout((int) readTimeoutMs);
        conn.setReadTimeout((int) readTimeoutMs);
        setHeaders(conn, headers, encoding);

        conn.connect();
        int respCode = conn.getResponseCode();
        String resp = null;

        if (HttpURLConnection.HTTP_OK == respCode) {
            resp = IOTinyUtils.toString(conn.getInputStream(), encoding);
        } else {
            resp = IOTinyUtils.toString(conn.getErrorStream(), encoding);
        }
        return new HttpResult(respCode, resp);
    } finally {
        if (conn != null) {
            conn.disconnect();
        }
    }
}

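This uses the JDK's native HttpURLConnection to issue a GET request to the given URL. The response body and status code are wrapped in an HttpResult and returned to the caller, TopAddressing's fetchNSAddr method. When the status code is 200, fetchNSAddr passes the body through clearNewLine, which strips unneeded spaces, line feeds, and carriage returns, to obtain the name server address. Control then returns to fetchNameServerAddr, which updates the name server address list. That completes the automatic name server lookup.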
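The body of clearNewLine is not shown in this article. As a rough sketch of what such a cleanup step could look like, assuming the response is a single line of addresses possibly surrounded by whitespace and line terminators (the class name ClearNewLineSketch is hypothetical):

```java
final class ClearNewLineSketch {
    // Trims surrounding whitespace, then cuts the text at the first
    // carriage return or line feed, keeping only the first line.
    static String clearNewLine(String raw) {
        String trimmed = raw.trim();
        int cr = trimmed.indexOf('\r');
        if (cr != -1) {
            return trimmed.substring(0, cr);
        }
        int lf = trimmed.indexOf('\n');
        if (lf != -1) {
            return trimmed.substring(0, lf);
        }
        return trimmed;
    }

    public static void main(String[] args) {
        System.out.println(clearNewLine("  10.0.0.1:9876;10.0.0.2:9876\r\n"));
    }
}
```

The point of the step is that the HTTP body usually ends with a line terminator, which must not end up inside the address string that is later compared with nameSrvAddr.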

Back in MQClientInstance's start method:
Once a name server address is known to exist, it first calls mQClientAPIImpl's start method.
The start method of MQClientAPIImpl:

public void start() {
    this.remotingClient.start();
}

This actually calls the start method of the Netty client created earlier.
The start method of NettyRemotingClient:

public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                pipeline.addLast(
                    defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyClientHandler());
            }
        });

    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

Here the Bootstrap is bound to the EventLoopGroup and the handlers created earlier.

After mQClientAPIImpl's start method completes, startScheduledTask is called to start the scheduled tasks.
The startScheduledTask method:

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

As shown, five scheduled tasks are set up in total.

① If the name server address namesrvAddr is not set, the fetchNameServerAddr method described earlier is called periodically to refresh the name server address (first run after 10 seconds, then every 2 minutes).

② updateTopicRouteInfoFromNameServer periodically refreshes the route information of each Topic, at the interval given by pollNameServerInterval:

public void updateTopicRouteInfoFromNameServer() {
    Set<String> topicList = new HashSet<String>();

    // Consumer
    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                Set<SubscriptionData> subList = impl.subscriptions();
                if (subList != null) {
                    for (SubscriptionData subData : subList) {
                        topicList.add(subData.getTopic());
                    }
                }
            }
        }
    }

    // Producer
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                Set<String> lst = impl.getPublishTopicList();
                topicList.addAll(lst);
            }
        }
    }

    for (String topic : topicList) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}

The Topics of all Consumers and Producers are collected into topicList (a Set, so duplicates are dropped), and updateTopicRouteInfoFromNameServer is then called once for each Topic.

The updateTopicRouteInfoFromNameServer method:

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    return updateTopicRouteInfoFromNameServer(topic, false, null);
}

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }

                    if (changed) {
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // Update Pub info
                        {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }

                        // Update sub info
                        {
                            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQConsumerInner> entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }
                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                }
            } catch (Exception e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}

Here mQClientAPIImpl's getTopicRouteInfoFromNameServer method first fetches the route information for the Topic from the name server.

The Topic's route information is wrapped in TopicRouteData:

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

QueueData:

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;
}

BrokerData:

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

The getTopicRouteInfoFromNameServer method:

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
    return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

This mainly uses the invokeSync method of remotingClient, the Netty client, to send the prepared request to the name server and obtain a response.
The name server looks up the Broker route information related to the Topic and returns it as the response, which is received here and decoded into a TopicRouteData.

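invokeSync creates connections lazily: it first tries to get a Channel whose connection is already established. If there is none, it connects through bootstrap's connect method, which yields a ChannelFuture, and then obtains and caches the Channel.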
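The lazy "get or connect, then cache" idea can be illustrated without Netty. The sketch below is not RocketMQ's code: it swaps in ConcurrentHashMap.computeIfAbsent for whatever locking the real client uses, and Connection is a hypothetical stand-in for a connected Channel.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyConnectionCacheSketch {
    // Hypothetical stand-in for an established Netty Channel.
    static final class Connection {
        final String addr;

        Connection(String addr) {
            this.addr = addr;
        }
    }

    private final ConcurrentMap<String, Connection> cache =
        new ConcurrentHashMap<String, Connection>();
    private final AtomicInteger connectCount = new AtomicInteger();

    // Returns the cached connection for addr, "connecting" only on first use.
    Connection getOrCreate(String addr) {
        return cache.computeIfAbsent(addr, a -> {
            connectCount.incrementAndGet(); // counts simulated connect calls
            return new Connection(a);
        });
    }

    int connectCount() {
        return connectCount.get();
    }

    public static void main(String[] args) {
        LazyConnectionCacheSketch cache = new LazyConnectionCacheSketch();
        cache.getOrCreate("127.0.0.1:9876");
        cache.getOrCreate("127.0.0.1:9876");
        System.out.println("connect calls: " + cache.connectCount());
    }
}
```

The benefit of this style is that start() does not need to open any connection up front; the first request to a given address pays the connection cost and later requests reuse it.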

Back in updateTopicRouteInfoFromNameServer: once the Topic's route information has been fetched from the name server, topicRouteDataIsChange compares it with the route information previously stored in topicRouteTable.
The topicRouteDataIsChange method:

private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
    if (olddata == null || nowdata == null)
        return true;
    TopicRouteData old = olddata.cloneTopicRouteData();
    TopicRouteData now = nowdata.cloneTopicRouteData();
    Collections.sort(old.getQueueDatas());
    Collections.sort(old.getBrokerDatas());
    Collections.sort(now.getQueueDatas());
    Collections.sort(now.getBrokerDatas());
    return !old.equals(now);
}
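
The comparison above sorts cloned copies before calling equals, so a mere reordering of brokers or queues does not count as a change and the caller's data is left untouched. A minimal sketch of the same technique on plain string lists (RouteChangeSketch and its method are illustrative names, not RocketMQ API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class RouteChangeSketch {
    // Order-insensitive change check: copy, sort the copies, then compare.
    // A missing side is always treated as a change, as in the method above.
    static boolean isChanged(List<String> oldBrokers, List<String> newBrokers) {
        if (oldBrokers == null || newBrokers == null) {
            return true;
        }
        List<String> oldCopy = new ArrayList<String>(oldBrokers);
        List<String> newCopy = new ArrayList<String>(newBrokers);
        Collections.sort(oldCopy);
        Collections.sort(newCopy);
        return !oldCopy.equals(newCopy);
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("broker-b", "broker-a");
        List<String> b = Arrays.asList("broker-a", "broker-b");
        System.out.println(isChanged(a, b));
        System.out.println(isChanged(a, Arrays.asList("broker-a")));
    }
}
```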

Even if nothing has changed, isNeedUpdateTopicRouteInfo is still called to check whether an update is needed.

The isNeedUpdateTopicRouteInfo method:

private boolean isNeedUpdateTopicRouteInfo(final String topic) {
    boolean result = false;
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                result = impl.isPublishTopicNeedUpdate(topic);
            }
        }
    }

    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                result = impl.isSubscribeTopicNeedUpdate(topic);
            }
        }
    }

    return result;
}

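It checks every producer and then every consumer to see whether any of them needs its route information for this Topic updated, stopping at the first one that does.

When an update is needed, updateTopicRouteInfoFromNameServer
first takes the BrokerData, i.e. the Brokers' route information, out of topicRouteData and updates brokerAddrTable,
then derives the producers' and consumers' message route information from topicRouteData and updates each of them.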
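
The conversion methods topicRouteData2TopicPublishInfo and topicRouteData2TopicSubscribeInfo are not shown in this article. One way to picture the publish side, assuming each broker simply contributes writeQueueNums queues numbered from 0, is the sketch below. All names in it are hypothetical, and any further filtering the real methods may apply is deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PublishQueueSketch {
    // Minimal stand-in for a message queue pojo: topic, brokerName, queueId.
    static final class Queue {
        final String topic;
        final String brokerName;
        final int queueId;

        Queue(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    // Expands "brokerName -> writeQueueNums" into one Queue per queue id.
    static List<Queue> expand(String topic, Map<String, Integer> writeQueueNumsByBroker) {
        List<Queue> queues = new ArrayList<Queue>();
        for (Map.Entry<String, Integer> e : writeQueueNumsByBroker.entrySet()) {
            for (int queueId = 0; queueId < e.getValue(); queueId++) {
                queues.add(new Queue(topic, e.getKey(), queueId));
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, Integer> nums = new LinkedHashMap<String, Integer>();
        nums.put("broker-a", 2);
        nums.put("broker-b", 1);
        for (Queue q : expand("TopicTest", nums)) {
            System.out.println(q.topic + " " + q.brokerName + " " + q.queueId);
        }
    }
}
```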

③ Periodically remove offline Brokers and send heartbeats to the Brokers that are currently online.
cleanOfflineBroker removes offline Brokers:

private void cleanOfflineBroker() {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
            try {
                ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();

                Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
                while (itBrokerTable.hasNext()) {
                    Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
                    String brokerName = entry.getKey();
                    HashMap<Long, String> oneTable = entry.getValue();

                    HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
                    cloneAddrTable.putAll(oneTable);

                    Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> ee = it.next();
                        String addr = ee.getValue();
                        if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
                            it.remove();
                            log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);
                        }
                    }

                    if (cloneAddrTable.isEmpty()) {
                        itBrokerTable.remove();
                        log.info("the broker[{}] name's host is offline, remove it", brokerName);
                    } else {
                        updatedTable.put(brokerName, cloneAddrTable);
                    }
                }

                if (!updatedTable.isEmpty()) {
                    this.brokerAddrTable.putAll(updatedTable);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
    } catch (InterruptedException e) {
        log.warn("cleanOfflineBroker Exception", e);
    }
}

The brokerAddrTable here is refreshed by the scheduled task in ②. The method iterates over all Broker entries in it and checks each address with isBrokerAddrExistInTopicRouteTable:

private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
    Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, TopicRouteData> entry = it.next();
        TopicRouteData topicRouteData = entry.getValue();
        List<BrokerData> bds = topicRouteData.getBrokerDatas();
        for (BrokerData bd : bds) {
            if (bd.getBrokerAddrs() != null) {
                boolean exist = bd.getBrokerAddrs().containsValue(addr);
                if (exist)
                    return true;
            }
        }
    }

    return false;
}

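The check compares the address against the BrokerAddrs stored in every TopicRouteData in topicRouteTable. If a Broker address is not found there, it is removed, and brokerAddrTable is updated accordingly.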
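
The pruning rule can be condensed into a small stand-alone sketch. It assumes the set of addresses still referenced by any route data has already been collected into routedAddrs, and it uses plain maps rather than the client's own tables; OfflineBrokerPruneSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

final class OfflineBrokerPruneSketch {
    // brokerAddrTable: brokerName -> (brokerId -> address)
    // routedAddrs: every address still referenced by some topic's route data
    static void prune(Map<String, Map<Long, String>> brokerAddrTable, Set<String> routedAddrs) {
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Map<Long, String>> entry = it.next();
            // Work on a copy so the original inner map is never half-updated.
            Map<Long, String> kept = new HashMap<Long, String>(entry.getValue());
            kept.values().removeIf(addr -> !routedAddrs.contains(addr));
            if (kept.isEmpty()) {
                it.remove();          // no live address left: drop the broker name
            } else {
                entry.setValue(kept); // keep only the addresses still routed
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<Long, String>> table = new HashMap<String, Map<Long, String>>();
        Map<Long, String> a = new HashMap<Long, String>();
        a.put(0L, "10.0.0.1:10911");
        a.put(1L, "10.0.0.2:10911");
        table.put("broker-a", a);
        Set<String> routed = new HashSet<String>();
        routed.add("10.0.0.1:10911");
        prune(table, routed);
        System.out.println(table);
    }
}
```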

sendHeartbeatToAllBrokerWithLock periodically sends heartbeats to the Brokers:

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}

This part is not covered in detail here; the heartbeats are ultimately sent through the Netty client.
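
One detail worth noting in the method above is the non-blocking tryLock(): if a heartbeat round is already in progress on another thread, the new round is skipped instead of queued. A small sketch of that guard, with hypothetical names and a demo helper that simulates a slow round on a second thread:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

final class HeartbeatGuardSketch {
    private final ReentrantLock lockHeartbeat = new ReentrantLock();

    // Runs the task only if no other thread holds the lock; reports whether it ran.
    boolean runIfIdle(Runnable heartbeat) {
        if (!lockHeartbeat.tryLock()) {
            return false; // another round is in flight, skip this one
        }
        try {
            heartbeat.run();
            return true;
        } finally {
            lockHeartbeat.unlock();
        }
    }

    // Returns {skippedWhileBusy, ranWhenIdle}.
    static boolean[] demo() {
        HeartbeatGuardSketch guard = new HeartbeatGuardSketch();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread busy = new Thread(() -> guard.runIfIdle(() -> {
            started.countDown();
            try {
                release.await(); // simulate a slow heartbeat round
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        busy.setDaemon(true);
        busy.start();
        try {
            started.await();
            boolean skipped = !guard.runIfIdle(() -> { });
            release.countDown();
            busy.join();
            boolean ran = guard.runIfIdle(() -> { });
            return new boolean[] {skipped, ran};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("skipped while busy: " + r[0] + ", ran when idle: " + r[1]);
    }
}
```

Skipping is a reasonable choice for heartbeats because the next scheduled round will follow shortly anyway.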

④ Periodically persist the consumption progress of the consumer queues. This is explained in detail in the consumer analysis.

⑤ Periodically adjust the size of the consumer-side thread pool. This is also left to the consumer analysis.
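
All five tasks share one pattern: the body of run() is wrapped in try/catch. This matters because ScheduledExecutorService.scheduleAtFixedRate suppresses all subsequent executions once a run throws. The sketch below, with a hypothetical class name and short demo intervals, shows a task that fails on its first run yet keeps being scheduled because the exception is caught inside run().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class GuardedScheduleSketch {
    // Schedules a task whose first run fails, waits until at least
    // expectedRuns executions have happened, and returns the run count.
    static int runGuarded(int expectedRuns) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(expectedRuns);
        try {
            scheduler.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (runs.incrementAndGet() == 1) {
                            throw new IllegalStateException("simulated failure");
                        }
                    } catch (Exception e) {
                        // Swallowed (a real task would log it) so later runs still happen.
                    } finally {
                        done.countDown();
                    }
                }
            }, 10, 20, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs >= 3: " + (runGuarded(3) >= 3));
    }
}
```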

That covers the five scheduled tasks created by startScheduledTask. Back in MQClientInstance's start method:
Next, the pullMessageService is started to pull messages for consumers.
Then the rebalanceService is started to balance the message queues.
Both services are covered when consumers are discussed.

Next comes:

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

which starts the push service.
The defaultMQProducer here was created earlier, in the MQClientInstance constructor:

this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);

The only difference is that its start method is called with false, which means mQClientFactory's start method is not called again.
Its purpose is covered later.

At this point DefaultMQProducerImpl's start method is essentially done. As a final step, it sends one heartbeat to all Brokers through mQClientFactory's sendHeartbeatToAllBrokerWithLock method.

This completes the startup of the Producer.

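Copyright notice: unless otherwise noted, all articles on this site are original. When reposting or copying, please credit the source with a hyperlink.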