安装参考:
记住要下载编译之后的包
原因: 运行的时候内存不足:
解决方案:
由于我的虚拟器内存比较小,所以启动前需要调节一下,启动的虚拟内存参数配置。
vi ./alibaba-rocketmq/bin/runserver.sh #nameserver 内存
vi ./alibaba-rocketmq/bin/runbroker.sh #broke内存
JAVA_OPT_1="-server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"(参考你自己的机器内存)
定义生成者
package com.bb.bbtest.mq;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;/** * @author :taohong.ouyang * 2017/2/6. */public class Producer { private static DefaultMQProducer producer = new DefaultMQProducer("oythProducerGroupName"); private static int initalState = 0; private Producer(){ } public static DefaultMQProducer getDefaultMQProducer(){ if(producer == null){ producer = new DefaultMQProducer("oythProducerGroupName"); } if(initalState == 0){ producer.setNamesrvAddr("10.2.223.71:9876"); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); return null; } initalState = 1; } return producer; }}
定义消费者
package com.bb.bbtest.mq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;/** * @author :taohong.ouyang * 2017/2/6. */public class Consumer { private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oythPushConsumer"); private static int initalState = 0; private Consumer(){ } public static DefaultMQPushConsumer getDefaultMQPushConsumer(){ if(consumer == null){ consumer = new DefaultMQPushConsumer("oythPushConsumer"); } if (initalState == 0 ){ consumer.setNamesrvAddr("10.2.223.71:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); initalState = 1; } return consumer; }}
定义生产者应用
package com.bb.bbtest.mq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQBrokerException;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.remoting.exception.RemotingException;/** * @author :taohong.ouyang * 2017/2/6. */public class MqProducer { public static void main(String[] args){ sendMsg(); } public static void sendMsg(){ DefaultMQProducer producer = Producer.getDefaultMQProducer(); try { for(int i=0;i<2;i++){ Message msg = new Message( "TopicTest1", // topic "TagC", // tag "OrderID00"+i, // key ("Hello MetaQ"+i).getBytes()); // body SendResult sendResult = producer.send(msg); System.out.println(String.format("sendResult:{%s}", sendResult)); } } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (RemotingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MQBrokerException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } producer.shutdown(); }}
定义消费者应用
package com.bb.bbtest.mq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;/** * @author :taohong.ouyang * 2017/2/6. */public class MqConsumer { public static void main(String[] args){ receiveMsg(); } public static void receiveMsg(){ // 获取消息生产者 DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer(); // 订阅主体 try { consumer.subscribe("TopicTest1", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 */ public ConsumeConcurrentlyStatus consumeMessage( Listmsgs, ConsumeConcurrentlyContext context) { System.out.println(String.format("currentThreadName:{%s} and Receive New Messages:{%s}",Thread.currentThread().getName(),msgs)); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 执行TopicTest1的消费逻辑 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 执行TagA的消费 System.out.println("MsgBody:{}"+new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { System.out.println("TagC 开始消费"); // 执行TagC的消费 } else if (msg.getTags() != null && msg.getTags().equals("TagD")) { // 执行TagD的消费 } } else if (msg.getTopic().equals("TopicTest2")) { // 执行TopicTest2的消费逻辑 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可 */ consumer.start(); System.out.println("Consumer Started."); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}