2016年4月19日星期二

Marathonn_008:2016金山岭长城越野半程马拉松

我们奔跑在公路上
我们行走在长城上
我们穿行在半山间

天是那么的蓝
云是那么的闲

古老的城墙就那么倔强的挺立着,像男人的胸膛
朔硬的北风打在墙上
也打在那些帅气的脸庞上

每一个垛口都曾有一个士兵伫立
每一个垛口都有一个深情的遥望

又高又陡的台阶
再也抬不起来的双腿

一个声音在耳边响起:
“加油,前面就是戚继光”
哦,戚继光,向你致敬
我愿做你的一名士兵

弯下腰
匍匐前进
前进
前进












2016年4月9日星期六

ActiveMQ_028:使用临时队列实现 Request-Response 通信

环境:OS X EI Capitan 10.11.4 + ActiveMQ 5.13

实际情况中,很多应用需要请求-问答方式的通讯方式。请求-应答方式并不是JMS规范系统默认提供的通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。
下图是典型的请求-应答方式的交互过程:

1. TemporaryQueue_Client_AutoAck_NonPeresistent.java

package com.travelsky.activemq;

import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TemporaryQueue_Client_AutoAck_NonPeresistent implements MessageListener {

    private static final int SEND_NUMBER = 5;

    // ConnectionFactory:连接工厂,JMS用它创建连接
    private ConnectionFactory connectionFactory;
    // Connection:JMS客户端到JMS Provider的连接
    private Connection connection = null;
    // Session:一个发送或接收消息的线程
    private Session session;
    // Destination:消息的目的地
    private Destination destination;
    // Destination: 临时消息目的地
    private Destination tempDest;
    // MessageProducer: 消息生产者
    private MessageProducer producer;
    // MessageConsumer: 响应消息消费者
    private MessageConsumer responseConsumer;

    public TemporaryQueue_Client_AutoAck_NonPeresistent() {

        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现
        connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");

        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("Client.MessageQueue");
           
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
           
            // This class will handle the messages to the temp queue as well
            tempDest = session.createTemporaryQueue();
            responseConsumer = session.createConsumer(tempDest);
            responseConsumer.setMessageListener(this);
            sendMessage(session, producer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("MyProtocolMessage");
            message.setJMSReplyTo(tempDest);
            String correlationId = this.createRandomString();
            message.setJMSCorrelationID(correlationId);
            //Thread.sleep(5000);
            System.out.println("发送消息:" + "MyProtocolMessage");
            producer.send(message);
        }
    }

    @Override
    public void onMessage(Message message) {
        String messageText = null;
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                messageText = textMessage.getText();
                System.out.println("messageText = " + messageText);
            }
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }
   
    private String createRandomString() {
        Random random = new Random(System.currentTimeMillis());
        long randomLong = random.nextLong();
        return Long.toHexString(randomLong);
    }

    public static void main(String[] args) {
        new TemporaryQueue_Client_AutoAck_NonPeresistent();
    }
}

2. TemporaryQueue_Server_AutoAck_NonPeresistent.java

package com.travelsky.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TemporaryQueue_Server_AutoAck_NonPeresistent implements MessageListener {

    private static final int SEND_NUMBER = 5;

    // ConnectionFactory:连接工厂,JMS用它创建连接
    private ConnectionFactory connectionFactory;
    // Connection:JMS客户端到JMS Provider的连接
    private Connection connection = null;
    // Session:一个发送或接收消息的线程
    private Session session;
    // Destination:消息的目的地
    private Destination destination;
    // Destination: 临时消息目的地
    private Destination tempDest;
    // MessageProducer: 反馈消息生产者
    private MessageProducer replyProducer;
    private MessageProtocol messageProtocol;

    public TemporaryQueue_Server_AutoAck_NonPeresistent() {

        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现
        connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");
       
        try {
            this.messageProtocol = new MessageProtocol();

            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("Client.MessageQueue");

            replyProducer = session.createProducer(null);
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //Set up a consumer to consume messages off of the admin queue
            MessageConsumer consumer = this.session.createConsumer(destination);
            consumer.setMessageListener(this);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage response = this.session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                response.setText(this.messageProtocol.handleProtocolMessage(messageText));
            }

            //Set the correlation ID from the received message to be the correlation id of the response message
            //this lets the client identify which message this is a response to if it has more than
            //one outstanding message to the server
            response.setJMSCorrelationID(message.getJMSCorrelationID());

            //Send the response to the Destination specified by the JMSReplyTo field of the received message,
            //this is presumably a temporary queue created by the client
            this.replyProducer.send(message.getJMSReplyTo(), response);
            System.out.println("replyMessageText = " + response.getText());
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public static void main(String[] args) {
        new TemporaryQueue_Server_AutoAck_NonPeresistent();
    }
}

3. MessageProtocol.java

package com.travelsky.activemq;

public class MessageProtocol {
    public String handleProtocolMessage(String messageText) {
        String responseText;
        if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
            responseText = "I recognize your protocol message";
        } else {
            responseText = "Unknown protocol message: " + messageText;
        }
        
        return responseText;
    }
}

参考文献:
1. http://www.csdn123.com/html/exception/567/567525_567522_567528.htm
2. http://www.360doc.com/content/09/0712/21/18042_4241975.shtml
3. http://stackoverflow.com/questions/4722022/activemq-sending-a-message-to-a-temporary-queue-specified-using-a-string-nms
4. http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

2016年4月7日星期四

ActiveMQ_027:LevelDB 原理

1. LevelDB 存储
LevelDB 是一个基于文件的消息持久化方式。经过优化后,它比 KahaDB 更快。与 KahaDB 原理类似,不过它使用的是一个定制化的 B-Tree 去索引 write ahead 日志文件,它提供了一些属性,用以提高“append only”追加模式的性能。
(1)Fast updates (No need to do random disk updates)
(2)Concurrent reads
(3)Fast index snapshots using hard links

KahaDB 和 LevelDB 都定期进行文件扫描,搜集可以删除的消息日志文件,然后再将其删除。使用KahaDB时,当消息日志文件很多时,搜集行为可能会导致消息的读写“失速”。而 LevelDB 使用了更简单的算法来判断哪个消息日志文件可以被删除,从而避免了消息的读写“失速”。

2. LevelDB 属性介绍
(1)directory 存储消息日志文件的目录,默认值:LevelDB。
(2)sync 如果设为 false, 将不同步进行写入操作到磁盘,默认值:true。
(3)logSize 最大消息日志文件大小,默认值:104857600 (100 MB)
(4)verifyChecksums 如果设为 true,将强制检查 checksum,默认值:false。
(5)paranoidChecks 发现文件内部有损坏时立即报错,默认值:false。
(6)indexFactory 用来创建 LevelDB indexes 的工厂类,默认值:org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory
(7)indexMaxOpenFiles 索引可以打开的最大文件数量,默认值:1000。
(8)indexBlockRestartInterval Number keys between restart points for delta encoding of keys,默认值:16。
(9)indexWriteBufferSize 可以放到缓存中的索引大小,超过此值,将会写入磁盘,默认值:6291456 (6 MB)。
(10)indexBlockSize The size of index data packed per block,默认值:4096 (4 K)。
(11)indexCacheSize The maximum amount of off-heap memory to use to cache index blocks,默认值:268435456 (256 MB)。
(12)indexCompression index blocks 的压缩类型,可以为 snappy 或 none,默认值:snappy。
(13)logCompression 消息日志文件的压缩类型,可以为 snappy 或 none,默认值:none。

参考文献:
1. http://activemq.apache.org/leveldb-store.html

2016年4月1日星期五

ActiveMQ_026:KahaDB 原理

1. KahaDB 存储
KahaDB 是一个内嵌的、事务型的消息存储方式,快速而可靠。
消息以数据日志文件的形式保存在磁盘上,当一个数据日志中的所有的消息都被消费了的时候,该数据日志文件将被标记 deletable。
在下一次清理行动中,该数据日志文件将被删除或者归档。
KahaDB 支持多个 slave,支持动态增加 slave。
KahaDB 支持使用 ZooKeeper 选举 Leader。

2. KahaDB 架构
KahaDB 的架构是按照可以满足高性能的消息读取而设计的,数据日志文件按照大小滚动,消息是以 append 方式追加到文件结尾处,所以写的速度很快。为了快速的获取消息,使用了一个 B-tree index,其中包含了所有消息的指针。完整的 B-tree index 存储在磁盘上,根据情况,部分或全部的 B-tree index 会加载到内存中。
数据日志文件包含所有持久化后的消息以及命令(比如事务边界、消息删除等等),每个数据日志文件大小是固定的,到达最大值后,会自动生成一个新的数据日志文件。日志文件中的所有消息的状态将被跟踪,当所有的消息都被消费后,该数据日志文件将被删除或者归档。
缓存中临时保存了活跃消费者将要消费的消息,如果活跃消费者存在,消息将会直接派发给与消费者关联的队列,同时也准备把消息写入数据日志文件。如果消费者承认收到消息的速度足够快,消息甚至没有写入数据日志文件。
B-tree index 为 Queue 和 Durable Topic 保持了先进先出的数据结构。
redo log 只有在 Broker 非正常停止时才会使用,用来保证 B-tree index 的完整性。

当 Broker 正常停止时, KahaDB 会将所有的缓存数据刷到文件系统中。主要包括:
(1)所有未处理的消息数据
(2)所有缓存的 Metadata 数据
并且保证 Metadata Store 中的数据与 Data Logs 中的数据一致性。

当 Broker 异常终止时,由于 Metadata Cache 中的索引信息是周期性地更新到 Metadat Store 中的,可能 Data Logs 中的某些数据在 Metadata Store 中没有对应的索引。
KahaDB 在恢复时会首先读取 Metadata Store 中的数据,然后再读取 Data Logs 中有但是在 Metadata Store 中不存在的数据,然后再根据这些数据重新在 Metadata Store 中建立索引信息。
当 Broker 异常终止时,KahaDB 会在更新 Metadata Store 之前,保存更新操作的概要信息到重做日志(db.redo)中。因为重做日志非常小,所以能在 Broker 异常终止时快速写入。当系统恢复时会判断重做日志中的信息是否需要更新到 Metadata Store 中。

如果 Metadata Store 中的数据被不可挽回的损坏了,可以删除 Metadata Store文件(db.data)来强制恢复。不过这个时候,Broker 会读取所有的 Data Logs 文件来重建 Metadata Store,需要一段比较长的时间。

KahaDB 可以检测是否有数据日志文件丢失,如果有丢失,默认将会抛出一个异常然后关闭。便于管理员调查丢失的数据日志文件,并手动还原。
可以通过设置 ignoreMissingJournalfiles=true,让 Broker 在启动时忽略这些丢失的数据日志文件。
可以通过设置 checkForCorruptJournalFiles=true,让 Broker 在启动时检测数据日志文件的完整性。

3. KahaDB 目录结构


3.1 db-*.log
用来存放完整的每条消息(包括事务、目的地、id、优先级、具体内容等)和producerSequenceIdTracker (用来验证每个消息生成者发送的消息是否重复的数据结构)。它随着消息数量的增多,默认每 32M 一个文件(可以修改默认大小),文件名按照数字进行编号,如db-1.log、db-2.log、db-3.log。
当一条消息的大小超过数据日志文件大小时,要修改数据文件大小。



3.2 db.data
包含多个 B-tree index 实例,作用分别如下:
(1)Metadata 类的 destinations
用来保存该 Broker上有哪些 Queue 或 Topic。
(2)StoredDestination 类的 orderIndex 属性中的 defaultPriorityIndex、lowPriorityIndex、highPriorityIndex
这三个属性是为消息优先级排序而设计的,它们的主要作用是为 AbstractStoreCursor 类的 doFillBatch 方法服务的,也就是常说的消息指针(message cursors)。当消息指针需要从磁盘文件中装载一批消息的时候会使用这三个属性。
(3)StoredDestination 类的 locationIndex
该btree的主要作用包括:
系统重启进行恢复操作的时候,要移除掉不在db-*.log文件里的消息;
在系统进行定时checkpointUpdate时使用
(4)StoredDestination类的messageIdIndex
主要作用是消息确认 acknowledge 操作时,通过消息ID在 messageIdIndex 中删除对应的记录,并依据返回的值删除orderIndex和locationIndex中的记录。


3.3 db.redo
主要作用是 “Double Write”,具体代码参看 PageFile类的 writeBatch 方法。它的原理可参考(http://www.mysqlperformanceblog.com/2006/08/04/innodb-double-write/)
 


3.4 db.free
当前 db.data 文件里哪些页面是空闲的,文件具体内容是所有空闲页的ID。

4. KahaDB 属性说明
(1)directory 保存 Message Store 数据文件的目录,默认值:activemq-data。
(2)indexWriteBatchSize 批量更新索引的阀值,当要更新的索引到达这个索引时,批量更新到 Metadata Store中,默认值:1000。
(3)indexCacheSize    指定metadata cache的大小,默认值:10000。
(4)enableIndexWriteAsync 写入索引文件到 Metadata Store 中的方式是否采用异步写入,默认值:false。
(5)journalMaxFileLength    消息持久数据文件的大小,默认值:32M。
(6)enableJournalDiskSyncs    如果为true,保证使用同步写入的方式持久化消息到数据日志文件中,默认值:true。
(7)cleanupInterval 清除(清除或归档)不再使用的数据日志文件的时间周期(毫秒),默认值:30000。
(8)checkpointInterval    写入索引信息到 Metadata Store 中的时间周期(毫秒),默认值:5000。
(9)ignoreMissingJournalfiles 是否忽略丢失的数据日志文件。如果为 false,当丢失了数据日志文件时,Broker 启动时会抛异常并关闭,默认值:false。
(10)checkForCorruptJournalFiles    如果为 true,Broker 在启动的时候会检测数据日志文件是否损坏,若损坏便尝试恢复它,默认值:false。
(11)checksumJournalFiles 如果为true,KahaDB 为数据日志文件产生一个 checksum,以便能够检测数据日志文件是否损坏,默认值:false。
(12)archiveDataLogs    如果为 true,当达到 cleanupInterval 周期时,会归档数据日志文件而不是删除,默认值:false。
(13)directoryArchive 指定归档数据日志文件存放的路径,默认值:null。
(14)databaseLockedWaitDelay    在使用主从数据库备份时,等待获取数据库上的锁的延迟时间,默认值:10000。
(15)maxAsyncJobs 等待写入数据日志文件的任务队列的最大数量,应该大于或等于最大并发 producer 的数量,配合并行存储转发属性使用,默认值:10000。
(16)concurrentStoreAndDispatchTransactions    如果为 true,转发消息的时候同时提交事务,默认值:false。
(17)concurrentStoreAndDispatchTopics    如果为 true,转发 Topic 消息的时候同时存储消息的 Message Store 中,默认值:false。
(18)concurrentStoreAndDispatchQueues    如果为 true,转发 Queue 消息的时候同时存储消息到 Message Store 中,默认值:true。

注意,以上这些 KahaDB 这些特性并不能100%保证 Broker 异常终止时不造成消息丢失。如果需要保证系统的高可靠性,建议部署到容灾系统上,例如 RAID 磁盘阵列中。

参考文献:
1. http://netcomm.iteye.com/blog/1455086
2. http://berdy.iteye.com/blog/813300
3. http://activemq.apache.org/kahadb.html
4. http://activemq.apache.org/kahadb-master-slave.html

Marathonn_007:2016都江堰马拉松

2016年3月27日 成都·都江堰 双遗马拉松 4:47:57

青春,无论安放在什么地方的青春,总是好的。
这次我只身一人,不远千里,要从北京赶到成都,更要从成都->三星堆->黄龙溪->彭山->眉山赶上都江堰来的理由,也不过想再饱尝一尝这里的山水。

拜水都江堰,问道青城山。

跑着跑着,远远就望见了那座通往二王庙的吊桥,清澈的河水像2000年前那样缓缓流过,蕴育了天府人的安逸与自在的精神气质。
桥上廿一年前的两个青年向我挥了挥手,转身上山去了。

跑着跑着,路两旁出现了大片大片的油菜花,宛如巨大的花毯从大地延绵到天边。
路旁廿一年前的一个青年在给另一个青年拍照,我向他们招了招手。

跑着跑着,路两旁的油菜花不知何时不见了,笔直参天的松树像哨兵一样耸立在道路的两侧,山上的古寺清幽寂静。
石阶上廿一年前的一个青年正在兴奋地催促走在后面的青年快点,我跑过他们的身边,我们彼此互相致意。

跑着跑着,终点就在眼前,冲过大门那一刻的喜悦,让曾经的痛苦是那么的不值一提,我忍不住回头望了望来时的漫漫长路,那两个一路陪伴我,给我鼓励的年轻人呢?












你的青春折半之地在哪?