3.3 生产者启动流程
消息生产者的代码都在client模块中,对于RocketMQ来说,它既是客户端,也是消息的提供者,我们在应用系统中初始化生产者的一个实例即可使用它来发消息。
3.3.1 初识DefaultMQProducer
DefaultMQProducer是默认的消息生产者实现类,实现了MQAdmin的接口,其主要接口如图3-6、图3-7所示。
图3-6 MQAdmin接口
下面介绍DefaultMQProducer的主要方法,核心属性如代码清单3-2所示。
1)void createTopic(String key, String newTopic, int queueNum, int topicSysFlag):创建主题。
- key:目前无实际作用,可以与newTopic相同。
- newTopic:主题名称。
- queueNum:队列数量。
- topicSysFlag:主题系统标签,默认为0。
图3-7 MQProducer接口
2)long searchOffset(final MessageQueue mq, final long timestamp):根据时间戳从队列中查找其偏移量。
3)long maxOffset(final MessageQueue mq):查找该消息队列中最大的物理偏移量。
4)long minOffset(final MessageQueue mq):查找该消息队列中的最小物理偏移量。
5)MessageExt viewMessage(final String offsetMsgId):根据消息偏移量查找消息。
6)QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end):根据条件查询消息。
- topic:消息主题。
- key:消息索引字段。
- maxNum:本次最多取出的消息条数。
- begin:开始时间。
- end:结束时间。
7)MessageExt viewMessage(String topic,String msgId):根据主题与消息ID查找消息。
8)List fetchPublishMessageQueues(final String topic):查找该主题下所有的消息队列。
9)SendResult send(Message msg):同步发送消息,具体发送到主题中的哪个消息队列由负载算法决定。
10)SendResult send(Message msg, final long timeout):同步发送消息,如果发送超过timeout则抛出超时异常。
11)void send(Message msg, SendCallback sendCallback):异步发送消息,sendCallback参数是消息发送成功后的回调方法。
12)void send(Message msg, SendCallback sendCallback, long timeout):异步发送消息,如果发送超过timeout则抛出超时异常。
13)void sendOneway(Message msg):单向消息发送,即不在乎发送结果,消息发送出去后该方法立即返回。
14)SendResult send(Message msg, MessageQueue mq, final long timeout):同步方式发送消息,且发送到指定的消息队列。
15)void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout):异步方式发送消息,且发送到指定的消息队列。
16)void sendOneway(Message msg, MessageQueue Selector selector, Object arg):单向方式发送消息,且发送到指定的消息队列。
17)SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg):消息发送,指定消息选择算法,覆盖消息生产者默认的消息队列负载。
18)SendResult send(final Collection msgs):同步批量消息发送。
代码清单3-2 DefaultMQProducer的核心属性
private String producerGroup; private String createTopicKey = MixAll.DEFAULT_TOPIC; private volatile int defaultTopicQueueNums = 4; private int sendMsgTimeout = 3000; private int compressMsgBodyOverHowmuch = 1024 * 4; private int retryTimesWhenSendFailed = 2; private int retryTimesWhenSendAsyncFailed = 2; private boolean retryAnotherBrokerWhenNotStoreOK = false; private int maxMessageSize = 1024 * 1024 * 4; // 4M
1)producerGroup:生产者所属组,消息服务器在回查事务状态时会随机选择该组中任何一个生产者发起的事务回查请求。
2)createTopicKey:默认topicKey。
3)defaultTopicQueueNums:默认主题在每一个Broker队列的数量。
4)sendMsgTimeout:发送消息的超时时间,默认为3s。
5)compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认为4KB。
6)retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2,总共执行3次。
7)retryTimesWhenSendAsyncFailed:异步方式发送消息的重试次数,默认为2。
8)retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker,是否不等待存储结果就返回,默认为false。
9)maxMessageSize:允许发送的最大消息长度,默认为4MB,最大值为232-1。
3.3.2 消息生产者启动流程
消息生产者是如何一步一步启动的呢?我们可以从DefaultMQProducerImpl的start方法来跟踪,具体细节如代码清单3-3所示。
代码清单3-3 DefaultMQProducerImpl#start
this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals( MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); }
第一步:检查producerGroup是否符合要求,改变生产者的instanceName为进程ID,如代码清单3-4和代码清单3-5所示。
代码清单3-4 DefaultMQProducerImpl#start
this.mQClientFactory = MQClientManager.getInstance(). getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
代码清单3-5 MQClientManager#getAndCreateMQClientInstance
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }
第二步:创建MQClientInstance实例。整个JVM实例中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =new ConcurrentHashMap<String, MQClientInstance>(),即同一个clientId只会创建一个MQClientInstance实例。创建clientId的方法如代码清单3-6所示。
代码清单3-6 ClientConfig#buildMQClientId
public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); }
clientId为客户端IP+instance+unitname(可选),如果在同一台物理服务器部署两个应用程序,应用程序的clientId岂不是相同,这样是不是会造成混乱?
为了避免出现这个问题,如果instance为默认值DEFAULT,RocketMQ会自动将instance设置为进程ID,这样就避免了不同进程相互影响,但同一个JVM中相同clientId的消费者和生产者在启动时获取的MQClientInstane实例都是同一个,如代码清单3-7所示。MQClientInstance封装了RocketMQ的网络处理API,是消息生产者、消息消费者与NameServer、Broker打交道的网络通道。
代码清单3-7 DefaultMQProducerImpl#start
boolean registerOK = mQClientFactory.registerProducer (this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); }
第三步:向MQClientInstance注册服务,将当前生产者加入MQClientInstance管理,方便后续调用网络请求、进行心跳检测等。
第四步:启动MQClientInstance,如果MQClientInstance已经启动,则本次启动不会真正执行。MQClientInstance启动过程将在第5章讲解消息消费时详细介绍。