同步master,同步刷盘。broker 8G

[root@dev-mysql-16-103 ~]# java -cp rocketmq-example.jar org.apache.rocketmq.example.benchmark.TransactionProducer 1 20 true true
Send TPS: 438 Max RT: 277 Average RT: 2.277 Send Failed: 0 Response Failed: 0 transaction checkCount: 0
Send TPS: 571 Max RT: 277 Average RT: 1.749 Send Failed: 0 Response Failed: 0 transaction checkCount: 0
Send TPS: 514 Max RT: 277 Average RT: 1.942 Send Failed: 0 Response Failed: 0 transaction checkCount: 0
Send TPS: 538 Max RT: 277 Average RT: 1.856 Send Failed: 0 Response Failed: 0 transaction checkCount: 0
Send TPS: 562 Max RT: 277 Average RT: 1.776 Send Failed: 0 Response Failed: 0 transaction checkCount: 0

[root@dev-mysql-16-103 ~]# java -cp rocketmq-example.jar org.apache.rocketmq.example.benchmark.TransactionProducer 2 20 true true
Send TPS: 617 Max RT: 271 Average RT: 3.235 Send Failed: 0 Response Failed: 0 transaction checkCount: 0
Send TPS: 660 Max RT: 271 Average RT: 3.026 Send Failed: 0 Response Failed: 0 transaction checkCount: 0

异步master,同步刷盘。broker 8G

[root@dev-mysql-16-103 ~]# java -cp rocketmq-example.jar org.apache.rocketmq.example.benchmark.TransactionProducer 1 20 true true
Send TPS: 466 Max RT: 287 Average RT: 2.143 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244
Send TPS: 621 Max RT: 287 Average RT: 1.609 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244
Send TPS: 698 Max RT: 287 Average RT: 1.432 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244
Send TPS: 730 Max RT: 287 Average RT: 1.368 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244
Send TPS: 638 Max RT: 287 Average RT: 1.566 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244
Send TPS: 671 Max RT: 287 Average RT: 1.489 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244

异步master,异步刷盘。broker 8G

[root@dev-mysql-16-103 ~]# java -cp rocketmq-example.jar org.apache.rocketmq.example.benchmark.TransactionProducer 1 20 true true
Send TPS: 1480 Max RT: 265 Average RT: 0.545 Send Failed: 8 Response Failed: 0 transaction checkCount: 17614
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.rocketmq.example.benchmark;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.client.producer.TransactionMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;import java.util.LinkedList;import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicLong;public class TransactionProducer { private static int threadCount; private static int messageSize; private static boolean isCheck; private static boolean isCheckFalse; private static String nameSrv = null; private static int sleepTime = 0; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 1; messageSize = args.length >= 2 ? 
Integer.parseInt(args[1]) : 1024 * 2; isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]); isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]); nameSrv = args.length >= 5 ? args[4] : "101.101.16.103:9876;102.103.161.104:9876"; // sleepTime = args.length >= 6 ? Integer.parseInt(args[5]) : 0; final Message msg = buildMessage(messageSize); final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer(); final Timer timer = new Timer("BenchmarkTimerThread", true); final LinkedListsnapshotList = new LinkedList (); timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { snapshotList.addLast(statsBenchmark.createSnapshot()); while (snapshotList.size() > 10) { snapshotList.removeFirst(); } } }, 1000, 1000); timer.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { Long[] begin = snapshotList.getFirst(); Long[] end = snapshotList.getLast(); final long sendTps = (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L); final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]); System.out.printf( "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]); } } @Override public void run() { try { this.printStats(); } catch (Exception e) { e.printStackTrace(); } } }, 10000, 10000); final TransactionListener transactionListener = new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark); final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.setTransactionListener(transactionListener); producer.setDefaultTopicQueueNums(1000); producer.setNamesrvAddr(nameSrv); producer.start(); for (int i = 0; i < 
threadCount; i++) { sendThreadPool.execute(new Runnable() { @Override public void run() { while (true) { try { // Thread.sleep(1000); final long beginTimestamp = System.currentTimeMillis(); SendResult sendResult = producer.sendMessageInTransaction(msg, null); if (sendResult != null) { statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); } final long currentRT = System.currentTimeMillis() - beginTimestamp; statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); while (currentRT > prevMaxRT) { boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT); if (updated) { break; } prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); } } catch (MQClientException e) { e.printStackTrace(); statsBenchmark.getSendRequestFailedCount().incrementAndGet(); }finally { try { if(sleepTime!=0) { Thread.sleep(sleepTime); } } catch (InterruptedException e) { e.printStackTrace(); } } } } }); } } private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException { Message msg = new Message(); msg.setTopic("BenchmarkTest"); StringBuilder sb = new StringBuilder(); for (int i = 0; i < messageSize; i += 10) { sb.append("hello baby"); } msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); return msg; }}class TransactionListenerImpl implements TransactionListener { private boolean isCheckFalse; private StatsBenchmarkTProducer statsBenchmarkTProducer; private boolean isCheckLocal; public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal, StatsBenchmarkTProducer statsBenchmarkTProducer) { this.isCheckFalse = isCheckFalse; this.isCheckLocal = isCheckLocal; this.statsBenchmarkTProducer = statsBenchmarkTProducer; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { 
statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); if (isCheckFalse) { return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) { if (isCheckLocal) { return LocalTransactionState.UNKNOW; } return LocalTransactionState.COMMIT_MESSAGE; }}class StatsBenchmarkTProducer { private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L); private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L); private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L); public Long[] createSnapshot() { Long[] snap = new Long[] { System.currentTimeMillis(), this.sendRequestSuccessCount.get(), this.sendRequestFailedCount.get(), this.receiveResponseSuccessCount.get(), this.receiveResponseFailedCount.get(), this.sendMessageSuccessTimeTotal.get(), this.checkRequestSuccessCount.get()}; return snap; } public AtomicLong getSendRequestSuccessCount() { return sendRequestSuccessCount; } public AtomicLong getSendRequestFailedCount() { return sendRequestFailedCount; } public AtomicLong getReceiveResponseSuccessCount() { return receiveResponseSuccessCount; } public AtomicLong getReceiveResponseFailedCount() { return receiveResponseFailedCount; } public AtomicLong getSendMessageSuccessTimeTotal() { return sendMessageSuccessTimeTotal; } public AtomicLong getSendMessageMaxRT() { return sendMessageMaxRT; } public AtomicLong getCheckRequestSuccessCount() { return checkRequestSuccessCount; }}