`

ActiveMQ入门实例

    博客分类:
  • JMS
阅读更多

一、ActiveMQ入门实例

二、Spring整合JMS(一)——基于ActiveMQ实现

三、Spring整合JMS(二)——三种消息监听器

四、Spring整合JMS(三)——MessageConverter消息转换器

五、Spring整合JMS(四)——事务管理

 

1.下载ActiveMQ

去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ

将apache-activemq-5.5.1-bin.zip解压缩,双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。

启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

3.导入jar包:



 

4.实例

1、发送端

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

	/** 创建Session */
	public static Session createSession() {
		// 连接工厂,JMS 用它创建连接
		ConnectionFactory connectionFactory = null;
		// MS 客户端到JMS Provider 的连接
		Connection connection = null;
		// 一个发送或接收消息的会话
		Session session = null;
		try {
			// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
			connectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_USER,
					ActiveMQConnection.DEFAULT_PASSWORD,
					"tcp://localhost:61616");
			// 通过连接工厂创建连接
			connection = connectionFactory.createConnection();
			// 启动连接
			connection.start();
			// 创建Session
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return session;
	}

	/** 创建消息发送者 */
	public static MessageProducer createProducer(Session session) {
		// 消息的目的地;消息发送给谁.
		Destination destination = null;
		// 消息发送者
		MessageProducer producer = null;
		try {
			// 获取session注意参数值FirstQueue是一个服务器的queue,须在在ActiveMq的console配置
			destination = session.createQueue("FirstQueue");
			// 消息--发送者
			producer = session.createProducer(destination);
			// 设置不持久化,实际根据项目决定
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return producer;
	}

	/** 发送消息 */
	public static void sendMessage(Session session, MessageProducer producer,
			String content) {
		TextMessage textMessage = null;
		try {
			textMessage = session.createTextMessage(content);
			producer.send(textMessage);
			session.commit();// 提交
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	/** 测试 */
	public static void main(String[] args) {
		Session session = createSession();
		MessageProducer producer = createProducer(session);
		for (int i = 0; i < 5; i++) {
			String content = "Message---" + i;
			System.out.println(content);
			sendMessage(session, producer, content);
		}
	}
}

 2、接收端

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

	/** 创建Session */
	public static Session createSession() {
		// 连接工厂,JMS 用它创建连接
		ConnectionFactory connectionFactory = null;
		// MS 客户端到JMS Provider 的连接
		Connection connection = null;
		// 一个发送或接收消息的会话
		Session session = null;
		try {
			// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
			connectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_USER,
					ActiveMQConnection.DEFAULT_PASSWORD,
					"tcp://localhost:61616");
			// 通过连接工厂创建连接
			connection = connectionFactory.createConnection();
			// 启动连接
			connection.start();
			// 创建Session
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return session;
	}

	/** 创建消息接收者 */
	public static MessageConsumer createConsumer(Session session) {
		// 消息的目的地;消息发送给谁.
		Destination destination = null;
		// 消息接收者
		MessageConsumer consumer = null;
		try {
			// 获取session注意参数值FirstQueue是一个服务器的queue,须在在ActiveMq的console配置
			destination = session.createQueue("FirstQueue");
			// 得到消息生成者"发送者"
			consumer = session.createConsumer(destination);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return consumer;
	}

	/** 发送消息 */
	public static void receiverMessage(MessageConsumer consumer) {
		try {
			while (true) {
				// 设置接收者接收消息的时间
				TextMessage message = (TextMessage) consumer.receive(6 * 1000);
				if (null != message) {
					System.out.println("ReceiverMessage--" + message.getText());
				} else {
					System.out.println("break");
					break;
				}
			}

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/** 测试 */
	public static void main(String[] args) {
		receiverMessage(createConsumer(createSession()));
	}
}

 5、测试结果

 



 
执行Sender:

Message---0

Message---1

Message---2

Message---3

Message---4

执行Receive:

ReceiverMessage--Message---0

ReceiverMessage--Message---1

ReceiverMessage--Message---2

ReceiverMessage--Message---3

ReceiverMessage--Message---4

 

 

  • 大小: 9.2 KB
  • 大小: 103.3 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics