本篇主要粗略介绍程序在启动时,连接mq做了哪些操作,你可能需要先自己阅读一遍源码,再来看本篇文章,或对照源码看本篇文章
1 抛开spring,创建一个简单生产者
1.1 安装windows版activemq
- 下载地址:http://activemq.apache.org/download.html
- 解压,启动bin\win64\activemq.bat
- 访问:http://localhost:8161/
1.2 生产者
pom.xml
1 2 3 4 5
| <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.4</version> </dependency>
|
Producter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class Producter { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory; Connection connection; Session session; public void init() { try { connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(true, Session.SESSION_TRANSACTED); } catch (JMSException e) { e.printStackTrace(); } } public void sendMessage(String disname, String content) { try { Queue queue = session.createQueue(disname); MessageProducer messageProducer = session.createProducer(queue); TextMessage msg = session.createTextMessage(content); System.out.println(content); messageProducer.send(msg); session.commit(); } catch (JMSException e) { session.rollback(); e.printStackTrace(); } } }
|
测试
1 2 3 4 5
| public static void main(String[] args) { Producter producter = new Producter(); producter.init(); producter.sendMessage("test", "interest"); }
|
2 分析init()方法
2.1 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
该语句只是将USERNAME, PASSWORD, BROKEN_URL设置到ActiveMQConnectionFactory的变量里面。
2.2 connection = connectionFactory.createConnection();
createConnection()方法只是单纯调用createActiveMQConnection()方法,createActiveMQConnection的源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { if (brokerURL == null) { throw new ConfigurationException("brokerURL not set."); } ActiveMQConnection connection = null; try { Transport transport = createTransport(); connection = createActiveMQConnection(transport, factoryStats);
connection.setUserName(userName); connection.setPassword(password);
configureConnection(connection);
transport.start();
if (clientID != null) { connection.setDefaultClientID(clientID); } return connection; } catch (JMSException e) { connection.close(); throw e; } catch (Exception e) { connection.close(); throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); } }
|
2.2.1 创建Transport,createTransport()
粗略介绍:createTransport(),该方法首先从brokerURL中,取出scheme,scheme就是写在brokerUrl中的tcp、auto等关键字,然后根据scheme在一个保存了TransportFactory的ConcurrentMap中查找TransportFactory,没有就新new一个TransportFactory保存到ConcurrentMap中。然后调用TransportFactory.doConnect(),根据brokerUrl中的参数返回一个Transport的对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| protected Transport createTransport() throws JMSException { try { URI connectBrokerUL = brokerURL; String scheme = brokerURL.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + brokerURL + "]"); } if (scheme.equals("auto")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp")); } else if (scheme.equals("auto+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl")); } else if (scheme.equals("auto+nio")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio")); } else if (scheme.equals("auto+nio+ssl")) { connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl")); } return TransportFactory.connect(connectBrokerUL); } catch (Exception e) { throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); } }
|
2.2.1.1 根据scheme在Map中取出TransportFactory,获得Transport对象
1 2 3 4 5 6
| public static Transport connect(URI location) throws Exception { TransportFactory tf = findTransportFactory(location); return tf.doConnect(location); }
|
2.2.1.1.1 从ConcurrentMap中获取TransportFactory
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static TransportFactory findTransportFactory(URI location) throws IOException { String scheme = location.getScheme(); if (scheme == null) { throw new IOException("Transport not scheme specified: [" + location + "]"); } TransportFactory tf = TRANSPORT_FACTORYS.get(scheme); if (tf == null) { try { tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); TRANSPORT_FACTORYS.put(scheme, tf); } catch (Throwable e) { throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); } } return tf; }
|
2.2.1.1.2 从TransportFactory返回Transport对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public Transport doConnect(URI location) throws Exception { try { Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); if( !options.containsKey("wireFormat.host") ) { options.put("wireFormat.host", location.getHost()); } WireFormat wf = createWireFormat(options); Transport transport = createTransport(location, wf); Transport rc = configure(transport, wf, options); IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) { throw new IllegalArgumentException("Invalid connect parameters: " + options); } return rc; } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } }
|
2.2.1.1.2.1 创建Transport对象
1 2 3 4
| protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException { throw new IOException("createTransport() method not implemented!"); }
|
2.2.1.1.2.2 createTransport()由子类实现,TcpTransportFactory
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Override protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { URI localLocation = null; String path = location.getPath(); if (path != null && path.length() > 0) { int localPortIndex = path.indexOf(':'); try { Integer.parseInt(path.substring(localPortIndex + 1, path.length())); String localString = location.getScheme() + ":/" + path; localLocation = new URI(localString); } catch (Exception e) { LOG.warn("path isn't a valid local location for TcpTransport to use", e.getMessage()); if(LOG.isDebugEnabled()) { LOG.debug("Failure detail", e); } } } SocketFactory socketFactory = createSocketFactory(); return createTcpTransport(wf, socketFactory, location, localLocation); }
|
2.2.2 由transport和状态管理器创建连接
粗略介绍:createActiveMQConnection()字面意思就是创建一个ActiveMQ的连接。这个连接是根据transport和一个连接状态管理器JMSStatsImpl创建的,源码如下。
1 2 3 4 5 6 7 8 9
| protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats); return connection; }
|
2.2.2.1 JMSStatsImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class JMSStatsImpl extends StatsImpl { private List connections; public JMSStatsImpl() { connections = new CopyOnWriteArrayList(); } public JMSConnectionStatsImpl[] getConnections() { Object connectionArray[] = connections.toArray(); int size = connectionArray.length; JMSConnectionStatsImpl answer[] = new JMSConnectionStatsImpl[size]; for(int i = 0; i < size; i++) { ActiveMQConnection connection = (ActiveMQConnection)connectionArray[i]; answer[i] = connection.getConnectionStats(); }
return answer; } public void addConnection(ActiveMQConnection connection) { connections.add(connection); } public void removeConnection(ActiveMQConnection connection) { connections.remove(connection); } public void dump(IndentPrinter out) { out.printIndent(); out.println("factory {"); out.incrementIndent(); JMSConnectionStatsImpl array[] = getConnections(); for(int i = 0; i < array.length; i++) { JMSConnectionStatsImpl connectionStat = array[i]; connectionStat.dump(out); }
out.decrementIndent(); out.printIndent(); out.println("}"); out.flush(); } public void setEnabled(boolean enabled) { super.setEnabled(enabled); JMSConnectionStatsImpl stats[] = getConnections(); int size = stats.length; for(int i = 0; i < size; i++) stats[i].setEnabled(enabled); } }
|
2.2.2.2 ActiveMQConnection的构造
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
// Constructs a connection bound to the given transport. Statement order matters below:
// the object hands out references to itself (transport listener, factory stats) before
// the constructor has finished.
//
// Keep the collaborators passed in by the factory.
this.transport = transport; this.clientIdGenerator = clientIdGenerator; this.factoryStats = factoryStats;
// Executor with core and maximum pool size 1, a 5-second keep-alive and an unbounded
// LinkedBlockingQueue; its threads are named after the transport.
// Then generate a unique connection id, build the ConnectionInfo (manageable, fault-tolerance
// flag copied from the transport) and derive the connection-level SessionId using value -1.
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); return thread; } }); String uniqueId = connectionIdGenerator.generateId(); this.info = new ConnectionInfo(new ConnectionId(uniqueId)); this.info.setManageable(true); this.info.setFaultTolerant(transport.isFaultTolerant()); this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
// Register this connection as the transport's listener.
this.transport.setTransportListener(this);
// Create the per-connection statistics, add this connection to the factory-level stats,
// record the creation time, and enable duplicate checking in the connection audit only
// when the transport reports itself as fault tolerant.
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); this.factoryStats.addConnection(this); this.timeCreated = System.currentTimeMillis(); this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); }
|
2.3 session = connection.createSession(true, Session.SESSION_TRANSACTED);
该方法返回一个session,该session用于操作activemq。该方法先做保护性校验,查看连接是否已关闭或失败;如果没有,就确保ConnectionInfo已发送给broker;在没有异常的情况下,new一个Session对象返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Override public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); if (!transacted) { if (acknowledgeMode == Session.SESSION_TRANSACTED) { throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) { throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " + "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)"); } } return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : acknowledgeMode, isDispatchAsync(), isAlwaysSessionAsync()); }
|
3 分析sendMessage()方法
这个方法基本上就属于业务层了,你需要关心的是Queue,Topic。
可参考点击查看