博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ
阅读量:7012 次
发布时间:2019-06-28

本文共 6736 字,大约阅读时间需要 22 分钟。

hot3.png

安装参考:

记住要下载编译之后的包

安装报错信息

原因: 运行的时候内存不足:

解决方案:

由于我的虚拟器内存比较小,所以启动前需要调节一下,启动的虚拟内存参数配置。

vi ./alibaba-rocketmq/bin/runserver.sh #nameserver 内存

vi ./alibaba-rocketmq/bin/runbroker.sh #broke内存

JAVA_OPT_1="-server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"(参考你自己的机器内存)

定义生成者

package com.bb.bbtest.mq;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;/** * @author :taohong.ouyang *         2017/2/6. */public class Producer {    private static DefaultMQProducer producer = new DefaultMQProducer("oythProducerGroupName");    private static int initalState = 0;    private Producer(){    }    public static DefaultMQProducer getDefaultMQProducer(){        if(producer == null){            producer = new DefaultMQProducer("oythProducerGroupName");        }        if(initalState == 0){            producer.setNamesrvAddr("10.2.223.71:9876");            try {                producer.start();            } catch (MQClientException e) {                e.printStackTrace();                return null;            }            initalState = 1;        }        return producer;    }}

定义消费者

package com.bb.bbtest.mq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;/** * @author :taohong.ouyang *         2017/2/6. */public class Consumer {    private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oythPushConsumer");    private static int initalState = 0;    private Consumer(){    }    public static DefaultMQPushConsumer getDefaultMQPushConsumer(){        if(consumer == null){            consumer =  new DefaultMQPushConsumer("oythPushConsumer");        }        if (initalState == 0 ){            consumer.setNamesrvAddr("10.2.223.71:9876");            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);            initalState = 1;        }        return consumer;    }}

定义生产者应用

package com.bb.bbtest.mq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQBrokerException;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.remoting.exception.RemotingException;/** * @author :taohong.ouyang *         2017/2/6. */public class MqProducer {    public static void main(String[] args){        sendMsg();    }    public static void sendMsg(){        DefaultMQProducer producer = Producer.getDefaultMQProducer();        try {            for(int i=0;i<2;i++){                Message msg = new Message(                        "TopicTest1",                   // topic                        "TagC",                         // tag                        "OrderID00"+i,                  // key                        ("Hello MetaQ"+i).getBytes());  // body                SendResult sendResult = producer.send(msg);                System.out.println(String.format("sendResult:{%s}", sendResult));            }        } catch (MQClientException e) {            // TODO Auto-generated catch block            e.printStackTrace();        } catch (RemotingException e) {            // TODO Auto-generated catch block            e.printStackTrace();        } catch (MQBrokerException e) {            // TODO Auto-generated catch block            e.printStackTrace();        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        producer.shutdown();    }}

定义消费者应用

package com.bb.bbtest.mq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;/** * @author :taohong.ouyang *         2017/2/6. */public class MqConsumer {    public static void main(String[] args){        receiveMsg();    }    public static void receiveMsg(){        // 获取消息生产者        DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();        // 订阅主体        try {            consumer.subscribe("TopicTest1", "*");            consumer.registerMessageListener(new MessageListenerConcurrently() {                /**                 * * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息                 */                public ConsumeConcurrentlyStatus consumeMessage(                        List
msgs, ConsumeConcurrentlyContext context) { System.out.println(String.format("currentThreadName:{%s} and Receive New Messages:{%s}",Thread.currentThread().getName(),msgs)); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 执行TopicTest1的消费逻辑 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 执行TagA的消费 System.out.println("MsgBody:{}"+new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { System.out.println("TagC 开始消费"); // 执行TagC的消费 } else if (msg.getTags() != null && msg.getTags().equals("TagD")) { // 执行TagD的消费 } } else if (msg.getTopic().equals("TopicTest2")) { // 执行TopicTest2的消费逻辑 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/ consumer.start(); System.out.println("Consumer Started."); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}

转载于:https://my.oschina.net/ouyangtaohong/blog/832262

你可能感兴趣的文章
Java实现注册邮箱激活验证
查看>>
数据库缓存
查看>>
mvc 数据验证金钱格式decimal格式验证
查看>>
常用的Web服务器
查看>>
UPW学习资料整理 .NET C# 转
查看>>
Oracle12c中新建用户
查看>>
分布式编译工具
查看>>
对我而言晦涩的递归
查看>>
React Native 从入门到原理
查看>>
iOS如何随意的穿插跳跃,push来pop去
查看>>
使用maven编译Java项目 http://www.tuicool.com/articles/YfIfIrq
查看>>
【原创】JDK动态代理,此次之后,永生难忘。
查看>>
collection的框架结构
查看>>
c++中的对象复制
查看>>
ubuntu下linux内核源码阅读工具和调试方法总结
查看>>
PHP生成UTF-8编码的CSV文件用Excel打开乱码的解决办法
查看>>
IOS-5个可以帮你优化App的优秀网站
查看>>
ArrayIndexOutOfBoundsException
查看>>
JAVA判断各种类型数据是否为空
查看>>
如何使用kali的Searchsploit查找软件漏洞
查看>>