`
somebody_hjh
  • 浏览: 180906 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ActiveMQ分享(一)JMS简介

    博客分类:
  • Java
阅读更多
一、概述

  Message,即消息。人与人之间通过消息传递信息。言语、眼神、肢体动作都可被视为消息体。当然还有我们经常用到的邮件、短信。计算机系统也由消息来主导运行。每一条指令的执行,每一个数据包的传递。软件系统间的合作也不例外,消息告诉各个系统应该怎样协作。事件处理机制,也是消息传送的过程。消息无处不在。

  消息分为同步消息和异步消息。同步消息在接收到对方的返回前,需要挂起,直到返回或超时。异步消息只需要发送消息,不需要对方系统的立即反馈。

  同步消息如java RPC调用,同步调用依赖于被调用方,如果被调用方失败或网络错误,那么程序就没办法继续执行下去,造成一个系统最薄弱的环节依赖于对方系统。而多个系统通过同步调用方式耦合在一起的时候,那么可靠性取决于最薄弱的一方系统。而异步调用能增强一个系统的健壮性。当然,不是任何情况都适合异步调用,还是那句话,能异步的地方,尽量异步。

  异步消息,如同一个邮箱系统,我们把信件丢入邮桶,邮递员会更具上面的地址,送达到这封信要去的地方。邮箱和信件格式由邮局提供定义,比如邮箱需要有一个口子投递邮件,邮件需要有地址,邮政编码等等。而这些邮箱具体的加工和制作均交由各自的厂商来完成。
在java消息领域,我们也有一个称为消息中间件的东西,来提供这样一个服务。消息的发送、消费接口、消息体的格式等都由JMS来定义,而具体的实现由各个消息中间件厂商来实现。JMS是sun公司对于消息中间件的一个规范。对java领域里的消息起到举足轻重的作用。以前的消息交互,均各自实现一套格式,如同一个国家的人都用不同的方言跟另外来自不同省份的人交流一样。自从规范了普通话,我们的交流成本降低了。这也正如JMS规范在整个java消息领域的作用。

二、JMS简介

  JMS1.1规范定义了一些概念和一组API,可以使得在我们利用消息系统的过程中,不依赖于各个厂商的具体实现,便能写出消息代码。由于不依赖具体厂商实现,这样的代码有很好的移植性。
JMS1.1定义了的部分概念:
1、JMS客户端:接收或发送消息的java系统
2、JMS消息体:系统间发送的消息体
3、JMS提供商:JMS规范实现厂商
4、JMS管理对象:预先配置好的用于JMS客户端的JMS对象。如ConnectionFactory跟JMS服务端的连接工厂,Destination 接收和发送消息的目标地址等
5、JMS Domain:JMS定义了两种域模型,一种是PTP(point-to-point)即点对点消息传输模型,一种是pub/sub(publish-subscribe)即发布订阅模型。
  PTP通过一个先进先出的queue实现。很多人在这个PTP概念上有所误解,所谓点对点不是指生产者和消费者只有一个。PTP如下图所示:



  我们可以看到,一个或多个生产者发送消息,消息m2先抵达了queue,然后m1也发出了,并一同存在于一个先进先出的queue里面。消费者也存在一个或多个,对queue里的消息进行消费。但消息被随机的一个消费者消费且仅消费一次。

在pub/sub消息模型中,消息被广播给所有订阅者。如下图:




JMS1.1定义了如下接口:


JMS通用接口JMS PTP接口JMS pub/sub 接口描述
ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory创建与JMS服务的连接
ConnectionQueueConnection TopicConnectionJMS服务的连接,可创建Session
DestinationQueueTopic发送接收消息的目标,Queue和Topic均可视为一种Destination
SessionQueueSessionTopicSession Session
MessageProducerQueueSender TopicPublisher 消息发送者
MessageConsumerQueueReceiver、QueueBrowserTopicSubscriber消息接收者/浏览者/订阅者



接口关系如下:





  可以看到一个JMS应用的发送端的标准流程是:创建连接工厂>创建连接>创建session>创建发送者>创建消息体>发送消息到Destination(queue或topic)。

  接收端则为:创建连接工厂>创建连接>创建session>创建接收者>创建消息监听器监听某Destination的消息>获取消息并执行业务逻辑

下面展示了JMS的编码模板:

发送端:

   
public class QueueSender {
        public static void main(String[] args) throws JMSException{
            ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://localhost:61616");//这里为简便,依赖ActiveMQ的JMS实现。现实程序中可以通过jndi查找方式查找factory,这样彻底避免关联特定JMS实现者。并拥有最大的可移植性。
            Connection connection=factory.createConnection();
            Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            Message message=session.createTextMessage("Hello World!"+new Date().getTime());
            Queue queue=new ActiveMQQueue("queue.somebody");//同上,这里依赖了ActiveMQ的JMS实现,现实中可以采用jndi查找方式获得Queue。
            MessageProducer producer=session.createProducer(queue);
            producer.send(message);
            session.close();
            connection.close();
        }
    }


接收端:
       
public class QueueReceiver {
            public static void main(String[] args) throws JMSException{
                ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://localhost:61616");
                Connection connection=factory.createConnection();
                Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
                Queue queue=new ActiveMQQueue("queue.somebody");
                MessageConsumer receiver=session.createConsumer(queue);//创建两个接收者,同时消费同一个queue的消息。queue里的消息派发且仅派发一次给唯一一个消费者。
                MessageConsumer receiver2=session.createConsumer(queue);
                receiver.setMessageListener(new QueueMessageListener("1"));
                receiver2.setMessageListener(new QueueMessageListener("2"));
                connection.start();
            }
        }


消息侦听者:
   
public class QueueMessageListener implements MessageListener{
    private String num;
    public QueueMessageListener(String num){
        super();
        this.num=num;
    }
    public QueueMessageListener(){
        super();
    }
    public void onMessage(Message message) {
        System.out.println(message.toString()+num);
    }


  我们安装了ApacheMQ之后,启动本地MQ服务,便能测试这几个类的执行方式,观察执行过程。我们可以发现消息仅被消费了一次,JMS提供商必须保证一条消息不会重复被消费,当然更不能丢失。

  在PTP下,我们建立一个queue以后,如果没有消费者连入该queue,那么消息生产者发送的消息将堵塞在queue里面,直到有消费者来消费。如果消费者出现故障,不再连入queue,消息还会保存在queue里面,等待下次消费者的连入,再进行消费。在pub/sub模型下,如果消费者出现故障,那么所有的消息,将不在为之保留,下次连接之时,只能消费此时生产者发送的消息。JMS于是定义了另一种订阅者,叫Durable Subscription(持久订阅者),该订阅者在第一次连上topic后,就注册了该topic的消息,消息会在消费者不可用的情况下为之保留消息,并在其再次连上topic后,重推消息给消费者。

  理解了JMS中的两种消息模型以后,我们来说说JMS中的消息体格式。JMS对消息体定为三个部分 1)head 消息头信息 2)properties 消息属性值 3)body 消息内容。如下图所示:



其含义大致解释如下:

头类型描述设置方
JMSDestination描述该消息发往的目的地在发送方法中设定
JMSDeliveryModeNON_PERSISTENT 非持久化 表示消息发往JMS消息服务器之后,保存在内存中,不做持久化;PERSISTENT 持久化 消息发往JMS消息服务器之后,持久化数据。以保证消息服务器拓机造成的消息丢失发送方法中设定
JMSExpiration 消息过期时间。消费者在发送消息时,可设定消息的time-to-live时间,如producer.setTimeToLive(10000),在消息发送后,该时间保留在消息的JMSExpiration字段中,如果在指定的这段时间内消息未被消费,该消息将会被丢弃。在发送方法中设定
JMSPriority 消息优先级,JMS把消息分为10个等级,0-4为普通优先级,5-9为加快优先级,ActiveMQ中默认的消息优先级为4在发送方法中设定
JMSMessageId消息唯一性ID,必须以“ID:”为前缀在发送方法中设定
JMSTimestamp消息发送时间,表示消息的发送时间点,而非传送时间在发送方法中设定
JMSCorrelationID客户端
JMSReplyTo客户端
JMSType客户端
JMSRedeliveredJMS提供商
属性值含义
properties
消息体含义
body消息体分为1:StreamMessage2:MapMessage 3:TextMessage 4:ObjectMessage 5:BytesMessage


再谈消息可靠性

    在上面,谈及消息体格式定义中,有个字段 JMSDeliveryMode用来表示该消息发送后,JMS提供商应该怎么处理消息。PERSISTENT(持久化)的消息在JMS服务器中持久化。接收端如果采用点对点的queue方式或者Durable Subscription(持久订阅者)方式,那么消息可保证只且只有一次被成功接收。NON_PERSISTENT(非持久化)的消息在JMS服务器关闭或宕机时,消息丢失。根据发送端和接收端采用的方式,列出如下可靠性表格,以作参考。

引用
注意:以下的可靠性不包括JMS服务器由于资源关系,造成的消息不能持久化等因素引起的不可靠(该类不可靠应该是JMS提供商或硬件引起的的资源和处理能力的极限问题,应该由管理人员解决)。也不包括消息由于超时时间造成的销毁丢失。

消息发送端消息接收端可靠性及因素
PERSISTENTqueue receiver/durable subscriber消费一次且仅消费一次。可靠性最好,但是占用服务器资源比较多。
PERSISTENTnon-durable subscriber最多消费一次。这是由于non-durable subscriber决定的,如果消费端宕机或其他问题导致与JMS服务器断开连接,等下次再联上JMS服务器时的一系列消息,不为之保留。
NON_PERSISTENTqueue receiver/durable subscriber最多消费一次。这是由于服务器的宕机会造成消息丢失
NON_PERSISTENTnon-durable subscriber最多消费一次。这是由于服务器的宕机造成消息丢失,也可能是由于non-durable subscriber的性质所决定


消息的通知确认
在客户端接收了消息之后,JMS服务怎样有效确认消息是否已经被客户端接收呢?Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);这段代码创建一个非事务性的session,并采用auto_acknowledge方式通知JMS服务器。如果采用事务性session时,通知会伴随session的commit/rollback同时发送通知。在我们采用非事务session时,有三种通知方式。
通知方式效果
DUPS_OK_ACKNOWLEDGE session延迟通知。如果JMS服务器宕机,会造成重复消息的情况。程序必须保证处理重复消息而不引起程序逻辑的混乱。
AUTO_ACKNOWLEDGE 当receive或MessageListener方法成功返回后自动通知。
CLIENT_ACKNOWLEDGE客户端调用消息的acknowledge方法通知




JMS就先简要的介绍到这里,详细资料请参看JMS1.1规范。关于ActiveMQ的文章文章请继续关注 ActiveMQ分享(二)




  • 大小: 26.9 KB
  • 大小: 29.2 KB
  • 大小: 26.6 KB
  • 大小: 5 KB
13
3
分享到:
评论
8 楼 yanjingan 2016-10-31  
通俗易懂 理解的很深 好文章!
7 楼 u012049463 2013-11-28  
谢谢,如果一开始就看这篇文章,又可以少走多少弯路。很具体也很通俗易懂,转走.../cy
6 楼 chenzhou123520 2013-07-28  
正准备学ActiveMQ这块,文章写得很详细啊,通俗易懂。感谢楼主分享,赞一个
5 楼 jingjiniao0118 2013-01-25  
很详尽,谢谢分享!!!
4 楼 763691 2013-01-11  
思路很清晰,谢谢分享
3 楼 coolinc 2012-12-03  
这个介绍好
2 楼 DDT_123456 2012-04-08  
感谢这么详尽的分享
1 楼 rockyeah 2010-08-23  
期待后续的分享.

相关推荐

    spring整合jms+activemq

    spring3.0整合了activemQ和jms可以发送信息和接收消息

    JMS之ActiveMQ工具类分享(关于同步回执和异步回执)(新)

    ActiveMQ工具类分享(关于同步回执和异步回执),本穷屌CSDN没币下载资源了,莫怪莫怪。后面会调整回来。。。 资源描述具体看文章啦。http://blog.csdn.net/kunloz520/article/details/78830656

    JMS简介与ActiveMQ实战代码分享

    主要介绍了JMS简介与ActiveMQ实战代码分享,具有一定借鉴价值,需要的朋友可以参考下

    SpringJMS源码

    ActiveMQ实现的JMS,跟大家分享下

    apache-activemq-5.16.0-bin-windows.zip----windows版用

    分享-windows版最新的apache-activemq-5.16.0 。 使用activeMQ来完成jms的发送,必须要下载activeMQ,然后再本机安装,并且启动activeMQ的服务才行。在官网下载完成之后,运行bin目录下面的activemq.bat,将activeMQ...

    详解spring boot整合JMS(ActiveMQ实现)

    本篇文章主要介绍了详解spring boot整合JMS(ActiveMQ实现),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

    xmljava系统源码-high-end-technology::angry_face_with_horns:Sharesomeofthehigh-endtechnologiescomm

    是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。 实例 RocketMQ RcoketMQ 是一款低延迟、高可靠、可伸缩、易于...

    JAVA上百实例源码以及开源项目源代码

    5个目标文件,演示Address EJB的实现,创建一个EJB测试客户端,得到名字上下文,查询jndi名,通过强制转型得到Home接口,getInitialContext()函数返回一个经过初始化的上下文,用client的getHome()函数调用Home接口...

    JAVA上百实例源码以及开源项目

    百度云盘分享 简介 笔者当初为了学习JAVA,收集了很多经典源码,源码难易程度分为初级、中级、高级等,详情看源码列表,需要的可以直接下载! 这些源码反映了那时那景笔者对未来的盲目,对代码的热情、执着,对...

    商业vs开源MQ对比

    市场上目前主流的消息中间件有IBM MQ、WebLogic JMS、ActiveMQ、Rabbit MQ、Rocket MQ、Apollo等。他们的功能性差异往往成为企业在中间件选型过程中不得不考虑的因素,通过本文档,你可以详细了解商业MQ和开源MQ之间...

    分布式消息中间件-Rocketmq

    本文来自于csdn,文章分享了分布式消息中间件,主要基于JMS规范、Rocketmq的介绍、部署方式、特性的一些使用几大模块阐述。 今天要给大家分享的是分布式消息中间件。消息中间件主要是实现分布式系统中解耦、异步消息...

    fourinone-3.04.25

    Fourinone整体代码短小精悍,跟Hadoop, Zookeeper, Memcache, ActiveMq等开源产品代码上没有任何相似性,不需要任何依赖,引用一个jar包就可以嵌入式使用,良好支持window环境,可以在一台机器上模拟分布式环境,更...

    C#实现同Active MQ通讯的方法

    JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS 程序格式的消息。 Message 由消息头,属性和消息体三部份组成。 Active ...

    【白雪红叶】JAVA学习技术栈梳理思维导图.xmind

    ActiveMQ 常用开源框架 Spring Spring MVC Spring WebFlow spring tx aop ioc Struts ibatis Mybatis CAS Dubbo 工作能力 软实力 应急能力 创新能力 管理能力 分享能力 学习能力 沟通能力 ...

Global site tag (gtag.js) - Google Analytics