本文共 21405 字,大约阅读时间需要 71 分钟。
ActiveMQ提供消息队列服务。
ActiveMQ高可用由三部分组成。
两个运行的ActiveMQ instance如果同时使用一套持久化存储,那么这两个ActiveMQ instance就会构成master-slave关系。持久化数据放在一个单独的文件系统目录上或者放在一个共享的文件系统目录上,这个目录中会有一个lock锁文件。谁先启动ActiveMQ instance谁就会抢占这个锁,谁抢占了这个锁谁就是master,slave运行在standby状态,只有master服务停止或者中断后,slave就会立刻抢占这个锁,成为新的master,而另一个ActiveMQ instance启动后无法抢占这个锁,会以slave方式运行。
ActiveMQ的networkConnectors可以实现多个mq之间进行路由,假设有两个mq,分别为brokerA和brokerB,当有一条消息发送到brokerA的队列queueName中,brokerA中队列queueName的消息就会路由到brokerB的队列queueName上,反之brokerB的消息也会路由到brokerA。
networkConnectors可以配置成静态的(固定的)也可以配置成动态的,取决于ActiveMQ instance运行的网络环境。与其说是协议不如说是策略。failover传输是指在客户端连接列表中配置多个连接配置信息(称之为URIs list),当list中的某一个连接信息不可用时,就会尝试下一个连接信息,直到找到可用的连接信息。
ActiveMQ可以运行在主机内,也可以运行在虚拟机内、更可以运行在Docker中。但无论运行在哪种平台中,都需要考虑集群容错问题。例如集群节点应该尽可能地位于不同的host上,避免host发生故障导致整个集群不可用的情况。
根据公有云、私有云环境的不同和ActiveMQ实现集群方式的不同分为两种集群方案。在私有云环境下,网络是受自己控制的,ActiveMQ集群可以运行在同一个局域网内,利用局域网内特有的一些局域网协议实现集群,这些协议包括多播、VRRP等。多播能使一个或多个多播源只把数据包发送给特定的多播组,而只有加入该多播组的主机才能接收到数据包。一些分布在各处的进程需要以组的方式协同工作,组中的进程通常要给其他所有的成员发送消息。即有这样的一种方法能够给一些明确定义的组发送消息,这些组的成员数量虽然很多,但是与整个网络规模相比却很小。给这样一个组发送消息称为多点点播送,简称多播。多播技术可以用于自动发现,因此它能实现集群中的自动发现节点功能。借助多播技术,可以使用动态发现实现集群节点的自动添加和移除。
在公有云环境中,往往不是受自己控制的,多台云主机也可能不在同一个局域网内,此时就不能利用多播技术实现集群应用。除了利用多播的自动发现实现集群自动发现节点外,同样可以使用静态配置的方式,告诉集群内的每一个节点它的的对等节点有谁。当集群内的某个节点发生故障时,静态配置的节点信息可能发生改变,导致集群内存在一个或多个不可用的节点地址,从而导致集群对于客户端而言不可用。或者当集群内的某个节点故障恢复后,不能动态的告知它的对等节点它已恢复,同样可能造成集群对客户端而言不可用或者无法提供相应的服务标准。这种情况下可以使用一种特殊的方式配置集群与客户端的连接,虽然客户端的集群节点配置是静态的,但是客户端可以通过某种方式智能迅速的判断集群中的节点是否可用,从而实现高可用。
公有云环境+docker环境还需要考虑多主机环境下的容器间通信问题。# ActiveMQ references
# # # # # # # # # # # # SharedFile System Master Slave and Dynamic Discovery Clustering Design 192.168.1.241 server1.51devops.com activemq master,activemq cluster A 192.168.1.242 server2.51devops.com activemq slave 192.168.1.243 server3.51devops.com nfs server,activemq cluster B # add new disk to 192.168.1.243 fdisk /dev/sdb n p 1w
mkfs.xfs /dev/sdb1 mkdir -p /data mount /dev/sdb1 /data mkdir /data/ActivemqSharedBrokerData # install nfs on 192.168.1.243 yum -y install nfs-utils nfs-utils-lib chkconfig --levels 235 nfs on cat >/etc/exports<<eof /data 192.168.1.0/255.255.255.0(rw,no_root_squash,no_all_squash,sync) eof exportfs -a exportfs -r # /etc/init.d/rpcbind start /etc/init.d/nfslock start /etc/init.d/nfs restart # end install nfs on 192.168.1.243 # mount nfs on 192.168.1.241 192.168.1.242 yum -y install nfs-utils nfs-utils-lib mkdir /data mount -t nfs -o rw 192.168.1.243:/data /data # 192.168.1.243:/data on /data type nfs (rw,vers=4,addr=192.168.1.243,clientaddr=192.168.1.241) # install java yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel # install activemq wget -c gpg --import KEYS wget -c wget -c gpg --verify apache-activemq-5.13.1-bin.tar.gz.asc tar zxf apache-activemq-5.13.1-bin.tar.gz mv apache-activemq-5.13.1 /usr/local/activemq ls /usr/local/activemq cd /usr/local/activemq # end install activemq # on 192.168.1.241 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | < broker xmlns = "http://activemq.apache.org/schema/core" dataDirectory = "${activemq.data}" brokerName = "192.168.1.241" useJmx = "true" advisorySupport = "false" persistent = "true" deleteAllMessagesOnStartup = "false" useShutdownHook = "false" schedulerSupport = "true" > < networkConnectors > < networkConnector uri = "multicast://default" /> </ networkConnectors > < transportConnectors > < transportConnector name = "openwire" uri = "tcp://0.0.0.0:61618" discoveryUri = "multicast://default" /> </ transportConnectors > < persistenceAdapter > < kahaDB directory = "/data/ActivemqSharedBrokerData" /> </ persistenceAdapter > |
# end on 192.168.1.241
# on 192.168.1.242 vim conf/activemq.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | < broker xmlns = "http://activemq.apache.org/schema/core" dataDirectory = "${activemq.data}" brokerName = "192.168.1.242" useJmx = "true" advisorySupport = "false" persistent = "true" deleteAllMessagesOnStartup = "false" useShutdownHook = "false" schedulerSupport = "true" > < networkConnectors > < networkConnector uri = "multicast://default" /> </ networkConnectors > < transportConnectors > < transportConnector name = "openwire" uri = "tcp://0.0.0.0:61618" discoveryUri = "multicast://default" /> </ transportConnectors > < persistenceAdapter > < kahaDB directory = "/data/ActivemqSharedBrokerData" /> </ persistenceAdapter > |
# end on 192.168.1.242
# on 192.168.1.241 and 192.168.1.242 cd /usr/local/activemq bin/activemq start bin/activemq status bin/activemq stop true > data/activemq.log bin/activemq restart sleep 2 tail -n30 data/activemq.log # end on 192.168.1.241 and 192.168.1.242 # on 192.168.1.243 cd /usr/local/activemq vim conf/activemq.xml 1 2 3 4 5 6 7 8 9 10 11 | < broker xmlns = "http://activemq.apache.org/schema/core" dataDirectory = "${activemq.data}" brokerName = "192.168.1.243" useJmx = "true" advisorySupport = "false" persistent = "true" deleteAllMessagesOnStartup = "false" useShutdownHook = "false" schedulerSupport = "true" > < networkConnectors > < networkConnector uri = "multicast://default" /> </ networkConnectors > < transportConnectors > < transportConnector name = "openwire" uri = "tcp://0.0.0.0:61618" discoveryUri = "multicast://default" /> </ transportConnectors > |
bin/activemq start
bin/activemq status bin/activemq stop true > data/activemq.log bin/activemq restart sleep 2 tail -n30 data/activemq.log公有云环境的配置与私有云环境的差异就在必须将动态配置换成静态配置。
1 2 3 | < networkConnectors > < networkConnector uri = "static:(tcp://host1:61616,tcp://host2:61616,tcp://..)" /> </ networkConnectors > |
在Docker的加入后就有很大的不同了。首先Docker的配置文件(如/etc/hosts)中必须灵活的指定主机名称与IP地址的映射关系,这样使得ActiveMQ的配置文件更加灵活,不依赖于IP地址、具有更好的应用范围。其次要解决主机间容器通信的问题,每一个主机上运行的Docker容器之间必须能相互访问。
如果想使用Docker,建议使用Linux 3.10版本以上的内核的操作系统,这样的Linux内核在CentOS7、Ubuntu14以上都支持。 此处以CentOS7 1511为例,跨主机网络互通靠docker native plugin中的overlay实现。此方案中会用到一个kv存储,这个kv存储可以使用Consul、etcd等实现,此处用Consul实现。 yum -y update history -c && shutdown -r now uname -a # Linux localhost.localdomain 3.10.0-327.10.1.el7.x86_64 #1 SMP Tue Feb 16 17:03:50 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux # set FQDN hostname use edit file or hostname command or nmtui command vim /etc/hostname setenforce 0 # for service docker # Refer: # Refer: which curl 2>/dev/null || yum -y -q install curl curl -fsSL | gpg --import curl -fsSL | sh # for program docker-enter # Refer: which curl > /dev/null || apt-get -qq install -y curl # # cd /tmp; curl | tar -zxf-; cd util-linux-2.24; # cd /tmp; wget -q tar xzvf util-linux-2.24.tar.gz # cd util-linux-2.24 # ./configure --without-ncurses && make nsenter # cp nsenter '/usr/local/bin' which nsenter cd which wget 2>/dev/null || yum -y -q install wget wget -P ~ echo "[ -f ~/.bashrc_docker ] && . ~/.bashrc_docker" >> ~/.bashrc; source ~/.bashrc service docker start docker version rpm -ql docker-engine # if there are only two docker hosts commmunicated each other, then nameserver set to each other # if there are more than 3 docker hosts, then first node's nameserver set to last one, second set to first one, third set to second one vim /etc/resolv.conf vim /usr/lib/systemd/system/docker.service -H tcp://0.0.0.0:2376 -H unix:///var/run/docker.sock --cluster-store=consul://consul.service.dc1.consul.:8500 --cluster-advertise=eno16777728:2376 systemctl daemon-reload systemctl restart docker systemctl status docker -l docker network create -d overlay interconnection docker network ls # after this, start a docker container with "--net interconnection", then containers running on different hosts can communited each other. mkdir -p /data/docker/activemq/data mkdir -p /data/docker/activemq/data/kahadb mkdir -p /data/docker/activemq/log-master mkdir -p /data/docker/activemq/log-slave mkdir -p /data/docker/activemq/confvim /data/docker/activemq/conf/activemq.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | < beans xmlns = "http://www.springframework.org/schema/beans" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation = "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd" > < bean class = "org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" > < property name = "locations" > < value >file:${activemq.conf}/credentials.properties</ value > </ property > </ bean > < bean id = "logQuery" class = "io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init = "false" scope = "singleton" init-method = "start" destroy-method = "stop" ></ bean > < broker xmlns = "http://activemq.apache.org/schema/core" dataDirectory = "${activemq.data}" brokerName = "localhost" useJmx = "true" advisorySupport = "false" persistent = "true" deleteAllMessagesOnStartup = "false" useShutdownHook = "false" > < networkConnectors > < networkConnector uri = "static:(tcp://server1-activemq-01-master:61616,tcp://server2-activemq-02-master:61616)" /> </ networkConnectors > < destinationPolicy > < policyMap > < policyEntries > < policyEntry topic = ">" > < pendingMessageLimitStrategy > < constantPendingMessageLimitStrategy limit = "1000" /> </ pendingMessageLimitStrategy > </ policyEntry > </ policyEntries > </ policyMap > </ destinationPolicy > < managementContext > < managementContext createConnector = "false" /> </ managementContext > < persistenceAdapter > < kahaDB directory = "/data/activemq/kahadb" /> </ persistenceAdapter > < systemUsage > < systemUsage > < memoryUsage > < memoryUsage percentOfJvmHeap = "70" /> </ memoryUsage > < storeUsage > < storeUsage limit = "100 gb" /> </ storeUsage > < tempUsage > < tempUsage limit = "50 gb" /> </ tempUsage > </ systemUsage > </ systemUsage > < transportConnectors > < transportConnector name = "openwire" uri = "tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" /> < transportConnector name = "amqp" uri = "amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600" /> < transportConnector name = "stomp" uri = "stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600" /> < transportConnector name = "mqtt" uri = "mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600" /> < transportConnector name = "ws" uri = "ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600" /> </ transportConnectors > < shutdownHooks > < bean xmlns = "http://www.springframework.org/schema/beans" class = "org.apache.activemq.hooks.SpringContextHook" /> </ shutdownHooks > </ broker > < import resource = "jetty.xml" /> </ beans > |
docker run --restart="always" --name='server1-activemq-01-master' --net interconnection -d --hostname=server1-activemq-01-master \
-e 'ACTIVEMQ_NAME=amqp-srv1-master' \ -e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \ -e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \ -e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \ -e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \ -e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \ -e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \ -e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \ -e 'ACTIVEMQ_MIN_MEMORY=1024' -e 'ACTIVEMQ_MAX_MEMORY=4096' \ -e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \ -v /data/docker/activemq/data:/data/activemq \ -v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \ -v /data/docker/activemq/log-master:/var/log/activemq \ -v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \ -p 8161:8161 \ -p 61616:61616 \ -p 61613:61613 \ webcenter/activemqdocker run --restart="always" --name='server1-activemq-01-slave' --net interconnection -d --hostname=server1-activemq-01-slave \
-e 'ACTIVEMQ_NAME=amqp-srv1-slave' \ -e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \ -e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \ -e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \ -e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \ -e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \ -e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \ -e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \ -e 'ACTIVEMQ_MIN_MEMORY=1024' -e 'ACTIVEMQ_MAX_MEMORY=4096' \ -e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \ -v /data/docker/activemq/data:/data/activemq \ -v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \ -v /data/docker/activemq/log-slave:/var/log/activemq \ -v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \ -p 8171:8161 \ -p 61626:61616 \ -p 61623:61613 \ webcenter/activemqrm -rf /data/docker/activemq
docker stop server1-activemq-01-master && docker rm server1-activemq-01-master docker stop server1-activemq-01-slave && docker rm server1-activemq-01-slave mkdir -p /data/docker/activemq/data mkdir -p /data/docker/activemq/data/kahadb mkdir -p /data/docker/activemq/log-master mkdir -p /data/docker/activemq/log-slave mkdir -p /data/docker/activemq/confdocker run --restart="always" --name='server2-activemq-02-master' -d --hostname=server2-activemq-02-master \
-e 'ACTIVEMQ_NAME=amqp-srv2-master' \ -e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \ -e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \ -e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \ -e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \ -e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \ -e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \ -e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \ -e 'ACTIVEMQ_MIN_MEMORY=1024' -e 'ACTIVEMQ_MAX_MEMORY=4096' \ -e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \ -v /data/docker/activemq/data:/data/activemq \ -v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \ -v /data/docker/activemq/log-master:/var/log/activemq \ -v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \ -p 8161:8161 \ -p 61616:61616 \ -p 61613:61613 \ webcenter/activemqdocker run --restart="always" --name='server2-activemq-02-slave' -d --hostname=server2-activemq-02-slave \
-e 'ACTIVEMQ_NAME=amqp-srv2-slave' \ -e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \ -e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \ -e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \ -e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \ -e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \ -e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \ -e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \ -e 'ACTIVEMQ_MIN_MEMORY=1024' -e 'ACTIVEMQ_MAX_MEMORY=4096' \ -e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \ -v /data/docker/activemq/data:/data/activemq \ -v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \ -v /data/docker/activemq/log-slave:/var/log/activemq \ -v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \ -p 8171:8161 \ -p 61626:61616 \ -p 61623:61613 \ webcenter/activemqrm -rf /data/docker/activemq
docker stop server2-activemq-02-master && docker rm server2-activemq-02-master docker stop server2-activemq-02-slave && docker rm server2-activemq-02-slavecat /data/docker/activemq/log-master/activemq.log
cat /data/docker/activemq/log-slave/activemq.log客户端连接ActiveMQ集群
# About: "Caught: javax.jms.JMSSecurityException: User name [xxx] or password is invalid. javax.jms.JMSSecurityException: User name [xxx] or password is invalid."
# Refer to # At last, I find that only set "-e 'ACTIVEMQ_ADMIN_LOGIN=yourName' -e 'ACTIVEMQ_ADMIN_PASSWORD=yourPassword' \" like this self can login success, then I got a success!Client connection example:
1 | ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin" , "your_password" , "failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false" ); |
完整的例子如下:
activemq hello world writen with java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; /** * Hello world! */ public class activemq5Failover { public static void main(String[] args) throws Exception { thread( new HelloWorldProducer(), false ); thread( new HelloWorldProducer(), false ); thread( new HelloWorldConsumer(), false ); Thread.sleep( 1000 ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldProducer(), false ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldProducer(), false ); Thread.sleep( 1000 ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldProducer(), false ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldProducer(), false ); thread( new HelloWorldProducer(), false ); Thread.sleep( 1000 ); thread( new HelloWorldProducer(), false ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldProducer(), false ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldProducer(), false ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldProducer(), false ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldConsumer(), false ); thread( new HelloWorldProducer(), false ); } public static void thread(Runnable runnable, boolean daemon) { Thread brokerThread = new Thread(runnable); brokerThread.setDaemon(daemon); brokerThread.start(); } public static class HelloWorldProducer implements Runnable { public void run() { try { // Create a ConnectionFactory // Refer: http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin" , "your_password" , "failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false" ); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); // Create a Session Session session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue( "TEST.FOO" ); // Create a MessageProducer from the Session to the Topic or Queue MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Create a messages String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this .hashCode(); TextMessage message = session.createTextMessage(text); // Tell the producer to send the message System.out.println( "Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName()); producer.send(message); // Clean up session.close(); connection.close(); } catch (Exception e) { System.out.println( "Caught: " + e); e.printStackTrace(); } } } public static class HelloWorldConsumer implements Runnable, ExceptionListener { public void run() { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin" , "your_password" , "failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false" ); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); connection.setExceptionListener( this ); // Create a Session Session session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue( "TEST.FOO" ); // Create a MessageConsumer from the Session to the Topic or Queue MessageConsumer consumer = session.createConsumer(destination); // Wait for a message Message message = consumer.receive( 1000 ); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println( "Received: " + text); } else { System.out.println( "Received: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println( "Caught: " + e); e.printStackTrace(); } } public synchronized void onException(JMSException ex) { System.out.println( "JMS Exception occured. Shutting down client." ); } } } |
tag:activemq clustering,ActiveMQ 集群,ActiveMQ 负载均衡,ActiveMQ 主备,ActiveMQ 高可用
--end--本文转自 urey_pp 51CTO博客,原文链接:http://blog.51cto.com/dgd2010/1749983,如需转载请自行联系原作者