diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 1f72e27f673..7bfc618b922 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -47,11 +47,13 @@ import org.apache.rocketmq.broker.filtersrv.FilterServerManager; import org.apache.rocketmq.broker.latency.BrokerFastFailure; import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; +import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService; import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener; import org.apache.rocketmq.broker.longpolling.PullRequestHoldService; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.broker.plugin.MessageStoreFactory; import org.apache.rocketmq.broker.plugin.MessageStorePluginContext; @@ -64,7 +66,9 @@ import org.apache.rocketmq.broker.processor.ReplyMessageProcessor; import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.broker.slave.SlaveSynchronize; +import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; +import org.apache.rocketmq.broker.topic.LmqTopicConfigManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService; @@ -106,6 +110,7 @@ import org.apache.rocketmq.store.dledger.DLedgerCommitLog; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.apache.rocketmq.store.stats.LmqBrokerStatsManager; public class BrokerController { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -180,10 +185,10 @@ public BrokerController( this.nettyServerConfig = nettyServerConfig; this.nettyClientConfig = nettyClientConfig; this.messageStoreConfig = messageStoreConfig; - this.consumerOffsetManager = new ConsumerOffsetManager(this); - this.topicConfigManager = new TopicConfigManager(this); + this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this); + this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this); this.pullMessageProcessor = new PullMessageProcessor(this); - this.pullRequestHoldService = new PullRequestHoldService(this); + this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this); this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); @@ -191,7 +196,7 @@ public BrokerController( this.producerManager = new ProducerManager(); this.clientHousekeepingService = new ClientHousekeepingService(this); this.broker2Client = new Broker2Client(this); - this.subscriptionGroupManager = new SubscriptionGroupManager(this); + this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this); this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); this.filterServerManager = new FilterServerManager(this); @@ -207,7 +212,8 @@ public BrokerController( this.heartbeatThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); this.endTransactionThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getEndTransactionPoolQueueCapacity()); - this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()); + this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()); + this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); this.brokerFastFailure = new BrokerFastFailure(this); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java index 42c8da9f3fb..321c800b7ce 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java @@ -39,6 +39,10 @@ public static String getConsumerOffsetPath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "consumerOffset.json"; } + public static String getLmqConsumerOffsetPath(final String rootDir) { + return rootDir + File.separator + "config" + File.separator + "lmqConsumerOffset.json"; + } + public static String getSubscriptionGroupPath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json"; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/LmqPullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/LmqPullRequestHoldService.java new file mode 100644 index 00000000000..42b44b6ab37 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/LmqPullRequestHoldService.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.longpolling; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; + + +public class LmqPullRequestHoldService extends PullRequestHoldService { + private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + public LmqPullRequestHoldService(BrokerController brokerController) { + super(brokerController); + } + + @Override + public String getServiceName() { + return LmqPullRequestHoldService.class.getSimpleName(); + } + + @Override + public void checkHoldRequest() { + for (String key : pullRequestTable.keySet()) { + int idx = key.lastIndexOf(TOPIC_QUEUEID_SEPARATOR); + if (idx <= 0 || idx >= key.length() - 1) { + pullRequestTable.remove(key); + continue; + } + String topic = key.substring(0, idx); + int queueId = Integer.parseInt(key.substring(idx + 1)); + final long offset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); + try { + this.notifyMessageArriving(topic, queueId, offset); + } catch (Throwable e) { + LOGGER.error("check hold request failed. topic={}, queueId={}", topic, queueId, e); + } + if (MixAll.isLmq(topic)) { + ManyPullRequest mpr = pullRequestTable.get(key); + if (mpr == null || mpr.getPullRequestList() == null || mpr.getPullRequestList().isEmpty()) { + pullRequestTable.remove(key); + } + } + } + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java index d956c223c0e..170dae2939c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java @@ -39,4 +39,8 @@ public synchronized List cloneListAndClear() { return null; } + + public ArrayList getPullRequestList() { + return pullRequestList; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index 866f357d897..85ca9f73b4c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -31,10 +31,10 @@ public class PullRequestHoldService extends ServiceThread { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final String TOPIC_QUEUEID_SEPARATOR = "@"; - private final BrokerController brokerController; + protected static final String TOPIC_QUEUEID_SEPARATOR = "@"; + protected final BrokerController brokerController; private final SystemClock systemClock = new SystemClock(); - private ConcurrentMap pullRequestTable = + protected ConcurrentMap pullRequestTable = new ConcurrentHashMap(1024); public PullRequestHoldService(final BrokerController brokerController) { @@ -93,7 +93,7 @@ public String getServiceName() { return PullRequestHoldService.class.getSimpleName(); } - private void checkHoldRequest() { + protected void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index bd057587597..f09522a7bd7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -35,12 +35,12 @@ public class ConsumerOffsetManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final String TOPIC_GROUP_SEPARATOR = "@"; + protected static final String TOPIC_GROUP_SEPARATOR = "@"; - private ConcurrentMap> offsetTable = + protected ConcurrentMap> offsetTable = new ConcurrentHashMap>(512); - private transient BrokerController brokerController; + protected transient BrokerController brokerController; public ConsumerOffsetManager() { } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java new file mode 100644 index 00000000000..7e5d77425ab --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.offset; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerPathConfigHelper; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class LmqConsumerOffsetManager extends ConsumerOffsetManager { + private ConcurrentHashMap lmqOffsetTable = new ConcurrentHashMap<>(512); + + public LmqConsumerOffsetManager(BrokerController brokerController) { + super(brokerController); + } + + @Override + public long queryOffset(final String group, final String topic, final int queueId) { + if (!MixAll.isLmq(group)) { + return super.queryOffset(group, topic, queueId); + } + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + Long offset = lmqOffsetTable.get(key); + if (offset != null) { + return offset; + } + return -1; + } + + @Override + public Map queryOffset(final String group, final String topic) { + if (!MixAll.isLmq(group)) { + return super.queryOffset(group, topic); + } + Map map = new HashMap<>(); + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + Long offset = lmqOffsetTable.get(key); + if (offset != null) { + map.put(0, offset); + } + return map; + } + + @Override + public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, + final long offset) { + if (!MixAll.isLmq(group)) { + super.commitOffset(clientHost, group, topic, queueId, offset); + return; + } + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + lmqOffsetTable.put(key, offset); + } + + @Override + public String encode() { + return this.encode(false); + } + + @Override + public String configFilePath() { + return BrokerPathConfigHelper.getLmqConsumerOffsetPath(brokerController.getMessageStoreConfig().getStorePathRootDir()); + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + LmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, LmqConsumerOffsetManager.class); + if (obj != null) { + super.offsetTable = obj.offsetTable; + this.lmqOffsetTable = obj.lmqOffsetTable; + } + } + } + + @Override + public String encode(final boolean prettyFormat) { + return RemotingSerializable.toJson(this, prettyFormat); + } + + public ConcurrentHashMap getLmqOffsetTable() { + return lmqOffsetTable; + } + + public void setLmqOffsetTable(ConcurrentHashMap lmqOffsetTable) { + this.lmqOffsetTable = lmqOffsetTable; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index b95bab62490..62fd1c5d5e4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -1,267 +1,272 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker.plugin; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageExtBatch; -import org.apache.rocketmq.store.CommitLogDispatcher; -import org.apache.rocketmq.store.ConsumeQueue; -import org.apache.rocketmq.store.GetMessageResult; -import org.apache.rocketmq.store.MessageExtBrokerInner; -import org.apache.rocketmq.store.MessageFilter; -import org.apache.rocketmq.store.MessageStore; -import org.apache.rocketmq.store.PutMessageResult; -import org.apache.rocketmq.store.QueryMessageResult; -import org.apache.rocketmq.store.SelectMappedBufferResult; -import org.apache.rocketmq.store.stats.BrokerStatsManager; - -public abstract class AbstractPluginMessageStore implements MessageStore { - protected MessageStore next = null; - protected MessageStorePluginContext context; - - public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) { - this.next = next; - this.context = context; - } - - @Override - public long getEarliestMessageTime() { - return next.getEarliestMessageTime(); - } - - @Override - public long lockTimeMills() { - return next.lockTimeMills(); - } - - @Override - public boolean isOSPageCacheBusy() { - return next.isOSPageCacheBusy(); - } - - @Override - public boolean isTransientStorePoolDeficient() { - return next.isTransientStorePoolDeficient(); - } - - @Override - public boolean load() { - return next.load(); - } - - @Override - public void start() throws Exception { - next.start(); - } - - @Override - public void shutdown() { - next.shutdown(); - } - - @Override - public void destroy() { - next.destroy(); - } - - @Override - public PutMessageResult putMessage(MessageExtBrokerInner msg) { - return next.putMessage(msg); - } - - @Override - public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { - return next.asyncPutMessage(msg); - } - - @Override - public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { - return next.asyncPutMessages(messageExtBatch); - } - - @Override - public GetMessageResult getMessage(String group, String topic, int queueId, long offset, - int maxMsgNums, final MessageFilter messageFilter) { - return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); - } - - @Override - public long getMaxOffsetInQueue(String topic, int queueId) { - return next.getMaxOffsetInQueue(topic, queueId); - } - - @Override - public long getMinOffsetInQueue(String topic, int queueId) { - return next.getMinOffsetInQueue(topic, queueId); - } - - @Override - public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) { - return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset); - } - - @Override - public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { - return next.getOffsetInQueueByTime(topic, queueId, timestamp); - } - - @Override - public MessageExt lookMessageByOffset(long commitLogOffset) { - return next.lookMessageByOffset(commitLogOffset); - } - - @Override - public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) { - return next.selectOneMessageByOffset(commitLogOffset); - } - - @Override - public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) { - return next.selectOneMessageByOffset(commitLogOffset, msgSize); - } - - @Override - public String getRunningDataInfo() { - return next.getRunningDataInfo(); - } - - @Override - public HashMap getRuntimeInfo() { - return next.getRuntimeInfo(); - } - - @Override - public long getMaxPhyOffset() { - return next.getMaxPhyOffset(); - } - - @Override - public long getMinPhyOffset() { - return next.getMinPhyOffset(); - } - - @Override - public long getEarliestMessageTime(String topic, int queueId) { - return next.getEarliestMessageTime(topic, queueId); - } - - @Override - public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) { - return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset); - } - - @Override - public long getMessageTotalInQueue(String topic, int queueId) { - return next.getMessageTotalInQueue(topic, queueId); - } - - @Override - public SelectMappedBufferResult getCommitLogData(long offset) { - return next.getCommitLogData(offset); - } - - @Override - public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) { - return next.appendToCommitLog(startOffset, data, dataStart, dataLength); - } - - @Override - public void executeDeleteFilesManually() { - next.executeDeleteFilesManually(); - } - - @Override - public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, - long end) { - return next.queryMessage(topic, key, maxNum, begin, end); - } - - @Override - public void updateHaMasterAddress(String newAddr) { - next.updateHaMasterAddress(newAddr); - } - - @Override - public long slaveFallBehindMuch() { - return next.slaveFallBehindMuch(); - } - - @Override - public long now() { - return next.now(); - } - - @Override - public int cleanUnusedTopic(Set topics) { - return next.cleanUnusedTopic(topics); - } - - @Override - public void cleanExpiredConsumerQueue() { - next.cleanExpiredConsumerQueue(); - } - - @Override - public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) { - return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset); - } - - @Override - public long dispatchBehindBytes() { - return next.dispatchBehindBytes(); - } - - @Override - public long flush() { - return next.flush(); - } - - @Override - public boolean resetWriteOffset(long phyOffset) { - return next.resetWriteOffset(phyOffset); - } - - @Override - public long getConfirmOffset() { - return next.getConfirmOffset(); - } - - @Override - public void setConfirmOffset(long phyOffset) { - next.setConfirmOffset(phyOffset); - } - - @Override - public LinkedList getDispatcherList() { - return next.getDispatcherList(); - } - - @Override - public ConsumeQueue getConsumeQueue(String topic, int queueId) { - return next.getConsumeQueue(topic, queueId); - } - - @Override - public BrokerStatsManager getBrokerStatsManager() { - return next.getBrokerStatsManager(); - }; -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.plugin; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.CommitLogDispatcher; +import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageFilter; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.QueryMessageResult; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.stats.BrokerStatsManager; + +public abstract class AbstractPluginMessageStore implements MessageStore { + protected MessageStore next = null; + protected MessageStorePluginContext context; + + public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) { + this.next = next; + this.context = context; + } + + @Override + public long getEarliestMessageTime() { + return next.getEarliestMessageTime(); + } + + @Override + public long lockTimeMills() { + return next.lockTimeMills(); + } + + @Override + public boolean isOSPageCacheBusy() { + return next.isOSPageCacheBusy(); + } + + @Override + public boolean isTransientStorePoolDeficient() { + return next.isTransientStorePoolDeficient(); + } + + @Override + public boolean load() { + return next.load(); + } + + @Override + public void start() throws Exception { + next.start(); + } + + @Override + public void shutdown() { + next.shutdown(); + } + + @Override + public void destroy() { + next.destroy(); + } + + @Override + public PutMessageResult putMessage(MessageExtBrokerInner msg) { + return next.putMessage(msg); + } + + @Override + public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { + return next.asyncPutMessage(msg); + } + + @Override + public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { + return next.asyncPutMessages(messageExtBatch); + } + + @Override + public GetMessageResult getMessage(String group, String topic, int queueId, long offset, + int maxMsgNums, final MessageFilter messageFilter) { + return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); + } + + @Override + public long getMaxOffsetInQueue(String topic, int queueId) { + return next.getMaxOffsetInQueue(topic, queueId); + } + + @Override + public long getMinOffsetInQueue(String topic, int queueId) { + return next.getMinOffsetInQueue(topic, queueId); + } + + @Override + public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) { + return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset); + } + + @Override + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { + return next.getOffsetInQueueByTime(topic, queueId, timestamp); + } + + @Override + public MessageExt lookMessageByOffset(long commitLogOffset) { + return next.lookMessageByOffset(commitLogOffset); + } + + @Override + public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) { + return next.selectOneMessageByOffset(commitLogOffset); + } + + @Override + public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) { + return next.selectOneMessageByOffset(commitLogOffset, msgSize); + } + + @Override + public String getRunningDataInfo() { + return next.getRunningDataInfo(); + } + + @Override + public HashMap getRuntimeInfo() { + return next.getRuntimeInfo(); + } + + @Override + public long getMaxPhyOffset() { + return next.getMaxPhyOffset(); + } + + @Override + public long getMinPhyOffset() { + return next.getMinPhyOffset(); + } + + @Override + public long getEarliestMessageTime(String topic, int queueId) { + return next.getEarliestMessageTime(topic, queueId); + } + + @Override + public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) { + return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset); + } + + @Override + public long getMessageTotalInQueue(String topic, int queueId) { + return next.getMessageTotalInQueue(topic, queueId); + } + + @Override + public SelectMappedBufferResult getCommitLogData(long offset) { + return next.getCommitLogData(offset); + } + + @Override + public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) { + return next.appendToCommitLog(startOffset, data, dataStart, dataLength); + } + + @Override + public void executeDeleteFilesManually() { + next.executeDeleteFilesManually(); + } + + @Override + public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, + long end) { + return next.queryMessage(topic, key, maxNum, begin, end); + } + + @Override + public void updateHaMasterAddress(String newAddr) { + next.updateHaMasterAddress(newAddr); + } + + @Override + public long slaveFallBehindMuch() { + return next.slaveFallBehindMuch(); + } + + @Override + public long now() { + return next.now(); + } + + @Override + public int cleanUnusedTopic(Set topics) { + return next.cleanUnusedTopic(topics); + } + + @Override + public void cleanExpiredConsumerQueue() { + next.cleanExpiredConsumerQueue(); + } + + @Override + public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) { + return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset); + } + + @Override + public long dispatchBehindBytes() { + return next.dispatchBehindBytes(); + } + + @Override + public long flush() { + return next.flush(); + } + + @Override + public boolean resetWriteOffset(long phyOffset) { + return next.resetWriteOffset(phyOffset); + } + + @Override + public long getConfirmOffset() { + return next.getConfirmOffset(); + } + + @Override + public void setConfirmOffset(long phyOffset) { + next.setConfirmOffset(phyOffset); + } + + @Override + public LinkedList getDispatcherList() { + return next.getDispatcherList(); + } + + @Override + public ConsumeQueue getConsumeQueue(String topic, int queueId) { + return next.getConsumeQueue(topic, queueId); + } + + @Override + public BrokerStatsManager getBrokerStatsManager() { + return next.getBrokerStatsManager(); + } + + @Override + public void cleanUnusedLmqTopic(String topic) { + next.cleanUnusedLmqTopic(topic); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 86aab63479c..d7d7b63bbf4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -292,6 +292,13 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, return response; } + if (MixAll.isLmq(topic)) { + this.brokerController.getMessageStore().cleanUnusedLmqTopic(topic); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); this.brokerController.getMessageStore() .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index f925364e489..c8ea4d3b5a9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -514,6 +514,10 @@ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); break; + case LMQ_CONSUME_QUEUE_NUM_EXCEEDED: + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w."); + break; case UNKNOWN_ERROR: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UNKNOWN_ERROR"); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/LmqSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/LmqSubscriptionGroupManager.java new file mode 100644 index 00000000000..635b935b823 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/LmqSubscriptionGroupManager.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.subscription; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; + +public class LmqSubscriptionGroupManager extends SubscriptionGroupManager { + + public LmqSubscriptionGroupManager(BrokerController brokerController) { + super(brokerController); + } + + @Override + public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { + if (MixAll.isLmq(group)) { + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(group); + return subscriptionGroupConfig; + } + return super.findSubscriptionGroupConfig(group); + } + + @Override + public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { + if (config == null || MixAll.isLmq(config.getGroupName())) { + return; + } + super.updateSubscriptionGroupConfig(config); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/LmqTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/LmqTopicConfigManager.java new file mode 100644 index 00000000000..d021758b2fd --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/LmqTopicConfigManager.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.PermName; + +public class LmqTopicConfigManager extends TopicConfigManager { + public LmqTopicConfigManager(BrokerController brokerController) { + super(brokerController); + } + + @Override + public TopicConfig selectTopicConfig(final String topic) { + if (MixAll.isLmq(topic)) { + return simpleLmqTopicConfig(topic); + } + return super.selectTopicConfig(topic); + } + + @Override + public void updateTopicConfig(final TopicConfig topicConfig) { + if (topicConfig == null || MixAll.isLmq(topicConfig.getTopicName())) { + return; + } + super.updateTopicConfig(topicConfig); + } + + private TopicConfig simpleLmqTopicConfig(String topic) { + return new TopicConfig(topic, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE); + } + +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerPathConfigHelperTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerPathConfigHelperTest.java new file mode 100644 index 00000000000..01e7c365928 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerPathConfigHelperTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class BrokerPathConfigHelperTest { + + @Test + public void testGetLmqConsumerOffsetPath() { + String lmqConsumerOffsetPath = BrokerPathConfigHelper.getLmqConsumerOffsetPath("/home/admin/store"); + assertEquals("/home/admin/store/config/lmqConsumerOffset.json", lmqConsumerOffsetPath); + + + String consumerOffsetPath = BrokerPathConfigHelper.getConsumerOffsetPath("/home/admin/store"); + assertEquals("/home/admin/store/config/consumerOffset.json", consumerOffsetPath); + + String topicConfigPath = BrokerPathConfigHelper.getTopicConfigPath("/home/admin/store"); + assertEquals("/home/admin/store/config/topics.json", topicConfigPath); + + String subscriptionGroupPath = BrokerPathConfigHelper.getSubscriptionGroupPath("/home/admin/store"); + assertEquals("/home/admin/store/config/subscriptionGroup.json", subscriptionGroupPath); + + } +} \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java new file mode 100644 index 00000000000..6ec20c618df --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManagerTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.offset; + +import java.io.File; +import java.util.Map; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager; +import org.apache.rocketmq.broker.topic.LmqTopicConfigManager; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Test; +import org.mockito.Spy; + +import static org.assertj.core.api.Assertions.assertThat; + +public class LmqConsumerOffsetManagerTest { + + @Spy + private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), + new NettyClientConfig(), new MessageStoreConfig()); + + @Test + public void testOffsetManage() { + LmqConsumerOffsetManager lmqConsumerOffsetManager = new LmqConsumerOffsetManager(brokerController); + LmqTopicConfigManager lmqTopicConfigManager = new LmqTopicConfigManager(brokerController); + LmqSubscriptionGroupManager lmqSubscriptionGroupManager = new LmqSubscriptionGroupManager(brokerController); + + String lmqTopicName = "%LMQ%1111"; + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(lmqTopicName); + lmqTopicConfigManager.updateTopicConfig(topicConfig); + TopicConfig topicConfig1 = lmqTopicConfigManager.selectTopicConfig(lmqTopicName); + assertThat(topicConfig1.getTopicName()).isEqualTo(topicConfig.getTopicName()); + + String lmqGroupName = "%LMQ%GID_test"; + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(lmqGroupName); + lmqSubscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig); + SubscriptionGroupConfig subscriptionGroupConfig1 = lmqSubscriptionGroupManager.findSubscriptionGroupConfig( + lmqGroupName); + assertThat(subscriptionGroupConfig1.getGroupName()).isEqualTo(subscriptionGroupConfig.getGroupName()); + + lmqConsumerOffsetManager.commitOffset("127.0.0.1", lmqGroupName, lmqTopicName, 0, 10L); + Map integerLongMap = lmqConsumerOffsetManager.queryOffset(lmqGroupName, lmqTopicName); + assertThat(integerLongMap.get(0)).isEqualTo(10L); + long offset = lmqConsumerOffsetManager.queryOffset(lmqGroupName, lmqTopicName, 0); + assertThat(offset).isEqualTo(10L); + + long offset1 = lmqConsumerOffsetManager.queryOffset(lmqGroupName, lmqTopicName + "test", 0); + assertThat(offset1).isEqualTo(-1L); + } + + @After + public void destroy() { + UtilAll.deleteFile(new File(new MessageStoreConfig().getStorePathRootDir())); + } + +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index ec1e1f0245f..c2300d362c8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -83,6 +83,8 @@ public class MixAll { public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml"; public static final String REPLY_MESSAGE_FLAG = "reply"; + public static final String LMQ_PREFIX = "%LMQ%"; + public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ","; private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); public static String getWSAddr() { @@ -444,4 +446,7 @@ public static String humanReadableByteCount(long bytes, boolean si) { return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); } + public static boolean isLmq(String lmqMetaData) { + return lmqMetaData != null && lmqMetaData.startsWith(LMQ_PREFIX); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 5bdc846562d..ba9b7443cbb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -52,6 +52,8 @@ public class MessageConst { public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME"; public static final String PROPERTY_CLUSTER = "CLUSTER"; public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE"; + public static final String PROPERTY_INNER_MULTI_DISPATCH = "INNER_MULTI_DISPATCH"; + public static final String PROPERTY_INNER_MULTI_QUEUE_OFFSET = "INNER_MULTI_QUEUE_OFFSET"; public static final String KEY_SEPARATOR = " "; @@ -88,5 +90,6 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_PUSH_REPLY_TIME); STRING_HASH_SET.add(PROPERTY_CLUSTER); STRING_HASH_SET.add(PROPERTY_MESSAGE_TYPE); + STRING_HASH_SET.add(PROPERTY_INNER_MULTI_QUEUE_OFFSET); } } diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java index 8d86544be69..4f2a341553e 100644 --- a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java @@ -95,4 +95,16 @@ public void testGetLocalhostByNetworkInterface() throws Exception { assertThat(MixAll.LOCALHOST).isNotNull(); assertThat(MixAll.getLocalhostByNetworkInterface()).isNotNull(); } + + @Test + public void testIsLmq() { + String testLmq = null; + assertThat(MixAll.isLmq(testLmq)).isFalse(); + testLmq = "lmq"; + assertThat(MixAll.isLmq(testLmq)).isFalse(); + testLmq = "%LMQ%queue123"; + assertThat(MixAll.isLmq(testLmq)).isTrue(); + testLmq = "%LMQ%GID_TEST"; + assertThat(MixAll.isLmq(testLmq)).isTrue(); + } } diff --git a/docs/cn/Example_LMQ.md b/docs/cn/Example_LMQ.md new file mode 100644 index 00000000000..85a3db5005c --- /dev/null +++ b/docs/cn/Example_LMQ.md @@ -0,0 +1,85 @@ +# Light message queue (LMQ) +LMQ采用的读放大的策略,写一份数据,多个LMQ队列分发, +因为存储的成本和效率对用户的体感最明显。写多份不仅加大了存储成本,同时也对性能和数据准确一致性提出了挑战。 + +![](image/LMQ_1.png) + +上图描述的是LMQ的队列存储模型,消息可以来自各个接入场景 +(如服务端的MQ/AMQP,客户端的MQTT),但只会写一份存到commitlog里面,然后分发出多个需求场景的队列索引(ConsumerQueue),如服务端场景(MQ/AMQP)可以按照一级Topic队列进行传统的服务端消费,客户端MQTT场景可以按照MQTT多级Topic(也即 LMQ)进行消费消息。 + +## 一、broker启动配置 + + +broker.conf文件需要增加以下的配置项,开启LMQ开关,这样就可以识别LMQ相关属性的消息,进行原子分发消息到LMQ队列 +```properties +enableLmq = true +enableMultiDispatch = true +``` +## 二、发送消息 +发送消息的时候通过设置 INNER_MULTI_DISPATCH 属性,LMQ queue使用逗号分割,queue前缀必须是 %LMQ%,这样broker就可以识别LMQ queue. +以下代码只是demo伪代码 具体逻辑参照执行即可 +```java +DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); +producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); +producer.start(); + + +/* +* Create a message instance, specifying topic, tag and message body. +*/ +Message msg = new Message("TopicTest" /* Topic */, + "TagA" /* Tag */, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ + ); +/* +* INNER_MULTI_DISPATCH property and PREFIX must start as "%LMQ%", +* If it is multiple LMQ, need to use “,” split +*/ +message.putUserProperty("INNER_MULTI_DISPATCH", "%LMQ%123,%LMQ%456"); +/* +* Call send message to deliver message to one of brokers. +*/ +SendResult sendResult = producer.send(msg); +``` +## 三、拉取消息 +LMQ queue在每个broker上只有一个queue,也即queueId为0, 指明轻量级的MessageQueue,就可以拉取消息进行消费。 +以下代码只是demo伪代码 具体逻辑参照执行即可 +```java +DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(); +defaultMQPullConsumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); +defaultMQPullConsumer.setVipChannelEnabled(false); +defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST"); +defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST"); +defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Arrays.asList("TopicTest"))); +defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(2000); +defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(3000); +defaultMQPullConsumer.start(); + +String brokerName = "set broker Name"; +MessageQueue mq = new MessageQueue("%LMQ%123", brokerName, 0); +defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer("TopicTest"); + +Thread.sleep(30000); +Long offset = defaultMQPullConsumer.maxOffset(mq); + +defaultMQPullConsumer.pullBlockIfNotFound( + mq, "*", offset, 32, + new PullCallback() { + @Override + public void onSuccess(PullResult pullResult) { + List list = pullResult.getMsgFoundList(); + if (list == null || list.isEmpty()) { + return; + } + for (MessageExt messageExt : list) { + System.out.println(messageExt); + } + } + @Override + public void onException(Throwable e) { + + } +}); +``` +​ + diff --git a/docs/cn/image/LMQ_1.png b/docs/cn/image/LMQ_1.png new file mode 100644 index 00000000000..3afd0885f16 Binary files /dev/null and b/docs/cn/image/LMQ_1.png differ diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 05ea526c99e..73fd361a32d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import org.apache.rocketmq.common.ServiceThread; @@ -68,6 +69,7 @@ public class CommitLog { private final AppendMessageCallback appendMessageCallback; private final ThreadLocal putMessageThreadLocal; protected HashMap topicQueueTable = new HashMap(1024); + protected Map lmqTopicQueueTable = new ConcurrentHashMap<>(1024); protected volatile long confirmOffset = -1L; private volatile long beginTimeInLock = 0; @@ -76,6 +78,8 @@ public class CommitLog { private volatile Set fullStorePaths = Collections.emptySet(); + private final MultiDispatch multiDispatch; + public CommitLog(final DefaultMessageStore defaultMessageStore) { String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(); if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { @@ -107,6 +111,8 @@ protected PutMessageThreadLocal initialValue() { }; this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); + this.multiDispatch = new MultiDispatch(defaultMessageStore, this); + } public void setFullStorePaths(Set fullStorePaths) { @@ -117,6 +123,10 @@ public Set getFullStorePaths() { return fullStorePaths; } + public ThreadLocal getPutMessageThreadLocal() { + return putMessageThreadLocal; + } + public boolean load() { boolean result = this.mappedFileQueue.load(); log.info("load commit log " + (result ? "OK" : "Failed")); @@ -964,6 +974,7 @@ public void removeQueueFromTopicQueueTable(final String topic, final int queueId String key = topic + "-" + queueId; synchronized (this) { this.topicQueueTable.remove(key); + this.lmqTopicQueueTable.remove(key); } log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId); @@ -987,6 +998,10 @@ public long lockTimeMills() { return diff; } + public Map getLmqTopicQueueTable() { + return this.lmqTopicQueueTable; + } + abstract class FlushCommitLogService extends ServiceThread { protected static final int RETRY_TIMES_OVER = 10; } @@ -1298,6 +1313,11 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer CommitLog.this.topicQueueTable.put(key, queueOffset); } + boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner); + if (!multiDispatchWrapResult) { + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } + // Transaction messages that require special handling final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { @@ -1361,6 +1381,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); + CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner); break; default: break; @@ -1691,6 +1712,9 @@ private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { byteBuffer.limit(limit); } + public ByteBuffer getEncoderBuffer() { + return encoderBuffer; + } } static class PutMessageThreadLocal { diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 87ff0a096cf..fdc725db7f5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -19,7 +19,11 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; + +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; @@ -376,7 +380,7 @@ public long getMinOffsetInQueue() { return this.minLogicOffset / CQ_STORE_UNIT_SIZE; } - public void putMessagePositionInfoWrapper(DispatchRequest request) { + public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) { final int maxRetries = 30; boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable(); for (int i = 0; i < maxRetries && canWrite; i++) { @@ -403,6 +407,9 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) { this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp()); } this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); + if (multiQueue) { + multiDispatchLmqQueue(request, maxRetries); + } return; } else { // XXX: warn and notify me @@ -422,6 +429,52 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) { this.defaultMessageStore.getRunningFlags().makeLogicsQueueError(); } + private void multiDispatchLmqQueue(DispatchRequest request, int maxRetries) { + Map prop = request.getPropertiesMap(); + String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET); + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + if (queues.length != queueOffsets.length) { + log.error("[bug] queues.length!=queueOffsets.length ", request.getTopic()); + return; + } + for (int i = 0; i < queues.length; i++) { + String queueName = queues[i]; + long queueOffset = Long.parseLong(queueOffsets[i]); + int queueId = request.getQueueId(); + if (this.defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { + queueId = 0; + } + doDispatchLmqQueue(request, maxRetries, queueName, queueOffset, queueId); + + } + return; + } + + private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String queueName, long queueOffset, + int queueId) { + ConsumeQueue cq = this.defaultMessageStore.findConsumeQueue(queueName, queueId); + boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable(); + for (int i = 0; i < maxRetries && canWrite; i++) { + boolean result = cq.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(), + request.getTagsCode(), + queueOffset); + if (result) { + break; + } else { + log.warn("[BUG]put commit log position info to " + queueName + ":" + queueId + " " + request.getCommitLogOffset() + + " failed, retry " + i + " times"); + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + log.warn("", e); + } + } + } + } + private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { @@ -584,4 +637,5 @@ protected boolean isExtWriteEnable() { public boolean isExtAddr(long tagsCode) { return ConsumeQueueExt.isExtAddr(tagsCode); } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 1f24dc7771b..bfc9fd282b1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -39,7 +39,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; @@ -47,6 +50,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; @@ -109,6 +113,8 @@ public class DefaultMessageStore implements MessageStore { private AtomicLong printTimes = new AtomicLong(0); + private final AtomicInteger lmqConsumeQueueNum = new AtomicInteger(0); + private final LinkedList dispatcherList; private RandomAccessFile lockFile; @@ -419,6 +425,23 @@ private PutMessageStatus checkStoreStatus() { return PutMessageStatus.PUT_OK; } + private PutMessageStatus checkLmqMessage(MessageExtBrokerInner msg) { + if (msg.getProperties() != null + && StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) + && this.isLmqConsumeQueueNumExceeded()) { + return PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED; + } + return PutMessageStatus.PUT_OK; + } + + private boolean isLmqConsumeQueueNumExceeded() { + if (this.getMessageStoreConfig().isEnableLmq() && this.getMessageStoreConfig().isEnableMultiDispatch() + && this.lmqConsumeQueueNum.get() > this.messageStoreConfig.getMaxLmqConsumeQueueNum()) { + return true; + } + return false; + } + @Override public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { PutMessageStatus checkStoreStatus = this.checkStoreStatus(); @@ -431,6 +454,12 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } + PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg); + if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) { + return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null)); + } + + long beginTime = this.getSystemClock().now(); CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg); @@ -448,7 +477,6 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner return putResultFuture; } - public CompletableFuture asyncPutMessages(MessageExtBatch messageExtBatch) { PutMessageStatus checkStoreStatus = this.checkStoreStatus(); if (checkStoreStatus != PutMessageStatus.PUT_OK) { @@ -532,6 +560,11 @@ public GetMessageResult getMessage(final String group, final String topic, final return null; } + if (MixAll.isLmq(topic) && this.isLmqConsumeQueueNumExceeded()) { + log.warn("message store is not available, broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num"); + return null; + } + long beginTime = this.getSystemClock().now(); GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; @@ -1016,7 +1049,8 @@ public int cleanUnusedTopic(Set topics) { String topic = next.getKey(); if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC) - && !topic.equals(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC)) { + && !topic.equals(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC) + && !MixAll.isLmq(topic)) { ConcurrentMap queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); @@ -1218,6 +1252,9 @@ public ConsumeQueue findConsumeQueue(String topic, int queueId) { if (oldLogic != null) { logic = oldLogic; } else { + if (MixAll.isLmq(topic)) { + lmqConsumeQueueNum.getAndIncrement(); + } logic = newLogic; } } @@ -1424,6 +1461,9 @@ private void putConsumeQueue(final String topic, final int queueId, final Consum map = new ConcurrentHashMap(); map.put(queueId, consumeQueue); this.consumeQueueTable.put(topic, map); + if (MixAll.isLmq(topic)) { + this.lmqConsumeQueueNum.getAndIncrement(); + } } else { map.put(queueId, consumeQueue); } @@ -1497,7 +1537,23 @@ public void doDispatch(DispatchRequest req) { public void putMessagePositionInfo(DispatchRequest dispatchRequest) { ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); - cq.putMessagePositionInfoWrapper(dispatchRequest); + cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest)); + } + + private boolean checkMultiDispatchQueue(DispatchRequest dispatchRequest) { + if (!this.messageStoreConfig.isEnableMultiDispatch()) { + return false; + } + Map prop = dispatchRequest.getPropertiesMap(); + if (prop == null && prop.isEmpty()) { + return false; + } + String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET); + if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) { + return false; + } + return true; } @Override @@ -1517,6 +1573,29 @@ public void handleScheduleMessageService(final BrokerRole brokerRole) { } + @Override + public void cleanUnusedLmqTopic(String topic) { + if (this.consumeQueueTable.containsKey(topic)) { + ConcurrentMap map = this.consumeQueueTable.get(topic); + if (map != null) { + ConsumeQueue cq = map.get(0); + cq.destroy(); + log.info("cleanUnusedLmqTopic: {} {} ConsumeQueue cleaned", + cq.getTopic(), + cq.getQueueId() + ); + + this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); + this.lmqConsumeQueueNum.getAndDecrement(); + } + this.consumeQueueTable.remove(topic); + if (this.brokerConfig.isAutoDeleteUnusedStats()) { + this.brokerStatsManager.onTopicDeleted(topic); + } + log.info("cleanUnusedLmqTopic: {},topic destroyed", topic); + } + } + public int remainTransientStoreBufferNumbs() { return this.transientStorePool.availableBufferNums(); } @@ -1974,6 +2053,7 @@ private void doReput() { dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); + notifyMessageArrive4MultiQueue(dispatchRequest); } this.reputFromOffset += size; @@ -2016,6 +2096,34 @@ private void doReput() { } } + private void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) { + Map prop = dispatchRequest.getPropertiesMap(); + if (prop == null) { + return; + } + String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET); + if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) { + return; + } + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + if (queues.length != queueOffsets.length) { + return; + } + for (int i = 0; i < queues.length; i++) { + String queueName = queues[i]; + long queueOffset = Long.parseLong(queueOffsets[i]); + int queueId = dispatchRequest.getQueueId(); + if (DefaultMessageStore.this.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { + queueId = 0; + } + DefaultMessageStore.this.messageArrivingListener.arriving( + queueName, queueId, queueOffset + 1, dispatchRequest.getTagsCode(), + dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); + } + } + @Override public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index a8c658bfe26..6771ede633a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -397,4 +397,14 @@ QueryMessageResult queryMessage(final String topic, final String key, final int * @param brokerRole */ void handleScheduleMessageService(BrokerRole brokerRole); + + /** + * Clean unused lmq topic. + * When calling to clean up the lmq topic, + * the lmq topic cannot be used to write messages at the same time, + * otherwise the messages of the cleaning lmq topic may be lost, + * please call this method with caution + * @param topic lmq topic + */ + void cleanUnusedLmqTopic(String topic); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java new file mode 100644 index 00000000000..679eed12344 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.store.CommitLog.MessageExtEncoder; + +/** + * not-thread-safe + */ +public class MultiDispatch { + private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); + private final StringBuilder keyBuilder = new StringBuilder(); + private final DefaultMessageStore messageStore; + private final CommitLog commitLog; + + public MultiDispatch(DefaultMessageStore messageStore, CommitLog commitLog) { + this.messageStore = messageStore; + this.commitLog = commitLog; + } + + public String queueKey(String queueName, MessageExtBrokerInner msgInner) { + keyBuilder.setLength(0); + keyBuilder.append(queueName); + keyBuilder.append('-'); + int queueId = msgInner.getQueueId(); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { + queueId = 0; + } + keyBuilder.append(queueId); + return keyBuilder.toString(); + } + + public boolean wrapMultiDispatch(final MessageExtBrokerInner msgInner) { + if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch()) { + return true; + } + String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + if (StringUtils.isBlank(multiDispatchQueue)) { + return true; + } + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + Long[] queueOffsets = new Long[queues.length]; + for (int i = 0; i < queues.length; i++) { + String key = queueKey(queues[i], msgInner); + Long queueOffset; + try { + queueOffset = getTopicQueueOffset(key); + } catch (Exception e) { + return false; + } + if (null == queueOffset) { + queueOffset = 0L; + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { + commitLog.getLmqTopicQueueTable().put(key, queueOffset); + } else { + commitLog.getTopicQueueTable().put(key, queueOffset); + } + } + queueOffsets[i] = queueOffset; + } + MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, + StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); + removeWaitStorePropertyString(msgInner); + return rebuildMsgInner(msgInner); + } + + private void removeWaitStorePropertyString(MessageExtBrokerInner msgInner) { + if (msgInner.getProperties().containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) { + // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message. + // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it. + String waitStoreMsgOKValue = msgInner.getProperties().remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later + msgInner.getProperties().put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue); + } else { + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + } + } + + private boolean rebuildMsgInner(MessageExtBrokerInner msgInner) { + MessageExtEncoder encoder = this.commitLog.getPutMessageThreadLocal().get().getEncoder(); + PutMessageResult encodeResult = encoder.encode(msgInner); + if (encodeResult != null) { + LOGGER.error("rebuild msgInner for multiDispatch", encodeResult); + return false; + } + msgInner.setEncodedBuff(encoder.getEncoderBuffer()); + return true; + + } + + public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) { + if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch()) { + return; + } + String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + if (StringUtils.isBlank(multiDispatchQueue)) { + return; + } + String multiQueueOffset = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET); + if (StringUtils.isBlank(multiQueueOffset)) { + LOGGER.error("[bug] no multiQueueOffset when updating {}", msgInner.getTopic()); + return; + } + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + if (queues.length != queueOffsets.length) { + LOGGER.error("[bug] num is not equal when updateMultiQueueOffset {}", msgInner.getTopic()); + return; + } + for (int i = 0; i < queues.length; i++) { + String key = queueKey(queues[i], msgInner); + long queueOffset = Long.parseLong(queueOffsets[i]); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { + commitLog.getLmqTopicQueueTable().put(key, ++queueOffset); + } else { + commitLog.getTopicQueueTable().put(key, ++queueOffset); + } + } + } + + private Long getTopicQueueOffset(String key) throws Exception { + Long offset = null; + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { + Long queueNextOffset = commitLog.getLmqTopicQueueTable().get(key); + if (queueNextOffset != null) { + offset = queueNextOffset; + } + } else { + offset = commitLog.getTopicQueueTable().get(key); + } + return offset; + } + +} diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java index e1631d7d05c..29d0d95d9af 100644 --- a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java @@ -27,4 +27,5 @@ public enum PutMessageStatus { PROPERTIES_SIZE_EXCEEDED, OS_PAGECACHE_BUSY, UNKNOWN_ERROR, + LMQ_CONSUME_QUEUE_NUM_EXCEEDED, } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index e1439a0c9d8..1188f2191f7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -159,6 +159,10 @@ public class MessageStoreConfig { private boolean enableScheduleMessageStats = true; + private boolean enableLmq = false; + private boolean enableMultiDispatch = false; + private int maxLmqConsumeQueueNum = 20000; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -744,4 +748,28 @@ public boolean isEnableScheduleMessageStats() { public void setEnableScheduleMessageStats(boolean enableScheduleMessageStats) { this.enableScheduleMessageStats = enableScheduleMessageStats; } + + public boolean isEnableLmq() { + return enableLmq; + } + + public void setEnableLmq(boolean enableLmq) { + this.enableLmq = enableLmq; + } + + public boolean isEnableMultiDispatch() { + return enableMultiDispatch; + } + + public void setEnableMultiDispatch(boolean enableMultiDispatch) { + this.enableMultiDispatch = enableMultiDispatch; + } + + public int getMaxLmqConsumeQueueNum() { + return maxLmqConsumeQueueNum; + } + + public void setMaxLmqConsumeQueueNum(int maxLmqConsumeQueueNum) { + this.maxLmqConsumeQueueNum = maxLmqConsumeQueueNum; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java new file mode 100644 index 00000000000..f0e23fe6388 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.stats; + +import org.apache.rocketmq.common.MixAll; + +public class LmqBrokerStatsManager extends BrokerStatsManager { + + public LmqBrokerStatsManager(String clusterName, boolean enableQueueStat) { + super(clusterName, enableQueueStat); + } + + @Override + public void incGroupGetNums(final String group, final String topic, final int incValue) { + String lmqGroup = group; + String lmqTopic = topic; + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } + super.incGroupGetNums(lmqGroup, lmqTopic, incValue); + } + + @Override + public void incGroupGetSize(final String group, final String topic, final int incValue) { + String lmqGroup = group; + String lmqTopic = topic; + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } + super.incGroupGetSize(lmqGroup, lmqTopic, incValue); + } + + @Override + public void incGroupGetLatency(final String group, final String topic, final int queueId, final int incValue) { + String lmqGroup = group; + String lmqTopic = topic; + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } + super.incGroupGetLatency(lmqGroup, lmqTopic, queueId, incValue); + } + + @Override + public void incSendBackNums(final String group, final String topic) { + String lmqGroup = group; + String lmqTopic = topic; + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } + super.incSendBackNums(lmqGroup, lmqTopic); + } + + @Override + public double tpsGroupGetNums(final String group, final String topic) { + String lmqGroup = group; + String lmqTopic = topic; + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } + return super.tpsGroupGetNums(lmqGroup, lmqTopic); + } + + @Override + public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, + final long fallBehind) { + String lmqGroup = group; + String lmqTopic = topic; + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } + super.recordDiskFallBehindTime(lmqGroup, lmqTopic, queueId, fallBehind); + } + + @Override + public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, + final long fallBehind) { + String lmqGroup = group; + String lmqTopic = topic; + if (MixAll.isLmq(group)) { + lmqGroup = MixAll.LMQ_PREFIX; + } + if (MixAll.isLmq(topic)) { + lmqTopic = MixAll.LMQ_PREFIX; + } + super.recordDiskFallBehindSize(lmqGroup, lmqTopic, queueId, fallBehind); + } + +} diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index 7654e0a288d..668c069217d 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -146,6 +147,34 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, return master; } + protected DefaultMessageStore genForMultiQueue() throws Exception { + MessageStoreConfig messageStoreConfig = buildStoreConfig( + commitLogFileSize, cqFileSize, true, cqExtFileSize + ); + + messageStoreConfig.setEnableLmq(true); + messageStoreConfig.setEnableMultiDispatch(true); + + BrokerConfig brokerConfig = new BrokerConfig(); + + DefaultMessageStore master = new DefaultMessageStore( + messageStoreConfig, + new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()), + new MessageArrivingListener() { + @Override + public void arriving(String topic, int queueId, long logicOffset, long tagsCode, + long msgStoreTime, byte[] filterBitMap, Map properties) { + } + } + , brokerConfig); + + assertThat(master.load()).isTrue(); + + master.start(); + + return master; + } + protected void putMsg(DefaultMessageStore master) throws Exception { long totalMsgs = 200; @@ -158,6 +187,33 @@ protected void putMsg(DefaultMessageStore master) throws Exception { } } + protected void putMsgMultiQueue(DefaultMessageStore master) throws Exception { + for (long i = 0; i < 1; i++) { + master.putMessage(buildMessageMultiQueue()); + } + } + + private MessageExtBrokerInner buildMessageMultiQueue() { + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setTopic(topic); + msg.setTags("TAG1"); + msg.setKeys("Hello"); + msg.setBody(msgBody); + msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setQueueId(queueId); + msg.setSysFlag(0); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(StoreHost); + msg.setBornHost(BornHost); + for (int i = 0; i < 1; i++) { + msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, "%LMQ%123,%LMQ%456"); + msg.putUserProperty(String.valueOf(i), "imagoodperson" + i); + } + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + + return msg; + } + protected void deleteDirectory(String rootPath) { File file = new File(rootPath); deleteFile(file); @@ -217,6 +273,89 @@ public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception { } + @Test + public void testPutMessagePositionInfoWrapper_MultiQueue() throws Exception { + DefaultMessageStore messageStore = null; + try { + messageStore = genForMultiQueue(); + + + int totalMessages = 10; + + for (int i = 0; i < totalMessages; i++) { + putMsgMultiQueue(messageStore); + } + Thread.sleep(5); + + ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId); + Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfoWrapper", DispatchRequest.class, boolean.class); + + assertThat(method).isNotNull(); + + method.setAccessible(true); + + SelectMappedBufferResult result = messageStore.getCommitLog().getData(0); + assertThat(result != null).isTrue(); + + DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false); + + assertThat(cq).isNotNull(); + + Object dispatchResult = method.invoke(cq, dispatchRequest, true); + + ConsumeQueue lmqCq1 = messageStore.getConsumeQueueTable().get("%LMQ%123").get(0); + + ConsumeQueue lmqCq2 = messageStore.getConsumeQueueTable().get("%LMQ%456").get(0); + + assertThat(lmqCq1).isNotNull(); + + assertThat(lmqCq2).isNotNull(); + + } finally { + if (messageStore != null) { + messageStore.shutdown(); + messageStore.destroy(); + } + deleteDirectory(storePath); + } + + } + + @Test + public void testPutMessagePositionInfoMultiQueue() throws Exception { + DefaultMessageStore messageStore = null; + try { + + messageStore = genForMultiQueue(); + + int totalMessages = 10; + + for (int i = 0; i < totalMessages; i++) { + putMsgMultiQueue(messageStore); + } + Thread.sleep(5); + + ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId); + + ConsumeQueue lmqCq1 = messageStore.getConsumeQueueTable().get("%LMQ%123").get(0); + + ConsumeQueue lmqCq2 = messageStore.getConsumeQueueTable().get("%LMQ%456").get(0); + + assertThat(cq).isNotNull(); + + assertThat(lmqCq1).isNotNull(); + + assertThat(lmqCq2).isNotNull(); + + } finally { + if (messageStore != null) { + messageStore.shutdown(); + messageStore.destroy(); + } + deleteDirectory(storePath); + } + } + @Test public void testConsumeQueueWithExtendData() { DefaultMessageStore master = null; diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 96451e3c6bf..b565c5c6671 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -641,6 +642,21 @@ private void damageCommitlog(long offset) throws Exception { fileChannel.close(); } + @Test + public void testCleanUnusedLmqTopic() throws Exception { + String lmqTopic = "%LMQ%123"; + + MessageExtBrokerInner messageExtBrokerInner = buildMessage(); + messageExtBrokerInner.setTopic("test"); + messageExtBrokerInner.setQueueId(0); + messageExtBrokerInner.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqTopic); + messageStore.putMessage(messageExtBrokerInner); + + Thread.sleep(3000); + messageStore.cleanUnusedLmqTopic(lmqTopic); + + } + private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java new file mode 100644 index 00000000000..45e4d06652d --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import java.io.File; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MultiDispatchTest { + + private CommitLog commitLog; + private MultiDispatch multiDispatch; + + @Before + public void init() throws Exception { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore1"); + messageStoreConfig.setStorePathCommitLog( + System.getProperty("user.home") + File.separator + "unitteststore1" + File.separator + "commitlog"); + + messageStoreConfig.setEnableLmq(true); + messageStoreConfig.setEnableMultiDispatch(true); + //too much reference + DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, null); + this.commitLog = new CommitLog(messageStore); + this.multiDispatch = new MultiDispatch(messageStore, commitLog); + } + + @After + public void destroy() { + UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore1")); + } + + @Test + public void queueKey() { + MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class); + when(messageExtBrokerInner.getQueueId()).thenReturn(2); + String ret = multiDispatch.queueKey("%LMQ%lmq123", messageExtBrokerInner); + assertEquals(ret, "%LMQ%lmq123-0"); + } + + @Test + public void wrapMultiDispatch() { + MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class); + when(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)).thenReturn( + "%LMQ%123,%LMQ%456"); + when(messageExtBrokerInner.getTopic()).thenReturn("test"); + when(messageExtBrokerInner.getBody()).thenReturn("aaa".getBytes(Charset.forName("UTF-8"))); + when(messageExtBrokerInner.getBornHost()).thenReturn(new InetSocketAddress("127.0.0.1", 54270)); + when(messageExtBrokerInner.getStoreHost()).thenReturn(new InetSocketAddress("127.0.0.1", 10911)); + multiDispatch.wrapMultiDispatch(messageExtBrokerInner); + assertTrue(commitLog.getLmqTopicQueueTable().size() == 2); + assertTrue(commitLog.getLmqTopicQueueTable().get("%LMQ%123-0") == 0L); + assertTrue(commitLog.getLmqTopicQueueTable().get("%LMQ%456-0") == 0L); + } + + @Test + public void updateMultiQueueOffset() { + MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class); + when(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)).thenReturn("%LMQ%123,%LMQ%456"); + when(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)).thenReturn("0,1"); + multiDispatch.updateMultiQueueOffset(messageExtBrokerInner); + assertTrue(commitLog.getLmqTopicQueueTable().size() == 2); + assertTrue(commitLog.getLmqTopicQueueTable().get("%LMQ%123-0") == 1L); + assertTrue(commitLog.getLmqTopicQueueTable().get("%LMQ%456-0") == 2L); + } +} \ No newline at end of file diff --git a/test/pom.xml b/test/pom.xml index 93ff590cb0b..9d8336e050d 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -45,6 +45,27 @@ truth 0.30 + + org.mockito + mockito-core + 2.28.2 + test + + + junit + junit + 4.12 + test + + + ${project.groupId} + rocketmq-client + + + ${project.groupId} + rocketmq-tools + + diff --git a/test/src/main/java/org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.java b/test/src/main/java/org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.java new file mode 100644 index 00000000000..392b306734e --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.test.lmq.benchmark; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.test.util.StatUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +public class BenchLmqStore { + private static Logger logger = LoggerFactory.getLogger(BenchLmqStore.class); + private static String namesrv = System.getProperty("namesrv", "127.0.0.1:9876"); + private static String lmqTopic = System.getProperty("lmqTopic", "lmqTestTopic"); + private static boolean enableSub = Boolean.parseBoolean(System.getProperty("enableSub", "true")); + private static String queuePrefix = System.getProperty("queuePrefix", "lmqTest"); + private static int tps = Integer.parseInt(System.getProperty("tps", "1")); + private static int lmqNum = Integer.parseInt(System.getProperty("lmqNum", "1")); + private static int sendThreadNum = Integer.parseInt(System.getProperty("sendThreadNum", "64")); + private static int consumerThreadNum = Integer.parseInt(System.getProperty("consumerThreadNum", "64")); + private static String brokerName = System.getProperty("brokerName", "broker-a"); + private static int size = Integer.parseInt(System.getProperty("size", "128")); + private static int suspendTime = Integer.parseInt(System.getProperty("suspendTime", "2000")); + private static final boolean RETRY_NO_MATCHED_MSG = Boolean.parseBoolean(System.getProperty("retry_no_matched_msg", "false")); + private static boolean benchOffset = Boolean.parseBoolean(System.getProperty("benchOffset", "false")); + private static int benchOffsetNum = Integer.parseInt(System.getProperty("benchOffsetNum", "1")); + private static Map offsetMap = new ConcurrentHashMap<>(256); + private static Map pullStatus = new ConcurrentHashMap<>(256); + private static Map> pullEvent = new ConcurrentHashMap<>(256); + public static DefaultMQProducer defaultMQProducer; + private static int pullConsumerNum = Integer.parseInt(System.getProperty("pullConsumerNum", "8")); + public static DefaultMQPullConsumer[] defaultMQPullConsumers = new DefaultMQPullConsumer[pullConsumerNum]; + private static AtomicLong rid = new AtomicLong(); + private static final String LMQ_PREFIX = "%LMQ%"; + + public static void main(String[] args) throws InterruptedException, MQClientException, MQBrokerException, + RemotingException { + defaultMQProducer = new DefaultMQProducer(); + defaultMQProducer.setProducerGroup("PID_LMQ_TEST"); + defaultMQProducer.setVipChannelEnabled(false); + defaultMQProducer.setNamesrvAddr(namesrv); + defaultMQProducer.start(); + //defaultMQProducer.createTopic(lmqTopic, lmqTopic, 8); + for (int i = 0; i < pullConsumerNum; i++) { + DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(); + defaultMQPullConsumers[i] = defaultMQPullConsumer; + defaultMQPullConsumer.setNamesrvAddr(namesrv); + defaultMQPullConsumer.setVipChannelEnabled(false); + defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST_" + i); + defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST_" + i); + defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Arrays.asList(lmqTopic))); + defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(suspendTime); + defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(suspendTime + 1000); + defaultMQPullConsumer.start(); + } + Thread.sleep(3000L); + if (benchOffset) { + doBenchOffset(); + return; + } + ScheduledThreadPoolExecutor consumerPool = new ScheduledThreadPoolExecutor(consumerThreadNum, new ThreadFactoryImpl("test")); + for (int i = 0; i < consumerThreadNum; i++) { + final int idx = i; + consumerPool.scheduleWithFixedDelay(() -> { + try { + Map map = pullEvent.get(idx); + if (map == null) { + return; + } + for (Map.Entry entry : map.entrySet()) { + try { + Boolean status = pullStatus.get(entry.getKey()); + if (Boolean.TRUE.equals(status)) { + continue; + } + doPull(map, entry.getKey(), entry.getValue()); + } catch (Exception e) { + logger.error("pull broker msg error", e); + } + } + } catch (Exception e) { + logger.error("exec doPull task error", e); + } + }, 1, 1, TimeUnit.MILLISECONDS); + } + // init queue sub + if (enableSub && lmqNum > 0 && StringUtils.isNotBlank(brokerName)) { + for (int i = 0; i < lmqNum; i++) { + long idx = rid.incrementAndGet(); + String queue = LMQ_PREFIX + queuePrefix + idx % lmqNum; + MessageQueue mq = new MessageQueue(queue, brokerName, 0); + int queueHash = Math.abs(queue.hashCode()) % consumerThreadNum; + pullEvent.putIfAbsent(queueHash, new ConcurrentHashMap<>()); + pullEvent.get(queueHash).put(mq, idx); + } + } + Thread.sleep(5000L); + doSend(); + } + public static void doSend() { + StringBuilder sb = new StringBuilder(); + for (int j = 0; j < size; j += 10) { + sb.append("hello baby"); + } + byte[] body = sb.toString().getBytes(); + String pubKey = "pub"; + ExecutorService sendPool = Executors.newFixedThreadPool(sendThreadNum); + for (int i = 0; i < sendThreadNum; i++) { + sendPool.execute(() -> { + while (true) { + if (StatUtil.isOverFlow(pubKey, tps)) { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + long start = System.currentTimeMillis(); + try { + long idx = rid.incrementAndGet(); + Message message = new Message(lmqTopic, body); + String queue = lmqTopic; + if (lmqNum > 0) { + queue = LMQ_PREFIX + queuePrefix + idx % lmqNum; + message.putUserProperty("INNER_MULTI_DISPATCH", queue); + } + SendResult sendResult = defaultMQProducer.send(message); + StatUtil.addInvoke(pubKey, System.currentTimeMillis() - start); + if (StatUtil.nowTps(pubKey) < 10) { + logger.warn("pub: {} ", sendResult.getMsgId()); + } + if (enableSub) { + MessageQueue mq = new MessageQueue(queue, sendResult.getMessageQueue().getBrokerName(), + lmqNum > 0 ? 0 : sendResult.getMessageQueue().getQueueId()); + int queueHash = Math.abs(queue.hashCode()) % consumerThreadNum; + pullEvent.putIfAbsent(queueHash, new ConcurrentHashMap<>()); + pullEvent.get(queueHash).put(mq, idx); + } + } catch (Exception e) { + logger.error("", e); + StatUtil.addInvoke(pubKey, System.currentTimeMillis() - start, false); + } + } + }); + } + } + public static void doPull(Map eventMap, MessageQueue mq, Long eventId) throws RemotingException, InterruptedException, MQClientException { + if (!enableSub) { + eventMap.remove(mq, eventId); + pullStatus.remove(mq); + return; + } + DefaultMQPullConsumer defaultMQPullConsumer = defaultMQPullConsumers[(int) (eventId % pullConsumerNum)]; + Long offset = offsetMap.get(mq); + if (offset == null) { + long start = System.currentTimeMillis(); + offset = defaultMQPullConsumer.maxOffset(mq); + StatUtil.addInvoke("maxOffset", System.currentTimeMillis() - start); + offsetMap.put(mq, offset); + } + long start = System.currentTimeMillis(); + if (null != pullStatus.putIfAbsent(mq, true)) { + return; + } + defaultMQPullConsumer.pullBlockIfNotFound( + mq, "*", offset, 32, + new PullCallback() { + @Override + public void onSuccess(PullResult pullResult) { + StatUtil.addInvoke(pullResult.getPullStatus().name(), System.currentTimeMillis() - start); + eventMap.remove(mq, eventId); + pullStatus.remove(mq); + offsetMap.put(mq, pullResult.getNextBeginOffset()); + StatUtil.addInvoke("doPull", System.currentTimeMillis() - start); + if (PullStatus.NO_MATCHED_MSG.equals(pullResult.getPullStatus()) && RETRY_NO_MATCHED_MSG) { + long idx = rid.incrementAndGet(); + eventMap.put(mq, idx); + } + List list = pullResult.getMsgFoundList(); + if (list == null || list.isEmpty()) { + StatUtil.addInvoke("NoMsg", System.currentTimeMillis() - start); + return; + } + for (MessageExt messageExt : list) { + StatUtil.addInvoke("sub", System.currentTimeMillis() - messageExt.getBornTimestamp()); + if (StatUtil.nowTps("sub") < 10) { + logger.warn("sub: {}", messageExt.getMsgId()); + } + } + } + @Override + public void onException(Throwable e) { + eventMap.remove(mq, eventId); + pullStatus.remove(mq); + logger.error("", e); + StatUtil.addInvoke("doPull", System.currentTimeMillis() - start, false); + } + }); + } + public static void doBenchOffset() throws RemotingException, InterruptedException, MQClientException { + ExecutorService sendPool = Executors.newFixedThreadPool(sendThreadNum); + Map offsetMap = new ConcurrentHashMap<>(); + String statKey = "benchOffset"; + TopicRouteData topicRouteData = defaultMQPullConsumers[0].getDefaultMQPullConsumerImpl(). + getRebalanceImpl().getMqClientFactory().getMQClientAPIImpl(). + getTopicRouteInfoFromNameServer(lmqTopic, 3000); + HashMap brokerMap = topicRouteData.getBrokerDatas().get(0).getBrokerAddrs(); + if (brokerMap == null || brokerMap.isEmpty()) { + return; + } + String brokerAddress = brokerMap.get(MixAll.MASTER_ID); + for (int i = 0; i < sendThreadNum; i++) { + final int flag = i; + sendPool.execute(new Runnable() { + @Override + public void run() { + while (true) { + try { + if (StatUtil.isOverFlow(statKey, tps)) { + Thread.sleep(100L); + } + long start = System.currentTimeMillis(); + DefaultMQPullConsumer defaultMQPullConsumer = defaultMQPullConsumers[(int) (rid.incrementAndGet() % pullConsumerNum)]; + long id = rid.incrementAndGet(); + String lmq = LMQ_PREFIX + queuePrefix + id % benchOffsetNum; + String lmqCid = LMQ_PREFIX + "GID_LMQ@@c" + flag + "-" + id % benchOffsetNum; + Long offset = offsetMap.get(lmq); + if (offset == null) { + offsetMap.put(lmq, 0L); + } + long newOffset1 = offsetMap.get(lmq) + 1; + UpdateConsumerOffsetRequestHeader updateHeader = new UpdateConsumerOffsetRequestHeader(); + updateHeader.setTopic(lmq); + updateHeader.setConsumerGroup(lmqCid); + updateHeader.setQueueId(0); + updateHeader.setCommitOffset(newOffset1); + defaultMQPullConsumer + .getDefaultMQPullConsumerImpl() + .getRebalanceImpl() + .getMqClientFactory() + .getMQClientAPIImpl().updateConsumerOffset(brokerAddress, updateHeader, 1000); + QueryConsumerOffsetRequestHeader queryHeader = new QueryConsumerOffsetRequestHeader(); + queryHeader.setTopic(lmq); + queryHeader.setConsumerGroup(lmqCid); + queryHeader.setQueueId(0); + long newOffset2 = defaultMQPullConsumer + .getDefaultMQPullConsumerImpl() + .getRebalanceImpl() + .getMqClientFactory() + .getMQClientAPIImpl() + .queryConsumerOffset(brokerAddress, queryHeader, 1000); + offsetMap.put(lmq, newOffset2); + if (newOffset1 != newOffset2) { + StatUtil.addInvoke("ErrorOffset", 1); + } + StatUtil.addInvoke(statKey, System.currentTimeMillis() - start); + } catch (Exception e) { + logger.error("", e); + } + } + } + }); + } + } +} \ No newline at end of file diff --git a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java new file mode 100644 index 00000000000..5645d66dac3 --- /dev/null +++ b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.test.util; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.Generated; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.math.BigDecimal.ROUND_HALF_UP; + +@Generated("StatUtil") +public class StatUtil { + private static Logger sysLogger = LoggerFactory.getLogger(StatUtil.class); + private static Logger logger = LoggerFactory.getLogger("StatLogger"); + private static final int MAX_KEY_NUM = Integer.parseInt(System.getProperty("stat.util.key.max.num", "10000")); + private static volatile ConcurrentMap invokeCache = new ConcurrentHashMap<>(64); + private static volatile ConcurrentMap> secondInvokeCache = new ConcurrentHashMap<>( + 64); + + private static final int STAT_WINDOW_SECONDS = Integer.parseInt(System.getProperty("stat.win.seconds", "60")); + private static final String SPLITTER = "|"; + private static ScheduledExecutorService daemon = Executors.newSingleThreadScheduledExecutor(); + + static class Invoke { + AtomicLong totalPv = new AtomicLong(); + AtomicLong failPv = new AtomicLong(); + AtomicLong sumRt = new AtomicLong(); + AtomicLong maxRt = new AtomicLong(); + AtomicLong minRt = new AtomicLong(); + AtomicInteger topSecondPv = new AtomicInteger(); + AtomicInteger secondPv = new AtomicInteger(); + AtomicLong second = new AtomicLong(System.currentTimeMillis() / 1000L); + } + + static class SecondInvoke implements Comparable { + AtomicLong total = new AtomicLong(); + AtomicLong fail = new AtomicLong(); + AtomicLong sumRt = new AtomicLong(); + AtomicLong maxRt = new AtomicLong(); + AtomicLong minRt = new AtomicLong(); + Long second = nowSecond(); + + @Override + public int compareTo(SecondInvoke o) { + return o.second.compareTo(second); + } + } + + static { + daemon.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + printInvokeStat(); + printSecondInvokeStat(); + } catch (Exception e) { + logger.error("", e); + } + } + }, STAT_WINDOW_SECONDS, STAT_WINDOW_SECONDS, TimeUnit.SECONDS); + } + + private static void printInvokeStat() { + Map tmp = invokeCache; + invokeCache = new ConcurrentHashMap<>(64); + + sysLogger.warn("printInvokeStat key count:{}", tmp.size()); + for (Map.Entry entry : tmp.entrySet()) { + String key = entry.getKey(); + Invoke invoke = entry.getValue(); + logger.warn("{}", + buildLog(key, invoke.topSecondPv.get(), invoke.totalPv.get(), invoke.failPv.get(), invoke.minRt.get(), + invoke.maxRt.get(), invoke.sumRt.get())); + } + } + + private static void printSecondInvokeStat() { + sysLogger.warn("printSecondInvokeStat key count:{}", secondInvokeCache.size()); + for (Map.Entry> entry : secondInvokeCache.entrySet()) { + String key = entry.getKey(); + Map secondInvokeMap = entry.getValue(); + long totalPv = 0L; + long failPv = 0L; + long topSecondPv = 0L; + long sumRt = 0L; + long maxRt = 0L; + long minRt = 0L; + + for (Map.Entry invokeEntry : secondInvokeMap.entrySet()) { + long second = invokeEntry.getKey(); + SecondInvoke secondInvoke = invokeEntry.getValue(); + if (nowSecond() - second >= STAT_WINDOW_SECONDS) { + secondInvokeMap.remove(second); + continue; + } + long secondPv = secondInvoke.total.get(); + totalPv += secondPv; + failPv += secondInvoke.fail.get(); + sumRt += secondInvoke.sumRt.get(); + if (maxRt < secondInvoke.maxRt.get()) { + maxRt = secondInvoke.maxRt.get(); + } + if (minRt > secondInvoke.minRt.get()) { + minRt = secondInvoke.minRt.get(); + } + if (topSecondPv < secondPv) { + topSecondPv = secondPv; + } + } + if (secondInvokeMap.isEmpty()) { + secondInvokeCache.remove(key); + continue; + } + logger.warn("{}", buildLog(key, topSecondPv, totalPv, failPv, minRt, maxRt, sumRt)); + } + } + + private static String buildLog(String key, long topSecondPv, long totalPv, long failPv, long minRt, long maxRt, + long sumRt) { + StringBuilder sb = new StringBuilder(); + sb.append(SPLITTER); + sb.append(key); + sb.append(SPLITTER); + sb.append(topSecondPv); + sb.append(SPLITTER); + int tps = new BigDecimal(totalPv).divide(new BigDecimal(STAT_WINDOW_SECONDS), + ROUND_HALF_UP).intValue(); + sb.append(tps); + sb.append(SPLITTER); + sb.append(totalPv); + sb.append(SPLITTER); + sb.append(failPv); + sb.append(SPLITTER); + sb.append(minRt); + sb.append(SPLITTER); + long avg = new BigDecimal(sumRt).divide(new BigDecimal(totalPv), + ROUND_HALF_UP).longValue(); + sb.append(avg); + sb.append(SPLITTER); + sb.append(maxRt); + return sb.toString(); + } + + public static String buildKey(String... keys) { + if (keys == null || keys.length <= 0) { + return null; + } + StringBuilder sb = new StringBuilder(); + for (String key : keys) { + sb.append(key); + sb.append(","); + } + sb.deleteCharAt(sb.length() - 1); + return sb.toString(); + } + + public static void addInvoke(String key, long rt) { + addInvoke(key, rt, true); + } + + private static Invoke getAndSetInvoke(String key) { + Invoke invoke = invokeCache.get(key); + if (invoke == null) { + invokeCache.putIfAbsent(key, new Invoke()); + } + return invokeCache.get(key); + } + + public static void addInvoke(String key, int num, long rt, boolean success) { + if (invokeCache.size() > MAX_KEY_NUM || secondInvokeCache.size() > MAX_KEY_NUM) { + return; + } + Invoke invoke = getAndSetInvoke(key); + if (invoke == null) { + return; + } + + invoke.totalPv.getAndAdd(num); + if (!success) { + invoke.failPv.getAndAdd(num); + } + long now = nowSecond(); + AtomicLong oldSecond = invoke.second; + if (oldSecond.get() == now) { + invoke.secondPv.getAndAdd(num); + } else { + if (oldSecond.compareAndSet(oldSecond.get(), now)) { + if (invoke.secondPv.get() > invoke.topSecondPv.get()) { + invoke.topSecondPv.set(invoke.secondPv.get()); + } + invoke.secondPv.set(num); + } else { + invoke.secondPv.getAndAdd(num); + } + } + + invoke.sumRt.addAndGet(rt); + if (invoke.maxRt.get() < rt) { + invoke.maxRt.set(rt); + } + if (invoke.minRt.get() > rt) { + invoke.minRt.set(rt); + } + } + + public static void addInvoke(String key, long rt, boolean success) { + if (invokeCache.size() > MAX_KEY_NUM || secondInvokeCache.size() > MAX_KEY_NUM) { + return; + } + Invoke invoke = getAndSetInvoke(key); + if (invoke == null) { + return; + } + + invoke.totalPv.getAndIncrement(); + if (!success) { + invoke.failPv.getAndIncrement(); + } + long now = nowSecond(); + AtomicLong oldSecond = invoke.second; + if (oldSecond.get() == now) { + invoke.secondPv.getAndIncrement(); + } else { + if (oldSecond.compareAndSet(oldSecond.get(), now)) { + if (invoke.secondPv.get() > invoke.topSecondPv.get()) { + invoke.topSecondPv.set(invoke.secondPv.get()); + } + invoke.secondPv.set(1); + } else { + invoke.secondPv.getAndIncrement(); + } + } + + invoke.sumRt.addAndGet(rt); + if (invoke.maxRt.get() < rt) { + invoke.maxRt.set(rt); + } + if (invoke.minRt.get() > rt) { + invoke.minRt.set(rt); + } + } + + public static SecondInvoke getAndSetSecondInvoke(String key) { + if (!secondInvokeCache.containsKey(key)) { + secondInvokeCache.putIfAbsent(key, new ConcurrentHashMap<>(STAT_WINDOW_SECONDS)); + } + Map secondInvokeMap = secondInvokeCache.get(key); + if (secondInvokeMap == null) { + return null; + } + long second = nowSecond(); + if (!secondInvokeMap.containsKey(second)) { + secondInvokeMap.putIfAbsent(second, new SecondInvoke()); + } + return secondInvokeMap.get(second); + } + + public static void addSecondInvoke(String key, long rt) { + addSecondInvoke(key, rt, true); + } + + public static void addSecondInvoke(String key, long rt, boolean success) { + if (invokeCache.size() > MAX_KEY_NUM || secondInvokeCache.size() > MAX_KEY_NUM) { + return; + } + SecondInvoke secondInvoke = getAndSetSecondInvoke(key); + if (secondInvoke == null) { + return; + } + secondInvoke.total.addAndGet(1); + if (!success) { + secondInvoke.fail.addAndGet(1); + } + secondInvoke.sumRt.addAndGet(rt); + if (secondInvoke.maxRt.get() < rt) { + secondInvoke.maxRt.set(rt); + } + if (secondInvoke.minRt.get() > rt) { + secondInvoke.minRt.set(rt); + } + } + + public static void addPv(String key, long totalPv) { + addPv(key, totalPv, true); + } + + public static void addPv(String key, long totalPv, boolean success) { + if (invokeCache.size() > MAX_KEY_NUM || secondInvokeCache.size() > MAX_KEY_NUM) { + return; + } + if (totalPv <= 0) { + return; + } + Invoke invoke = getAndSetInvoke(key); + if (invoke == null) { + return; + } + invoke.totalPv.addAndGet(totalPv); + if (!success) { + invoke.failPv.addAndGet(totalPv); + } + long now = nowSecond(); + AtomicLong oldSecond = invoke.second; + if (oldSecond.get() == now) { + invoke.secondPv.addAndGet((int)totalPv); + } else { + if (oldSecond.compareAndSet(oldSecond.get(), now)) { + if (invoke.secondPv.get() > invoke.topSecondPv.get()) { + invoke.topSecondPv.set(invoke.secondPv.get()); + } + invoke.secondPv.set((int)totalPv); + } else { + invoke.secondPv.addAndGet((int)totalPv); + } + } + } + + public static void addSecondPv(String key, long totalPv) { + addSecondPv(key, totalPv, true); + } + + public static void addSecondPv(String key, long totalPv, boolean success) { + if (invokeCache.size() > MAX_KEY_NUM || secondInvokeCache.size() > MAX_KEY_NUM) { + return; + } + if (totalPv <= 0) { + return; + } + SecondInvoke secondInvoke = getAndSetSecondInvoke(key); + if (secondInvoke == null) { + return; + } + secondInvoke.total.addAndGet(totalPv); + if (!success) { + secondInvoke.fail.addAndGet(totalPv); + } + } + + public static boolean isOverFlow(String key, int tps) { + return nowTps(key) >= tps; + } + + public static int nowTps(String key) { + Map secondInvokeMap = secondInvokeCache.get(key); + if (secondInvokeMap != null) { + SecondInvoke secondInvoke = secondInvokeMap.get(nowSecond()); + if (secondInvoke != null) { + return (int)secondInvoke.total.get(); + } + } + Invoke invoke = invokeCache.get(key); + if (invoke == null) { + return 0; + } + AtomicLong oldSecond = invoke.second; + if (oldSecond.get() == nowSecond()) { + return invoke.secondPv.get(); + } + return 0; + } + + public static int totalPvInWindow(String key, int windowSeconds) { + List list = secondInvokeList(key, windowSeconds); + long totalPv = 0; + for (int i = 0; i < windowSeconds && i < list.size(); i++) { + totalPv += list.get(i).total.get(); + } + return (int)totalPv; + } + + public static int failPvInWindow(String key, int windowSeconds) { + List list = secondInvokeList(key, windowSeconds); + long failPv = 0; + for (int i = 0; i < windowSeconds && i < list.size(); i++) { + failPv += list.get(i).fail.get(); + } + return (int)failPv; + } + + public static int topTpsInWindow(String key, int windowSeconds) { + List list = secondInvokeList(key, windowSeconds); + long topTps = 0; + for (int i = 0; i < windowSeconds && i < list.size(); i++) { + long secondPv = list.get(i).total.get(); + if (topTps < secondPv) { + topTps = secondPv; + } + } + return (int)topTps; + } + + public static int avgRtInWindow(String key, int windowSeconds) { + List list = secondInvokeList(key, windowSeconds); + long sumRt = 0; + long totalPv = 0; + for (int i = 0; i < windowSeconds && i < list.size(); i++) { + sumRt += list.get(i).sumRt.get(); + totalPv += list.get(i).total.get(); + } + if (totalPv <= 0) { + return 0; + } + long avg = new BigDecimal(sumRt).divide(new BigDecimal(totalPv), + ROUND_HALF_UP).longValue(); + return (int)avg; + } + + public static int maxRtInWindow(String key, int windowSeconds) { + List list = secondInvokeList(key, windowSeconds); + long maxRt = 0; + long totalPv = 0; + for (int i = 0; i < windowSeconds && i < list.size(); i++) { + if (maxRt < list.get(i).maxRt.get()) { + maxRt = list.get(i).maxRt.get(); + } + } + return (int)maxRt; + } + + public static int minRtInWindow(String key, int windowSeconds) { + List list = secondInvokeList(key, windowSeconds); + long minRt = 0; + long totalPv = 0; + for (int i = 0; i < windowSeconds && i < list.size(); i++) { + if (minRt < list.get(i).minRt.get()) { + minRt = list.get(i).minRt.get(); + } + } + return (int)minRt; + } + + private static List secondInvokeList(String key, int windowSeconds) { + if (windowSeconds > STAT_WINDOW_SECONDS || windowSeconds <= 0) { + throw new IllegalArgumentException("windowSeconds Must Not be great than " + STAT_WINDOW_SECONDS); + } + Map secondInvokeMap = secondInvokeCache.get(key); + if (secondInvokeMap == null || secondInvokeMap.isEmpty()) { + return new ArrayList<>(); + } + List list = new ArrayList<>(); + list.addAll(secondInvokeMap.values()); + Collections.sort(list); + return list; + } + + private static long nowSecond() { + return System.currentTimeMillis() / 1000L; + } + +} diff --git a/test/src/test/java/org/apache/rocketmq/test/lmq/TestBenchLmqStore.java b/test/src/test/java/org/apache/rocketmq/test/lmq/TestBenchLmqStore.java new file mode 100644 index 00000000000..98bb8bf1961 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/lmq/TestBenchLmqStore.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.test.lmq; + +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.test.lmq.benchmark.BenchLmqStore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TestBenchLmqStore { + @Test + public void test() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + System.setProperty("sendThreadNum", "1"); + System.setProperty("pullConsumerNum", "1"); + System.setProperty("consumerThreadNum", "1"); + BenchLmqStore.defaultMQProducer = mock(DefaultMQProducer.class); + SendResult sendResult = new SendResult(); + when(BenchLmqStore.defaultMQProducer.send(any(Message.class))).thenReturn(sendResult); + BenchLmqStore.doSend(); + Thread.sleep(100L); + //verify(BenchLmqStore.defaultMQProducer, atLeastOnce()).send(any(Message.class)); + BenchLmqStore.defaultMQPullConsumers = new DefaultMQPullConsumer[1]; + BenchLmqStore.defaultMQPullConsumers[0] = mock(DefaultMQPullConsumer.class); + BenchLmqStore.doPull(new ConcurrentHashMap<>(), new MessageQueue(), 1L); + verify(BenchLmqStore.defaultMQPullConsumers[0], atLeastOnce()).pullBlockIfNotFound(any(MessageQueue.class), anyString(), anyLong(), anyInt(), any( + PullCallback.class)); + } + @Test + public void testOffset() throws RemotingException, InterruptedException, MQClientException, MQBrokerException, IllegalAccessException { + System.setProperty("sendThreadNum", "1"); + DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); + BenchLmqStore.defaultMQPullConsumers = new DefaultMQPullConsumer[1]; + BenchLmqStore.defaultMQPullConsumers[0] = defaultMQPullConsumer; + DefaultMQPullConsumerImpl defaultMQPullConsumerImpl = mock(DefaultMQPullConsumerImpl.class); + when(defaultMQPullConsumer.getDefaultMQPullConsumerImpl()).thenReturn(defaultMQPullConsumerImpl); + RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class); + when(defaultMQPullConsumerImpl.getRebalanceImpl()).thenReturn(rebalanceImpl); + MQClientInstance mqClientInstance = mock(MQClientInstance.class); + when(rebalanceImpl.getMqClientFactory()).thenReturn(mqClientInstance); + MQClientAPIImpl mqClientAPI = mock(MQClientAPIImpl.class); + when(mqClientInstance.getMQClientAPIImpl()).thenReturn(mqClientAPI); + TopicRouteData topicRouteData = new TopicRouteData(); + HashMap brokerAddrs = new HashMap<>(); + brokerAddrs.put(MixAll.MASTER_ID, "test"); + List brokerData = Arrays.asList(new BrokerData("test", "test", brokerAddrs)); + topicRouteData.setBrokerDatas(brokerData); + FieldUtils.writeStaticField(BenchLmqStore.class, "lmqTopic", "test", true); + when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData); + BenchLmqStore.doBenchOffset(); + Thread.sleep(100L); + verify(mqClientAPI, atLeastOnce()).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong()); + verify(mqClientAPI, atLeastOnce()).updateConsumerOffset(anyString(), any(UpdateConsumerOffsetRequestHeader.class), anyLong()); + } +} \ No newline at end of file