RocketMQ Transactional Message Load Test
Published: 2019-06-30



Synchronous-replication master, synchronous disk flush. Broker: 8 GB.

[root@dev-mysql-16-103 ~]# java -cp rocketmq-example.jar org.apache.rocketmq.example.benchmark.TransactionProducer  1  20  true true
Send TPS: 438 Max RT: 277 Average RT:   2.277 Send Failed: 0 Response Failed: 0 transaction checkCount: 0 
Send TPS: 571 Max RT: 277 Average RT:   1.749 Send Failed: 0 Response Failed: 0 transaction checkCount: 0 
Send TPS: 514 Max RT: 277 Average RT:   1.942 Send Failed: 0 Response Failed: 0 transaction checkCount: 0 
Send TPS: 538 Max RT: 277 Average RT:   1.856 Send Failed: 0 Response Failed: 0 transaction checkCount: 0 
Send TPS: 562 Max RT: 277 Average RT:   1.776 Send Failed: 0 Response Failed: 0 transaction checkCount: 0 

[root@dev-mysql-16-103 ~]# java -cp rocketmq-example.jar org.apache.rocketmq.example.benchmark.TransactionProducer  2 20  true true
Send TPS: 617 Max RT: 271 Average RT:   3.235 Send Failed: 0 Response Failed: 0 transaction checkCount: 0 
Send TPS: 660 Max RT: 271 Average RT:   3.026 Send Failed: 0 Response Failed: 0 transaction checkCount: 0 

Asynchronous-replication master, synchronous disk flush. Broker: 8 GB.
[root@dev-mysql-16-103 ~]# java -cp rocketmq-example.jar org.apache.rocketmq.example.benchmark.TransactionProducer  1  20  true true
Send TPS: 466 Max RT: 287 Average RT:   2.143 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244 
Send TPS: 621 Max RT: 287 Average RT:   1.609 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244 
Send TPS: 698 Max RT: 287 Average RT:   1.432 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244 
Send TPS: 730 Max RT: 287 Average RT:   1.368 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244 
Send TPS: 638 Max RT: 287 Average RT:   1.566 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244 
Send TPS: 671 Max RT: 287 Average RT:   1.489 Send Failed: 0 Response Failed: 0 transaction checkCount: 1244 

Asynchronous-replication master, asynchronous disk flush. Broker: 8 GB.
[root@dev-mysql-16-103 ~]# java -cp rocketmq-example.jar org.apache.rocketmq.example.benchmark.TransactionProducer  1  20  true true
Send TPS: 1480 Max RT: 265 Average RT:   0.545 Send Failed: 8 Response Failed: 0 transaction checkCount: 17614 
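Every run above passes the arguments `1 20 true true` (or `2 20 true true`). The sketch below shows how those four positional arguments are read, following the parsing in `TransactionProducer.main` from the listing in this post. `BenchmarkArgs` is a hypothetical helper written for illustration and is not part of RocketMQ. The transaction states are plain strings so the snippet runs without the RocketMQ jar.

```java
// Hypothetical helper (not part of the benchmark): mirrors the argument
// parsing in TransactionProducer.main so the command lines are easier to read.
class BenchmarkArgs {
    final int threadCount;      // args[0]: number of sender threads
    final int messageSize;      // args[1]: requested body size in bytes
    final boolean isCheck;      // args[2]: leave the local transaction undecided
    final boolean isCheckFalse; // args[3]: answer broker check-backs with a rollback

    BenchmarkArgs(String[] args) {
        threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 1;
        messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
        isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]);
        isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
    }

    // Name of the state executeLocalTransaction returns in the listing.
    String executeState() {
        return isCheck ? "UNKNOW" : "COMMIT_MESSAGE";
    }

    // Name of the state checkLocalTransaction returns in the listing.
    String checkState() {
        return isCheckFalse ? "ROLLBACK_MESSAGE" : "COMMIT_MESSAGE";
    }

    // Body length produced by buildMessage: "hello baby" (10 bytes) is
    // appended once per 10 requested bytes, rounding up.
    int bodyLength() {
        int appends = 0;
        for (int i = 0; i < messageSize; i += 10) {
            appends++;
        }
        return appends * "hello baby".length();
    }

    public static void main(String[] argv) {
        BenchmarkArgs a = new BenchmarkArgs(new String[] {"1", "20", "true", "true"});
        System.out.printf("threads=%d bodyBytes=%d execute=%s check=%s%n",
            a.threadCount, a.bodyLength(), a.executeState(), a.checkState());
    }
}
```

Read this way, `1 20 true true` means one sender thread and a 20-byte body. Every local transaction is left in the `UNKNOW` state, and every broker check-back is answered with `ROLLBACK_MESSAGE`.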

 

 

 

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.example.benchmark;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class TransactionProducer {
    private static int threadCount;
    private static int messageSize;
    private static boolean isCheck;
    private static boolean isCheckFalse;
    private static String nameSrv = null;
    private static int sleepTime = 0;

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
        // Positional arguments: threadCount messageSize isCheck isCheckFalse [nameSrv]
        threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 1;
        messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
        isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]);
        isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
        nameSrv = args.length >= 5 ? args[4] : "101.101.16.103:9876;102.103.161.104:9876";
        // sleepTime = args.length >= 6 ? Integer.parseInt(args[5]) : 0;

        final Message msg = buildMessage(messageSize);
        final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
        final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();
        final Timer timer = new Timer("BenchmarkTimerThread", true);
        final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();

        // Take one snapshot per second and keep only the 10 most recent.
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                snapshotList.addLast(statsBenchmark.createSnapshot());
                while (snapshotList.size() > 10) {
                    snapshotList.removeFirst();
                }
            }
        }, 1000, 1000);

        // Print statistics every 10 seconds from the oldest and newest snapshot.
        timer.scheduleAtFixedRate(new TimerTask() {
            private void printStats() {
                if (snapshotList.size() >= 10) {
                    Long[] begin = snapshotList.getFirst();
                    Long[] end = snapshotList.getLast();

                    final long sendTps =
                        (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
                    final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);

                    System.out.printf(
                        "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
                        sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]);
                }
            }

            @Override
            public void run() {
                try {
                    this.printStats();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 10000, 10000);

        final TransactionListener transactionListener =
            new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark);
        final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
        producer.setInstanceName(Long.toString(System.currentTimeMillis()));
        producer.setTransactionListener(transactionListener);
        producer.setDefaultTopicQueueNums(1000);
        producer.setNamesrvAddr(nameSrv);
        producer.start();

        // Each sender thread sends the same message in a tight loop.
        for (int i = 0; i < threadCount; i++) {
            sendThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            // Thread.sleep(1000);
                            final long beginTimestamp = System.currentTimeMillis();
                            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                            if (sendResult != null) {
                                statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
                                statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
                            }

                            final long currentRT = System.currentTimeMillis() - beginTimestamp;
                            statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);

                            // Raise the recorded maximum RT with a compare-and-set retry loop.
                            long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                            while (currentRT > prevMaxRT) {
                                boolean updated =
                                    statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
                                if (updated) {
                                    break;
                                }
                                prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                            }
                        } catch (MQClientException e) {
                            e.printStackTrace();
                            statsBenchmark.getSendRequestFailedCount().incrementAndGet();
                        } finally {
                            try {
                                if (sleepTime != 0) {
                                    Thread.sleep(sleepTime);
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        }
    }

    private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
        Message msg = new Message();
        msg.setTopic("BenchmarkTest");

        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < messageSize; i += 10) {
            sb.append("hello baby");
        }
        msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
        return msg;
    }
}

class TransactionListenerImpl implements TransactionListener {
    private boolean isCheckFalse;
    private StatsBenchmarkTProducer statsBenchmarkTProducer;
    private boolean isCheckLocal;

    public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal,
        StatsBenchmarkTProducer statsBenchmarkTProducer) {
        this.isCheckFalse = isCheckFalse;
        this.isCheckLocal = isCheckLocal;
        this.statsBenchmarkTProducer = statsBenchmarkTProducer;
    }

    // Called when the broker checks back on an undecided transaction.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
        if (isCheckFalse) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    // Called right after the half message is sent.
    @Override
    public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) {
        if (isCheckLocal) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

class StatsBenchmarkTProducer {
    private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
    private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
    private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
    private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
    private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
    private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
    private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L);

    // Snapshot layout: [0] timestamp, [1] send success, [2] send failed,
    // [3] response success, [4] response failed, [5] total send time, [6] check count.
    public Long[] createSnapshot() {
        Long[] snap = new Long[] {
            System.currentTimeMillis(),
            this.sendRequestSuccessCount.get(),
            this.sendRequestFailedCount.get(),
            this.receiveResponseSuccessCount.get(),
            this.receiveResponseFailedCount.get(),
            this.sendMessageSuccessTimeTotal.get(),
            this.checkRequestSuccessCount.get()};
        return snap;
    }

    public AtomicLong getSendRequestSuccessCount() {
        return sendRequestSuccessCount;
    }

    public AtomicLong getSendRequestFailedCount() {
        return sendRequestFailedCount;
    }

    public AtomicLong getReceiveResponseSuccessCount() {
        return receiveResponseSuccessCount;
    }

    public AtomicLong getReceiveResponseFailedCount() {
        return receiveResponseFailedCount;
    }

    public AtomicLong getSendMessageSuccessTimeTotal() {
        return sendMessageSuccessTimeTotal;
    }

    public AtomicLong getSendMessageMaxRT() {
        return sendMessageMaxRT;
    }

    public AtomicLong getCheckRequestSuccessCount() {
        return checkRequestSuccessCount;
    }
}
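To help read the benchmark output, here is a minimal sketch of the arithmetic `printStats` applies to the oldest and newest of the ten retained snapshots. `SnapshotStats` and its sample numbers are made up for illustration; only the index layout and the two formulas come from the listing. Snapshots are taken once per second, so the first and last of ten are about nine seconds apart.

```java
// Hypothetical stand-alone version of the TPS / average-RT arithmetic in
// printStats; the snapshot values in main are invented sample data.
class SnapshotStats {
    // Index layout copied from StatsBenchmarkTProducer.createSnapshot().
    static final int TIMESTAMP_MS = 0;
    static final int RESPONSE_SUCCESS = 3;
    static final int SEND_TIME_TOTAL_MS = 5;

    // Successful sends per second over the window between two snapshots.
    static long sendTps(long[] begin, long[] end) {
        return (long) (((end[RESPONSE_SUCCESS] - begin[RESPONSE_SUCCESS])
            / (double) (end[TIMESTAMP_MS] - begin[TIMESTAMP_MS])) * 1000L);
    }

    // Mean send latency in milliseconds over the same window.
    static double averageRt(long[] begin, long[] end) {
        return (end[SEND_TIME_TOTAL_MS] - begin[SEND_TIME_TOTAL_MS])
            / (double) (end[RESPONSE_SUCCESS] - begin[RESPONSE_SUCCESS]);
    }

    public static void main(String[] args) {
        // Sample data: 4500 successful sends taking 9000 ms in total, over a 9000 ms window.
        long[] begin = {0L, 0L, 0L, 0L, 0L, 0L, 0L};
        long[] end = {9_000L, 4_500L, 0L, 4_500L, 0L, 9_000L, 0L};
        System.out.printf("Send TPS: %d Average RT: %7.3f%n",
            sendTps(begin, end), averageRt(begin, end));
    }
}
```

Only Send TPS and Average RT are computed over this window. In the listing, Max RT is a running maximum since startup, which is why it stays constant across the printed lines of each run. Send Failed, Response Failed and checkCount are printed straight from the newest snapshot, so they are cumulative totals. Because each thread sends serially, TPS times Average RT comes out near 1000 ms per thread in the results above, for example 438 × 2.277 ≈ 997 with one thread.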

 

Reposted from: https://my.oschina.net/xiaominmin/blog/1929127
