2016年7月28日星期四

Linux_115:精通 shell 编程之五:局部变量、环境变量、shell 变量

环境:RHEL 7.1 或 MAC 10.11.6

局部变量只能在创建它们的 shell 中使用。
环境变量可以在创建它们的 shell 及其派生出来的任意子进程中使用。
shell 变量是在 shell 初始化的时候被系统设定的变量,比如前文提到的《shell 变量》。

那么,区别到底在哪里呢?通过一个简单的测试说明一下。
实验前,先介绍一下 set、env、export 命令。
(1)set:显示或设置局部变量。
(2)env:显示或设置当前用户的环境变量。
(3)export:显示或设置导出成当前用户的环境变量的局部变量。

开始实验:
(1)设置一个局部变量:$ aaa=bbb
(2)使用 echo 查看刚刚设置的局部变量:$ echo $aaa,输出 bbb,说明局部变量中有 aaa。
(3)使用 env 查看当前用户的环境变量:$ env | grep aaa,没有任何输出,说明当前环境变量中没有 aaa。
(4)使用 set 查看刚刚设置的局部变量:$ set | grep aaa,输出 aaa=bbb,说明局部变量中有 aaa。
(5)使用 export 查看当前用户的环境变量:$ export | grep aaa,没有任何输出,说明当前环境变量中没有 aaa。
(6)使用 export 导出局部变量:$ export aaa
(7)使用 env 再次查看当前用户的环境变量:$ env | grep aaa,输出 aaa=bbb,说明当前环境变量中有 aaa。
(8)使用 export 查看当前用户的环境变量:$ export | grep aaa,输出 aaa=bbb,说明当前环境变量中有 aaa。

小结:

特征
局部变量
环境变量
shell 变量
受子进程影响

被用户设定

shell 设定


可被用户更改

shell 限制



参考文献:
1. 《精通 shell 编程(第二版)》

Linux_114:精通 shell 编程之四:局部变量

环境:RHEL 7.1 或 MAC 10.11.6

局部变量只能在创建它们的 shell 中使用。

1. 标量变量(“名-值”对)
定义格式:name=value,name 是变量名称,value 是变量值。
变量名称只能包括字母,数字和下划线,只能以字母或下划线为开始。
带空格的变量值要用引号(单引号或双引号)引起来。
访问变量只需要在变量名称前加 $ 符号即可。

2. 数组变量
数组是使用一个名称配置一组变量的方法。
定义格式:name[index]=value,index 是介于 0 ~ 1023 之间的一个整数。
给数组赋初值,比如:$ band=(derri terry mike gene),该命令等价于
$ band[0]=derri
$ band[1]=terry
$ band[2]=mike
$ band[3]=gene
访问数组变量:${name[index]}。
访问所有数组变量:${name[*]} 或 ${name[@]} 。
${name[*]} 和 ${name[@]} 的区别在于,如果数组某项值带空格,这时要使用${name[@]},而不能使用${name[*]}。
比如:$ band[4]="ma ping",${name[*]} 和 ${name[@]} 输出虽然看起来一样,但是 ${name[*]} 的结果是 6 项而不是 5 项。

3. 只读变量
当一个变量被指定为只读变量,它的值就无法更改或删除,一直保持到 shell 退出为止。

$ FRUIT=kiwi
$ readonly FRUIT
$ FRUIT=cantaloupe
-bash: FRUIT: readonly variable
$ unset FRUIT
-bash: unset: FRUIT: cannot unset: readonly variable

4. 删除变量
unset  name







3.

参考文献:1. 《精通 shell 编程(第二版)》

Linux_113:精通 shell 编程之三:特殊的 shell 变量

环境:RHEL 7.1 或 MAC 10.11.6

这些 shell 变量主要在 shell 脚本传参中使用。

1. $0
被执行的命令的名字。如果是 shell 脚本查看系统默认的 shell,注意这个不能反映当前使用的 shell 是哪个。

2. $n
这些变量对应的是一个脚本调用的参数。这里的 n 是一个十进制的整数,对应的是参数的位置(第一个参数是 $1,第二个参数是 $2,以此类推)。

3. $#
脚本支持的参数的个数。

4. $*
所有的参数被双重引用。

5. $@
所有的参数被独立地双重引用。

6. $?
最后一个被执行的命令的退出状态。

7. $$
当前 shell 的进程号。对于 shell 脚本来说,这就是它们执行的进程号。

8. $!
最后一个后台命令的进程号。

参考文献:
1. 《精通 shell 编程(第二版)》

2016年7月27日星期三

Linux_112:精通 shell 编程之二:shell 变量

环境:RHEL 7.1 或 MAC 10.11.6

shell 变量是在 shell 初始化的时候被设定的内部变量。

1. PWD
当前的工作目录。

2. UID
当前用户的数字标识。

3. SHLVL
每打开一个新的 bash,就自增 1,用于确认是否关闭了当前会话。

4. REPLY
REPLY 变量 与 read 和 select 命令有关。
(1)read variable 命令用于读取标准输入的变量值,该值会存储到 variable 变量中。
如果 read 命令不带任何变量名,此时,read 就将该值存储到 REPLY 变量中。
(2)select 命令是一种建立菜单的工具,它提供一组字符串供用户选择,用户不必完整地输入字符串,而只需输入相应的序号进行选择。比如:
select variable in list
do
   shell 命令1
   shell 命令2
   shell 命令3
   ……
   break
done
select 自动将 list 形成有编号的菜单,用户输入序号以后,将该序号所对应 list 中的字符串赋给variable 变量,而序号值则保存到 REPLY 变量中。

5.RANDOM
每次随机产生一个介于 0 到 32767 之间的一个整数。

6. SECONDS
shell 启动的时间(秒)。

7. IFS(Internal Field Seprator)
内部域分隔符,用来拆解读入的变量值。默认的分隔符是空格符(space)、制表符(tab)、换行符(\n)。
$ echo $IFS
直接输出 IFS 只能看到一个空白行,看不到任何东西。
$ echo $IFS | od -b
0000000   012                                                           
0000001
转化为二进制就可以看到了,"040"是空格符,"011"是制表符,"012"是换行符 。

8. PATH
命令的搜索路径。

9. HOME
当前用户的主目录。

参考文献:
1. 《精通 shell 编程(第二版)》

Linux_111:精通 shell 编程之一:确定当前使用的 shell

环境:RHEL 7.1 或 MAC 10.11.6

1. echo $SHELL
查看系统默认的 shell,注意这个不能反映当前使用的 shell 是哪个。

2. ps -p $$
查看当前的 shell。
说明:$$ 是一个特殊的 shell 变量,保存的是当前运行 shell 的 PID。

3. echo $0
查看当前的 shell。
说明:$0 是一个特殊的 shell 变量,保存的是被执行的命令的名字。
因为进入一个终端时,第一个被执行的命令总是某个 shell 的名字,所以该 shell 的名字就保存在 $0 中。

4. 查看 shell 版本 
echo $BASH_VERSION

参考文献:
1. 《精通 shell 编程(第二版)》

Linux_110:常用命令之三十六:sed

sed 是一种在线编辑器,它一次处理一行内容。处理时,把当前处理的行存储在临时缓冲区中,称为“模式空间”(pattern space),接着用 sed 命令处理缓冲区中的内容,处理完成后,把缓冲区的内容送往屏幕。接着处理下一行,这样不断重复,直到文件末尾。文件内容并没有改变,除非你使用重定向存储输出。sed主要用来自动编辑一个或多个文件;简化对文件的反复操作;编写转换程序等。

1. 语法:sed [-nefr] [n1[,n2]] function 
(1)-n :安静模式。默认情况下,所有来自标准输入的数据都会列出到终端上。加上 -n 参数后,则只有经过sed 特殊处理的那一行(或者动作)才会被列出来。
(2)-e :直接在命令列模式上进行 sed 的动作编辑。
(3)-f :直接将 sed 的动作写在一个文件内, -f filename 则可以运行 filename 内的 sed 动作;
(4)-r :sed 的动作支持的是延伸型正规表示法的语法。(默认是基础正规表示法语法)
(5)-i :直接修改读取的文件内容,而不是输出到终端。
(6)[n1[,n2]] function :n1,n2 行数起止;function,要进行的动作。
其中,function 可以设定的动作有:
(1)a :新增,a 的后面可以接字串,而这些字串会在新的一行出现(目前的下一行)~
(2)c :取代,c 的后面可以接字串,这些字串可以取代 n1,n2 之间的行!
(3)d :删除,因为是删除啊,所以 d 后面通常不接任何咚咚;
(4)i :插入,i 的后面可以接字串,而这些字串会在新的一行出现(目前的上一行);
(5)p :列印,亦即将某个选择的数据印出。通常 p 会与参数 sed -n 一起运行~
(6)s :取代,可以直接进行取代的工作哩!通常这个 s 的动作可以搭配正规表示法!例如 1,20s/old/new/g 就是啦!

2. 例子
(1)nl /etc/passwd | sed '2,5d'
显示所有的数据,但是把 2-5 行数据删除。注意, sed 后面接的动作,要用单引号引起来。
(2)nl /etc/passwd | sed '2d'
显示所有的数据,但是只删除第 2 行。
(3)nl /etc/passwd | sed '3,$d'
显示所有的数据,但是第 3 行到最后一行。
(4)nl /etc/passwd | sed '2a drink tea'
在第 2 行后的下一行加上 drink tea。
(5)nl /etc/passwd | sed '2i drink tea'
在第 2 行前的上一行加上 drink tea。
(6)nl /etc/passwd | sed '2a drink tea or \
> drink beer ?
在第 2 行后面加入两行文字,drink tea or  与 drink beer ?
添加的每一行之间都必须要以反斜杠  \ 来添加新行。
(7)nl /etc/passwd | sed '2,5c No 2-5 number'
把第 2~5 行的内容取代成为 No 2-5 number。
(8)nl /etc/passwd | sed -n '5,7p'
 只列出 /etc/passwd 文件的第 5~7 行。

3. 搜索和替换例子
除了整行的处理模式之外, sed 还可以用行为单位进行部分数据的搜寻并取代。
搜索和替换语法格式:sed 's/要被取代的字串/新的字串/g'
(1)nl /etc/passwd | sed '/root/p'
输出含有 root 的匹配行和所有行。
(2)nl /etc/passwd | sed -n '/root/p'
只输出含有 root 的匹配行。
(3)nl /etc/passwd | sed  '/root/d'
删除所有包含 root 的行,输出其它行。
(4)nl /etc/passwd | sed -n '/root/{s/bash/blueshell/;p}'
搜索所有包含 root 的行,执行后面花括号中的一组命令,每个命令之间用分号分隔,这里把 bash 替换为 blueshell,再输出这行。
(5)nl /etc/passwd | sed -n '/bash/{s/bash/blueshell/;p;q}'
搜索所有包含 root 的行,只替换第 1 个 bash 关键字为 blueshell,就退出。
(6)/sbin/ifconfig eth0 | grep 'inet addr' | sed 's/^.*addr://g'
删除 IP 前面的部分。
其中 ifconfig eth0 输出如下:inet addr:192.168.1.100 Bcast:192.168.1.255 Mask:255.255.255.0
(7)/sbin/ifconfig eth0 | grep 'inet addr' | sed 's/^.*addr://g' | sed 's/Bcast.*$//g'
删除 IP 后面的部分。
(8)nl /etc/passwd | sed -e '3,$d' -e 's/bash/blueshell/'
 -e 表示多点编辑,第一条命令删除第 3 行到末尾的数据,第二条命令搜索 bash 并替换为 blueshell。

参考文献:
1. http://www.cnblogs.com/ggjucheng/archive/2013/01/13/2856901.html

Linux_109:常用命令之三十五:awk

awk 是一个强大的文本分析工具,它逐行的读入文件,按分隔符抽取每行的信息片段,再进行各种分析处理。
awk工作流程是这样的:读入有'\n'换行符分割的一条记录,然后将记录按指定的域分隔符划分域,填充域,$0 表示所有域,$1 表示第 1 个域,$n 表 示第 n 个域。
默认域分隔符是空格键 或 tab 键。

1. 命令语法:awk '{pattern + action}' {filenames}
(1)pattern 是根据正则表达式要查找的内容。
(2)action 是在找到匹配内容时所执行的一系列命令。
(3)大括号不需要在程序中始终出现,但它们用于根据特定的模式对一系列指令进行分组。 pattern就是要表示的正则表达式,用斜杠括起来。

2. 使用方式

2.1 命令行方式
awk [-F  field-separator]  'commands'  input-file(s)
(1)commands 是 awk 命令。
(2)-F field-separator 是域分隔符,不指定的话,默认是空格。
(3)input-file(s) 是待处理的文件。

2.2 shell 脚本方式
将所有的awk命令插入一个文件,并使awk程序可执行,然后awk命令解释器作为脚本的首行,一遍通过键入脚本名称来调用。
相当于shell脚本首行的:#!/bin/sh 可以换成:#!/bin/awk

2.3 文件方式
awk -f awk-script-file input-file(s)
(1)-f awk-script-file 是 awk 脚本文件。
(2)input-file(s) 是待处理的文件。

3. 内置变量
(1)ARGC 命令行参数个数
(2)ARGV 命令行参数排列
(3)ENVIRON 支持队列中系统环境变量的使用
(4)FILENAME 浏览的文件名
(5)FNR 浏览文件的记录数
(6)FS 设置输入域分隔符,等价于命令行 -F 选项
(7)NF 浏览记录的域的个数
(8)NR 已读的记录数
(9)OFS  输出域分隔符
(10)ORS  输出记录分隔符
(11)RS 控制记录分隔符

4. 例子
(1)只显示 /etc/passwd 的账户名称
# cat /etc/passwd | awk  -F ':'  '{print $1}'
这里的 $1表示登录用户,$3 表示登录用户IP,以此类推。
(2)只显示/etc/passwd 的账户和账户对应的 shell,并且账户和 shell 之间以 tab 键间隔。
# cat /etc/passwd |awk  -F ':'  '{print $1"\t"$7}'
(3)只显示/etc/passwd 的账户和账户对应的 shell,并且账户和 shell 之间以逗号间隔,而且在所有行添加列名 name、shell,在最后一行添加 "blue,/bin/nosh"。
# cat /etc/passwd |awk  -F ':'  'BEGIN {print "name,shell"}  {print $1","$7} END {print "blue,/bin/nosh"}'
(4)搜索 /etc/passwd 中以 root 开头的所有行
# awk -F: '/^root/' /etc/passwd
(5)统计 /etc/passwd 的文件名,每行的行号,每行的列数,对应的完整行内容:
# awk  -F ':'  '{print "filename:" FILENAME ",linenumber:" NR ",columns:" NF ",linecontent:"$0}' /etc/passwd
# awk  -F ':'  '{printf("filename:%10s,linenumber:%s,columns:%s,linecontent:%s\n",FILENAME,NR,NF,$0)}' /etc/passwd

5. 编程

参考文献:
1. http://www.cnblogs.com/ggjucheng/archive/2013/01/13/2858470.html

MAC_043:下载和安装 wget

OS  X 版本:10.9.2。

为了学习OpenShift,看到官网提供了VM,于是准备下载下来研究一下。
没想到,下载时还遇到了一些小问题,记录下来,以备忘。

OpenShift VM 下载地址:
http://openshift.github.io/documentation/oo_deployment_guide_vm.html。

下载需要用wget,而我的MAC OS上没有wget,需要先安装wget。

1.  下载wget
下载地址:http://www.gnu.org/software/wget/ 
除了直接从网页下载,还可以使用curl命令下载:curl -O http://ftp.gnu.org/gnu/wget/wget-1.15.tar.gz

2. 解压、编译、安装wget
tar -zxvf wget-1.15.tar.gz
cd wget-1.15
./configure --with-ssl=openssl
make
sudo make install

运行which wget,应该输出:/usr/local/bin/wget。

3. 使用wget下载OpenShift虚机 
wget https://mirror.openshift.com/pub/origin-server/release/3/images/openshift-origin.zip --secure-protocol=SSLv3
如果你想给下载文件指定一个别的名称,可以增加-O参数:
wget -O openshift_origin_3.zip https://mirror.openshift.com/pub/origin-server/release/3/images/openshift-origin.zip --secure-protocol=SSLv3

好了,一切就绪,等着吧。

参考文献:
1. http://coolestguidesontheplanet.com/install-and-configure-wget-on-os-x/
2. https://thomashunter.name/blog/install-wget-on-os-x-lion/
3. http://whyjava.wordpress.com/2013/08/11/download-openshift-origin-virtual-image/

Linux_108:常用命令之三十四:tr sort uniq

tr、sort、uniq 命令都是用来统计单词数的,wc 命令同样也可以统计单词的个数,但是要统计一个单词在一个文件中出现的次数,wc 命令就无能为力了。

1. tr (transliterate)
tr命令可以对来自标准输入的字符进行替换、压缩和删除。它可以将一组字符变成另一组字符,经常用来编写优美的单行命令,作用很强大。
 

1.1 语法:tr 选项 参数

1.2 选项
(1)-c 或 --complerment:取代所有不属于第1字符集的字符。
(2)-d 或 --delete:删除所有属于第1字符集的字符。
(3)-s 或 --squeeze-repeats:把连续重复的字符以单独一个字符表示。
(4)-t 或 --truncate-set1:先删除第一字符集较第二字符集多出的字符。

1.3 参数
(1)字符集1:指定要转换或删除的原字符集。当执行转换操作时,必须使用参数“字符集2”指定转换的目标字符集。但执行删除操作时,不需要参数“字符集2”;
(2)字符集2:指定要转换成的目标字符集。

1.3 例子
(1)tr '!?":;\[\]{}(),.' ' ' < ch15.doc
去除文件中的所有标点和分隔符。
(2)echo "HELLO WORLD" | tr 'A-Z' 'a-z' hello world
大写转小写。
(3)echo "hello 123 world 456" | tr -d '0-9'
删除数字。
(4)cat text | tr '\t' ' '
制表符转为空格。
(5)echo "thissss is a text linnnnnnne." | tr -s ' sn'
压缩重复的字符。

2. sort
将文件的每一行作为一个单位,相互比较,比较原则是从首字符向后,依次按ASCII码值进行比较,最后按升序输出。

2.1 语法:sort [options] 文件名

2.2 选项
(1)-u:除去重复行。
(2)-r:sort 默认按升序排,使用 -r 后按降序排。
(3)-o:把排序结果输出到原文件。
(4)-n:按照数值而不是字符排序。比如:按照数字排 10 比 2 大,按照字符排 10 比 2 小。
(5)-t:指定间隔符。
(6)-k:指定列数。
(7)-f:会将小写字母都转换为大写字母来进行比较,亦即忽略大小写。
(8)-c:检查文件是否已排好序,如果乱序,则输出第一个乱序的行的相关信息,并返回1。
(9)-C:会检查文件是否已排好序,如果乱序,不输出内容,只返回1。
(10)-M:以月份来排序,比如 JAN 小于 FEB 等等。
(11)-b:忽略每一行前面的所有空白部分,从第一个可见字符开始比较。

2.3 例子
(1)sort -r number.txt -o number.txt
按降序排列,并写入原文件。
(2)sort -n -k 2 -t : fruits.txt
以第2列水果数量排序。
文件有三列,列与列之间用冒号隔开,第1列表示水果类型,第2列表示水果数量,第3列表示水果价格。
banana:30:5.5
apple:10:2.5
pear:90:2.3
orange:20:3.4

3. uniq
输出一个文件中所有的唯一的行, 对于那些连续重复的行只显示一次。

3.1 语法:uniq [-cdu][-f<栏位>][-s<字符位置>][-w<字符位置>][--help][--version][输入文件][输出文件]

3.2 选项
(1)-c: 在每列旁边显示该行重复出现的次数。
(2)-d: 仅显示重复出现的行列。
(3)-f: 忽略比较指定的栏位。
(4)-s: 忽略比较指定的字符。
(5)-u: 仅显示出一次的行列。
(6)-w: 指定要比较的字符。
(7)-n:前n个字段和每个字段前的空白一起被忽略。
(8)+n:前n个字符被忽略。

3.3 例子
(1)uniq -c fruits.txt
显示每一行出现的次数和内容。
(2)uniq -d fruits.txt  
仅显示文件中连续重复出现的行。
(3)uniq -u fruits.txt
显示文件中没有连续出现的行。
 
参考文献:
1. http://man.linuxde.net/tr
2. http://www.cnblogs.com/51linux/archive/2012/05/23/2515299.html
3. http://blog.chinaunix.net/uid-26495963-id-3282526.html

Linux_107:常用命令之三十三:wc

wc (word count) 命令可以统计指定文件中的字节数、字数、行数,并将统计结果显示输出。如果没有给出文件名,则从标准输入读取。如果是多个文件,最后一行会给出所有指定文件的总统计数。

1. 语法:wc [选项] 文件...

2. 参数说明
(1)-l 统计行数。
(2)-w 统计单词数。一个字被定义为由空白、跳格或换行字符分隔的字符串。
(3)-m 统计字符数。这个标志不能与 -c 标志一起使用。
(4)-c 统计字节数。
(5)-L 打印最长行的长度。

3. 例子
(1)wc test.txt
查看文件的行数、单词数、字符数。
(2)wc -l test.txt
查看文件的行数。
(3)wc -l fruits users
查看每个文件的行数,以及所有文件的总行数。
(4)wc -w fruits users
查看每个文件的单词数,以及所有文件的总单词数。
(5)wc -m fruits users
查看每个文件的字符数,以及所有文件的总字符数。
(6)cat test.txt | wc -l
只打印统计数字不打印文件名。
(7)ls -l | wc -l
统计当前目录下的文件数。

Linux_106:常用命令之三十二:cat

cat 命令用来显示文件内容。

1. 语法:cat [options] file1 ... fileN

2. 参数说明
(1)-n 或 --number 由 1 开始对所有输出的行数编号。
(2)-b 或 --number-nonblank 和 -n 相似,只不过对于空白行不编号。
(3)-s 或 --squeeze-blank 当遇到有连续两行以上的空白行,就代换为一行的空白行。
(4)-v 或 --show-nonprinting 显示非打印字符。
(4)-E 或 --show-ends 在每行结束处显示 $。

3. 例子
(1)cat fruits
(2)cat fruits users
显示两个文件,输出内容是把两个文件的内容连接在一起。
(3)cat -n fruits
输出的每一行前显示行号。
(4)cat -b fruits
跳过空白行。
(5)cat -b fruits users
显示两个文件,每个文件的行号都是从1开始。输出内容是把两个文件的内容连接在一起。
(6)cat file1 file2 > file
合并两个文件为一个文件。

4. 与 EOF 一起使用 
EOF(End OF File)是文本终止符。
在命令行状态下,也可以输入 control + D 代替输入 EOF。
也可以用其它字符来代替 EOF,因为只是一个标识符,不过还是建议使用 EOF。

(1)手工输入多行创建文件
$ cat << EOF > hello.txt
在出现输入提示符">",输入以下内容:
> Hello
> Ma Ping
> EOF
(2)手工输入多行追加文件
$ cat << EOF >> hello.txt
在出现输入提示符">",输入以下内容:
> Ma Qing Chuan
> EOF

总的来说,cat 和 EOF 的组合让用户可以直接输入文件的内容,相当于一个临时文件,但又没有创建任何文件,使用起来非常方便。

2016年7月26日星期二

Linux_105:常用命令之三十二:head、tail

1. head [ -n ] file
用来显示一个文件的最开始几行。
例子:
(1)head -n 5 file
(2)ls -1ut /Users/maping | head -5
某目录中访问最频繁的五个文件。

2. tail [ -n ] file
用来显示一个文件的最后几行。
例子:
(1)tail -n 5 file
(2)ls -1rt /Users/maping | tail -5
某目录中最长时间没有访问的五个文件
(3)tail -f /var/log/httpd/access_log

实时跟踪 follow 选项 -f

2016年7月21日星期四

ActiveMQ_041:消息的预取机制

环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3

ActiveMQ 通过消息预取机制来提高性能,这意味着客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由 prefetch size 来控制。
当某个 consumer 的 prefetch size 已经达到上限,那么broker 不会再向 consumer 分发消息,直到 consumer 向 broker 发送消息的确认。
对于快消费者,你可以设置较大的 prefetch size;对于慢消费者,建议设置 prefetch size =1。
prefetch size 的缺省值如下:
  • persistent queues : 1000
  • non-persistent queues : 1000
  • persistent topics: 100
  • non-persistent topics: Short.MAX_VALUE -1
客户端可以重写 prefetch size,有两种方式:

1. Connection  级别重写
(1)通过 Connection URI 指定 prefetch size
例如:
所有的消费者的预取大小:tcp://localhost:61616?jms.prefetchPolicy.all=50
所有 Queue 的消费者的预取大小:tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
 (2)纯代码实现
在 ActiveMQConnectionFactory 或者 ActiveMQConnection 上设置 ActiveMQPrefetchPolicy 对象来配置 prefetch policy。

2.  Destination 级别重写
例如:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);

参考文献:
1. http://activemq.apache.org/what-is-the-prefetch-limit-for.html

ActiveMQ_040:Topic 消息分发策略


环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3  

1. 平均分发消息:Round Robin Dispatch Policy

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="FOO.>">
        <dispatchPolicy>
          <roundRobinDispatchPolicy />
        </dispatchPolicy>       
      </policyEntry>     
    </policyEntries>
  </policyMap>
</destinationPolicy>

2. 以相同顺序接收消息:Strict Order Dispatch Policy
保证每个 Topic 订阅者以相同的顺序接收消息,代价是性能上的损失。

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="ORDERS.>">
        <dispatchPolicy>
          <strictOrderDispatchPolicy />
        </dispatchPolicy>
      </policyEntry>
      </policyEntries>
  </policyMap>
</destinationPolicy>

参考文献:
1. http://whitesock.iteye.com/blog/165458

2016年7月20日星期三

ActiveMQ_039:TimeStampPlugin Interceptors

环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3

当客户端的时钟不正确时,TimeStampPlugin 可以把消息的时间戳以 Broker所在机器的时钟为准。
注意,当使用 TimeStampPlugin 后,生产者发送消息时看到的消息时间戳将和消费者收到消息时看到的时间戳不一致。
配置如下:

<plugins>

  <!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->
  <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>
  <timeStampingBrokerPlugin ttlCeiling="2000" zeroExpirationOverride="3000" futureOnly="true"/>

</plugins>

或者以这种简洁的方式:

<plugins>
  <timeStampingBrokerPlugin/>
</plugins>

参数说明:
(1)zeroExpirationOverride 默认值 0
当值非 0 时,配置的值会把所有没有配置过期时间的消息设置为过期时间。
(2)ttlCeiling 默认值 0
当值非 0 时,其值为最大的过期时间,即zeroExpirationOverride值最大不能超过此值。
(3)futureOnly  默认值 false
如果为 true,将不会把消息的时间戳和过期时间更新为一个比原始值还低的一个值。
如果为 false,会更新。

注意,如果消费者的时钟比 Broker 的时钟快,消费者可能认为消息已经过期,因此不会消费消息。
这时需要设置 futureOnly=”true”。

参考文献:
1. http://activemq.apache.org/timestampplugin.html
2. http://tmielke.blogspot.com/2011/01/sync-your-machine-clocks.html

Java_029:JDK 自带小工具介绍之六:jcmd

环境:MAC OS X 10.11.5 + Oracle JDK 1.7.0_80

JDK 7 以后,多了 jcmd 这个小工具,它的功能很多,可以导出 Heap Dump,导出 Thread Dump,查看 java 进程,最重要的是可以执行 GC
在分析内存泄露时,可以先后执行两次 Heap Dump 和 Thread Dump,在两次执行之间,可以执行一次 GC。
如果在执行 GC 后,还依旧存在的大对象,将作为重点内存泄露对象;还依旧执行的线程,将作为重点怀疑对象。

1. 列出当前所有运行的 java 进程:jcmd -l

2. 列出当前运行的 java 进程可以执行的操作:jcmd PID help
比如:jcmd 17249 help
17249:
The following commands are available:
VM.native_memory
VM.commercial_features
GC.rotate_log
ManagementAgent.stop
ManagementAgent.start_local
ManagementAgent.start
Thread.print
GC.class_histogram
GC.heap_dump
GC.run_finalization
GC.run
VM.uptime
VM.flags
VM.system_properties
VM.command_line
VM.version
help

3. 查看 JVM 的启动时长:jcmd PID VM.uptime
 
4. 查看 JVM 的类信息:jcmd PID  GC.class_histogram
这个可以查看每个类的实例数量和占用空间大小。

5. 查看 JVM 的Thread Dump:jcmd PID Thread.print

6. 查看 JVM 的Heap Dump:jcmd PID GC.heap_dump FILE_NAME
注意,如果只指定文件名,默认会生成在启动 JVM 的目录里。

7. 查看 JVM 的属性信息:jcmd PID VM.system_properties

8. 查看 JVM 的启动参数:jcmd PID VM.flags
注意,可以看到 -X 和 -XX 的参数信息,比较有用。

9. 查看 JVM 的启动命令行:jcmd PID VM.command_line

10. 对 JVM 执行 java.lang.System.runFinalization():jcmd PID GC.run_finalization
尽量去调用这个对象的finalize方法

11. 对 JVM 执行 java.lang.System.gc():jcmd PID GC.run
告诉垃圾收集器打算进行垃圾收集,而垃圾收集器进不进行收集是不确定的。


12. 查看 JVM 的性能:jcmd PID PerfCounter.print

参考文献:
1. http://blog.csdn.net/winwill2012/article/details/46364849

Java_028: Heap Dump 分析报告:MemorySessionData 对象过多导致 OutOfMemoryError

环境:MAC OS X 10.11.5 + Oracle JDK 1.7.0_80 + MAT 1.6.0

客户运行的WebLogic 频繁报告 OutOfMemoryError,在应用服务器再次挂起之后,收集了当时的 Heap Dump。以下是 Heap Dump 分析报告。

1. Heap 使用概况
已知分配给 JVM 的 Heap 最大值为 4 G,从下图可以看出,已经使用的 Heap 大小总共为 3.6 G,最大对象为 31.1 M。
鼠标指向深色的对象,发现该对象是 weblogic.servlet.internal.session.MemorySessionData。


2. 怀疑的有内存泄露的对象
从下图可以看出,有 7716 个 weblogic.servlet.internal.session.MemorySessionData 实例对象, 总共占了 3,611,539,432 (94.39%) 字节,大约 3.6 G。


3. 怀疑的有内存泄露的对象
从下图可以看出,有大量“活着的” weblogic.servlet.internal.session.MemorySessionData 实例对象。


4. 按照 java 对象个数和占用大小排序查看
从下图可以看出,蓝色选中的对象,是有严重嫌疑的对象,实例数量多,且占空间大,在 1G~3G 之间不等。

按照 MemorySessionData 过滤,发现确实有 7716 个该对象占了 3,611,539,432 (94.39%) 字节,大约 3.6 G的内存空间。

5. 进一步查看 weblogic.servlet.internal.session.MemorySessionData 引用的对象大小
从下图可以看出,经过层层查找,最终锁定对象 psft.pt8.util.PSSessionProp 对象。
开始怀疑每个 MemorySessionData 大对象中都包含一个 psft.pt8.util.PSSessionProp 大对象。

按照 psft.pt8.util.PSSessionProp 过滤,发现确实有 7716 个该对象占了 3,591,220,008 字节,大约 3.59 G的内存空间。而且,7716 这个数字和 MemorySessionData 的数量一样。
说明确实每个 MemorySessionData 大对象中都包含一个 psft.pt8.util.PSSessionProp 大对象。

6. 结论
当客户登陆系统后,会自动生成一个 weblogic.servlet.internal.session.MemorySessionData 对象,由于每个 weblogic.servlet.internal.session.MemorySessionData 对象中,都被放入了一个 psft.pt8.util.PSSessionProp 大对象(大小为 32 M)。一共有 7716 个 MemorySessionData 和 PSSessionProp 对象,导致内存几乎被占满,并最终导致 OutOfMemoryError。

建议如下:
(1)检查代码中,使用 psft.pt8.util.PSSessionProp 的地方。
(2)检查代码中,是否有死循环或递归调用。
(3)检查代码中,是否有大循环重复产生新对象实体。
(4)检查代码中,是否使用了 List、MAP等集合对象后没有清除,List、MAP 等集合对象会始终存有对对象的引用,使得这些对象不能被 GC 回收。
(5)修改 web.xml,把 session 过期时间适当调低,默认是半个小时。
<session-config>
     <session-timeout>30000</session-timeout>
</session-config>

参考文献:
1. http://www.voidcn.com/blog/guogang83/article/p-5979082.html
2. http://www.cnblogs.com/cyjch/archive/2012/04/10/2440421.html

2016年7月18日星期一

ActiveMQ_038:消息重发机制

环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3

1. 参数说明
(1)collisionAvoidanceFactor 默认值 0.15,The percentage of range of collision avoidance if enabled
(2)maximumRedeliveries 默认值 6,消息被认为是“毒丸”之前的最大重发次数。一旦企图超过该重试次数,将被认为是“毒丸”,并发送到 DLQ。设置为 -1,表明无限次重发。
(3)maximumRedeliveryDelay 默认值 -1,下次重发时的延迟等待时间。设置为 -1,表示没有最大延迟等待时间限制。Sets the maximum delivery delay that will be applied if the useExponentialBackOff option is set.
(4)initialRedeliveryDelay 默认值 1000L,第一次重发的延迟等待时间。
(5)redeliveryDelay 默认值 1000L,每次重发的重发的延迟等待时间。
(6)useCollisionAvoidance 默认值 false,Should the redelivery policy use collision avoidance
(7)useExponentialBackOff 默认值 false,Should exponential back-off be used (i.e. to exponentially increase the timeout)
(8)backOffMultiplier 默认值 5,The back-off multiplier

2. 客户端重写重发策略

2.1 Connection 级别重写
(1)作为参数写在 Connection URI 中,比如:
tcp://localhost:61616?jms.prefetchPolicy.all=100&jms.redeliveryPolicy.maximumRedeliveries=5
(2)纯代码实现
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

2. 2 Destination 级别重写
ActiveMQConnection connection ...  // Create a connection

RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);

RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setInitialRedeliveryDelay(0);
topicPolicy.setRedeliveryDelay(1000);
topicPolicy.setUseExponentialBackOff(false);
topicPolicy.setMaximumRedeliveries(3);

// Receive a message with the JMS API
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQTopic(">"), topicPolicy);
map.put(new ActiveMQQueue(">"), queuePolicy);

参考文献:
1. http://activemq.apache.org/redelivery-policy.html

ActiveMQ_037:死信队列的参数设置

环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3

1.为每个 Queue 指定一个死信队列
<policyEntry queue=">">
  <deadLetterStrategy>
    <!--
      Use the prefix 'DLQ.' for the destination name, and make
      the DLQ a queue rather than a topic
    -->
    <individualDeadLetterStrategy
      queuePrefix="DLQ." useQueueForQueueMessages="true" />
  </deadLetterStrategy>
</policyEntry>

2. 自动丢弃超时消息
<policyEntry queue=">">
   <deadLetterStrategy>
     <sharedDeadLetterStrategy processExpired="false" />
   </deadLetterStrategy>
</policyEntry>

3.  如果希望非持久化消息超时后,也进入死信队列,需要增加如下设置
<policyEntry queue=">">
  <deadLetterStrategy>
    <sharedDeadLetterStrategy processNonPersistent="true" />
  </deadLetterStrategy>
</policyEntry>

参考文献:
1. http://activemq.apache.org/message-redelivery-and-dlq-handling.html

ActiveMQ_036:哪些消息会进入死信队列?

环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3

1.  超时消息
(1)默认情况下,设置超时的持久化消息超时后,会进入死信队列。
./activemq/bin/activemq producer --brokerUrl tcp://localhost:61616 --user admin --password admin --destination queue://testQueue --persistent true --messageCount 1 --parallelThreads 1 --msgTTL 1000 --messageSize 1
(2)默认情况下,设置超时的非持久化消息超时后,不会进入死信队列,即直接丢弃。
./activemq/bin/activemq producer --brokerUrl tcp://localhost:61616 --user admin --password admin --destination queue://testQueue --persistent false --messageCount 1 --parallelThreads 1 --msgTTL 1000 --messageSize 1

2. 带事务的消费者的 session 调用了 rollback() 方法

import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Queue_Receiver_Transaction_Async {

    public static void main(String[] args) {

        // ConnectionFactory:连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        final Session session;
        // Destination:消息的目的地;消息发送给谁.
        Destination destination;
        // MessageConsumer:消息消费者
        MessageConsumer consumer;

        connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage tm = (TextMessage) message;
                    try {
                        System.out.println("Received message: " + tm.getText());
                        session.rollback();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
          
            try {
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection) {
                    connection.close();
                }
            } catch (Throwable ignore) {
            }
        }
    }
}
向 FirstQueue 发送一条消息,然后运行此程序,输出如下:
run:
 INFO | Successfully connected to tcp://127.0.0.1:61616
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1

由此可见,一共重试了 6 次,加上原本的第 1 次,一共 7 次,7 次全部失败后,消息被放入 DLQ。

3. 带事务的消费者的 session 在调用 commit() 方法之前调用了 close() 方法

import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Queue_Receiver_Transaction_Async {

    public static void main(String[] args) {

        // ConnectionFactory:连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        final Session session;
        // Destination:消息的目的地;消息发送给谁.
        Destination destination;
        // MessageConsumer:消息消费者
        MessageConsumer consumer;

        connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage tm = (TextMessage) message;
                    try {
                        System.out.println("Received message: " + tm.getText());
                        session.close();
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            try {
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection) {
                    connection.close();
                }
            } catch (Throwable ignore) {
            }
        }
    }
}

向 FirstQueue 发送一条消息,然后运行此程序,输出如下:
run:
 INFO | Successfully connected to tcp://127.0.0.1:61616
Received message: ActiveMq发送的消息1
javax.jms.IllegalStateException: The Session is closed
    at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:769)
    at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:569)
    at com.travelsky.activemq.Queue_Receiver_Transaction_Async$1.onMessage(Queue_Receiver_Transaction_Async.java:61)
    at org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1404)
    at org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:131)
    at org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:202)
    at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
    at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

但是并没有重试 6 次,也没有把消息放入 DLQ,和官方文档描述不符。

4. 使用 CLIENT_ACKNOWLEDGE 模式的消费者的 session 调用了 recover() 方法

import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Queue_Receiver_ClientAck_Async {

    public static void main(String[] args) {

        // ConnectionFactory:连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        final Session session;
        // Destination:消息的目的地;消息发送给谁.
        Destination destination;
        // MessageConsumer:消息消费者
        MessageConsumer consumer;

        connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:(tcp://127.0.0.1:61616)");

        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage tm = (TextMessage) message;
                    try {
                        System.out.println("Received message: " + tm.getText());
                        // 如果不加这句,Broker就收不到确认信息,消息将一直保留
                        // 一个有意思的现象是:如果没加这句,相当于没有提交确认消息
                        // 如果这个消费者一直不退出,那么其它消费者也无法接收这些消息。
                        session.recover();
                        message.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            try {
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection) {
                    connection.close();
                }
            } catch (Throwable ignore) {
            }
        }
    }
}

向 FirstQueue 发送一条消息,然后运行此程序,输出如下:
run:
 INFO | Successfully connected to tcp://127.0.0.1:61616
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1
Received message: ActiveMq发送的消息1

由此可见,一共重试了 6 次,加上原本的第 1 次,一共 7 次,7 次全部失败后,消息被放入 DLQ。

参考文献:
1. http://activemq.apache.org/message-redelivery-and-dlq-handling.html

ActiveMQ_035:Destination 通配符含义

环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3

假设你有 PRICE.STOCK.NASDAQ.ORCL 和 PRICE.STOCK.NYSE.IBM 这两个 Destination。
你可以直接指定完整的 Queue 名称进行完全匹配,也可以使用通配符进行匹配。
通配符有三个:
(1).  用来隔离各个名称部分。
(2)*  用来匹配任何名称部分。
(3)> 用来递归地匹配任何以此字符开头的 Destination。

举例说明:
(1)PRICE.> 匹配任何以 PRICE. 开头的 Destination,即匹配所有市场的所有产品价格。
(2)PRICE.STOCK.> 匹配任何以 PRICE.STOCK. 开头的 Destination(名称部分跨 .),即匹配所有股票市场的所有产品价格。。
(3)PRICE.STOCK.NASDAQ.* 匹配任何以 PRICE.STOCK.NASDAQ. 开头的 Destination(名称部分不跨 .),即匹配 NASDAQ 股票市场的所有产品价格。
(4)PRICE.STOCK.*.IBM 匹配任何以 PRICE.STOCK. 开头的、中间间隔一个任意的名称、结尾是 IBM 的 Destination,即匹配所有股票市场的 IBM 产品价格。

参考文献:
1. http://activemq.apache.org/wildcards.html

2016年7月13日星期三

ActiveMQ_034:为每一个 Queue 配置一个 KahaDB 实例

环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3

默认情况下,某个 KahaDB 文件存储了所有 Queue 的未处理消息。
实际使用中,有些 Queue 的消费速度快,有些 Queue 的消费速度慢,这样不利于 KahaDB 文件的清除。

在这种情况下,可以使用 mKahaDB,为每一个 Queue 配置一个  KahaDB 实例。

1. 修改 activemq.xml 中的 persistenceAdapter 部分

<persistenceAdapter>
  <!--<kahaDB directory="/tmp/kahadb"/>-->
  <mKahaDB directory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
      <!-- kahaDB per destinations -->
      <filteredKahaDB perDestination="true">
        <persistenceAdapter>
          <kahaDB journalMaxFileLength="32mb"/>
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

2. 启动 ActiveMQ
./activemq-node3/bin/activemq console xbean:activemq-node3-master-slave.xml

3. 分别向 testQueue1 和 testQueue2 发送消息
(1)向 testQueue1 发送消息
./activemq-node3/bin/activemq producer --brokerUrl tcp://localhost:61619 --user admin --password admin --destination queue://testQueue1 --messageSize 1024 --messageCount 1000 --parallelThreads 10
(2)向 testQueue2 发送消息
./activemq-node3/bin/activemq producer --brokerUrl tcp://localhost:61619 --user admin --password admin --destination queue://testQueue2 --messageSize 1024 --messageCount 1000 --parallelThreads 10

4. 查看 KahaDB 目录
发现在 /data/kahadb/ 目录下,生成了 queue#3a#2f#2ftestQueue1 和 
queue#3a#2f#2ftestQueue2 目录。
在这两个目录下,分别有 testQueue1 和 testQueue2 的 db-1.log、db.data、db.redo 文件。
说明每个 Queue 的持久化 KahaDB 文件是各自独立分开存储的。

参考文献:
1. http://blog.garytully.com/2011/11/activemq-multiple-kahadb-instances.html
2. https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/FuseMBMultiKahaDB.html

2016年7月7日星期四

Java_027:使用 IBM Thread Dump Analyzer 分析 Thread Dump

环境:MAC OS X 10.11.5 + Oracle JDK 1.7.0_80 + JCA 4.5.7

thread dump 是 java 进程的线程快照,一般要多做几次,进行比较。
比如,等第 1 次 thread dump 文件生成后,接着再获取第 2 次 thread dump 文件。
这样可以看出在先后两个时间点上,线程执行的位置,通过对比这两个文件,进行分析,查出原因,进而解决问题。
比如,如果发现先后两组数据中同一线程都执行在同一位置,则说明此处可能有问题,因为程序运行是极快的,如果两次均在某一点上,说明这一点耗时是很大的。

1. 如何获得 thread dump 文件?
(1)kill -3 PID
(2)jstack -l PID
(3)Ctrl + \ (Linux 前台程序)
(4)Ctrl + Break (Windows 前台程序)

2.  IBM Thread Dump Analyzer 
主页:https://www.ibm.com/developerworks/community/groups/service/html/communityview?communityUuid=2245aa39-fa5c-4475-b891-14c205f7333c

3. 启动IBM Thread Dump Analyzer
java -Xmx500m -jar jca457.jar


Java_026:使用 MAT 分析 Heap Dump

环境:MAC OS X 10.11.5 + Oracle JDK 1.7.0_80 + MAT 1.6.0

MAT(Memory Analyzer Tool) 是一个 Java Heap 内存分析工具,是解决 OutOfMemoryError 的利器。
它可以快速的计算出内存中所有对象的数量以及占用空间,查找内存泄露,还可以看到是哪个对象阻止了垃圾收集器的回收工作。
下载地址:http://www.eclipse.org/mat/,安装很简单,解压即可。
MAT 可以离线分析 HPROF 格式的 heap dump 二进制文件。

MAT 有以下几种视图帮助我们分析:
(1)Biggest object by retained size:以饼状图列出系统中占用最大资源的对象。
(2)Histogram:以直方图的形式列出每个实例的详细信息。
(3)Dominator Tree:列出最大的对象和保持其存活的对象。
(4)Top Consumers:显示出最耗资源的对象信息。
(5)Duplicate Classes:显示重复类信息。
(6)Leak Suspects:内存泄露嫌疑对象。

1. 如何获得 heap dump 文件?
(1)jmap -dump:format=b,file=/tmp/heap.bin PID
(2)-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp
(3)-XX:+HeapDumpOnCtrlBreak -XX:HeapDumpPath=/tmp

2. shallow heap 和 retained heap
shallow heap 指的是对象本身占用内存的大小,不包含对其它对象的引用,也就是对象头加成员变量(不是成员变量的值)的总和。
retained heap 指的是对象本身的 shallow heap,加上从该对象能直接或间接访问到对象的 shallow heap 之和。
retained heap 是该对象被 GC 之后所能回收到内存的总和。

3. with incoming references 和 with outgoing references
List objects 有两个选项:with incoming references 和 with outgoing references。
with incoming references 表示的是当前查看的对象被外部引用的情况。
with outgoing references 表示的是当前查看的对象引用的外部对象。

4. 对比前后两次的 Heap Dump 

5. 通过 Merge Shortest Paths to GC Roots 找出某个实例没有被释放的原因

参考文献:
1. http://blog.csdn.net/rachel_luo/article/details/8990202
2. http://blog.csdn.net/rachel_luo/article/details/8992461
3. http://blog.csdn.net/rachel_luo/article/details/8992720
4. http://www.xuebuyuan.com/1656657.html
5. http://www.cnblogs.com/TestWorld/p/5681028.html
6. http://www.tuicool.com/articles/If2MVr

Java_025:JAXB 批注 @XmlJavaTypeAdapter 使用例子

@XmlJavaTypeAdapter 适用于一些比较复杂的对象,这时需要包装一下这些对象,然后再使用。

1. DateAdapter.java

package jaxb;

import java.util.Date;
import java.text.SimpleDateFormat;

import javax.xml.bind.annotation.adapters.XmlAdapter;

public class DateAdapter extends XmlAdapter {

    private String pattern = "yyyy-MM-dd HH:mm:ss";
    SimpleDateFormat fmt = new SimpleDateFormat(pattern);

    @Override
    public Date unmarshal(String dateStr) throws Exception {
        return fmt.parse(dateStr);
    }

    @Override
    public String marshal(Date date) throws Exception {
        return fmt.format(date);
    }
}
 

2. Boy.java

package jaxb;

import java.util.Date;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;

@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Boy {

    @XmlJavaTypeAdapter(DateAdapter.class)
    private Date date = new Date();
    private String name = "CY";

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

3. JAXBTest.java

package jaxb;

import java.io.StringReader;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;

public class JAXBTest {

    public static void main(String[] args) throws JAXBException {
        JAXBContext context = JAXBContext.newInstance(Boy.class);

        Marshaller marshaller = context.createMarshaller();
        Unmarshaller unmarshaller = context.createUnmarshaller();

        Boy boy = new Boy();
        marshaller.marshal(boy, System.out);
        System.out.println();

//        String xml = "David";
//        Boy boy2 = (Boy) unmarshaller.unmarshal(new StringReader(xml));
//        System.out.println(boy2.name);
    }
}

4. 运行结果
生成的 XML 如下:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<boy>
  <name>CY</name>
  <date>2016-07-07 15:02:29</date>
</boy>

参考文献:
1. http://www.cnblogs.com/fragranting/archive/2012/03/25/xml--jaxb.html
2. http://alimama.iteye.com/blog/848895
3. http://www.cnblogs.com/holbrook/archive/2012/12/15/2818833.html

ActiveMQ_033:KahaDB 清扫日志配置

环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3

使用 Kahadb 做持久化时,会生成诸如 db-1.log、db-2.log...文件,这些文件的存在的依据是:
  • 某个 Queue/Topic 包含未决消息,或者是持久化订阅。
  • 包含某个消息的 ack 消息,ack 消息不能移除,否则会导致重新发送原始  message。
  • 有未决事务。
  • 正在执行写入操作。
1. 配置 Kahadb 清扫日志
在 conf/log4j.properties 文件的最后,增加如下内容:

#############
# Kahadb log
#############

log4j.appender.kahadb=org.apache.log4j.RollingFileAppender
log4j.appender.kahadb.file=${activemq.base}/data/kahadb.log
log4j.appender.kahadb.maxFileSize=1024KB
log4j.appender.kahadb.maxBackupIndex=5
log4j.appender.kahadb.append=true
log4j.appender.kahadb.layout=org.apache.log4j.PatternLayout
log4j.appender.kahadb.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.logger.org.apache.activemq.store.kahadb.MessageDatabase=TRACE, kahadb

这样配置后,重启 ActiveMQ,清扫进程会开始工作。默认会每 5 秒钟会扫描一次数据日志文件(checkpointInterval),每 30 秒进行一次清除(cleanupInterval)。
它会遍历每一个数据文件,遍历每一个 Queue/Topic,检查是否有消息在某个数据文件之中,如果 Queue/Topic 都没有消息在某个文件中,该文件将作为清扫的候选文件。
Queue 用 0 表示,Topic 用 1 表示。
有时,你会发现某个候选的数据文件突然消失了,这是因为有 Queue/Topic 突然引用了它们,或者是 DLQ, 或者是一个离线的持久化订阅。

2. 举例说明

TRACE | Last update: 164:41712, full gc candidates set: [86, 87, 163, 164] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
TRACE | gc candidates after first tx:164:41712, [86, 87, 163] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
TRACE | gc candidates after dest:0:A, [86, 87, 163] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
TRACE | gc candidates after dest:1:B, [86, 87, 163] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
TRACE | gc candidates after dest:0:D, [86, 87, 163] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
TRACE | gc candidates after dest:0:E, [86, 87] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
TRACE | gc candidates after dest:0:H, [86, 87] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
TRACE | gc candidates after dest:0:I, [86, 87] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
TRACE | gc candidates after dest:0:J, [87] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
TRACE | gc candidates: [87] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
DEBUG | Cleanup removing the data files: [87] | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker

一开始,有 4 个文件候选:86、87、163、164。接着 164 中有未决事务,Queue E 引用了 163,Queue J 引用了 86,因此最后只有 87 是最终的可以被清扫的数据文件。

参考文献:
1. http://activemq.apache.org/why-do-kahadb-log-files-remain-after-cleanup.html

2016年7月6日星期三

ActiveMQ_032:共享文件存储方式的锁机制

环境:MAC OS X 10.11.5 + ActiveMQ 5.13.3

核心代码是 SharedFileLocker.java 和 LockFile.java,核心方法是 doStart 和 keepAlive。
其原理就是 java 文件锁,只要 ActiveMQ 进程在就锁住该文件,没有类似心跳或每隔一段时间更新 lock 时间的机制。

1. SharedFileLocker.java

package org.apache.activemq.store;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.broker.AbstractLocker;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents an exclusive lock on a database to avoid multiple brokers running
 * against the same logical database.
 *
 * @org.apache.xbean.XBean element="shared-file-locker"
 *
 */
public class SharedFileLocker extends AbstractLocker {

    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
    private static final Logger LOG = LoggerFactory.getLogger(SharedFileLocker.class);

    private LockFile lockFile;
    protected File directory = DEFAULT_DIRECTORY;

    @Override
    public void doStart() throws Exception {
        if (lockFile == null) {
            File lockFileName = new File(directory, "lock");
            lockFile = new LockFile(lockFileName, false);
            if (failIfLocked) {
                lockFile.lock();
            } else {
                // Print a warning only once
                boolean warned = false;
                boolean locked = false;
                while ((!isStopped()) && (!isStopping())) {
                    try {
                        lockFile.lock();
                        if (warned) {
                            // ensure lockHolder has released; wait for one keepAlive iteration
                            try {
                                TimeUnit.MILLISECONDS.sleep(lockable != null ? lockable.getLockKeepAlivePeriod() : 0l);
                            } catch (InterruptedException e1) {
                            }
                        }
                        locked = keepAlive();
                        break;
                    } catch (IOException e) {
                        if (!warned) {
                            LOG.info("Database "
                                    + lockFileName
                                    + " is locked by another server. This broker is now in slave mode waiting a lock to be acquired");
                            warned = true;
                        }

                        LOG.debug("Database "
                                + lockFileName
                                + " is locked... waiting "
                                + (lockAcquireSleepInterval / 1000)
                                + " seconds for the database to be unlocked. Reason: "
                                + e);
                        try {
                            TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
                        } catch (InterruptedException e1) {
                        }
                    }
                }
                if (!locked) {
                    throw new IOException("attempt to obtain lock aborted due to shutdown");
                }
            }
        }
    }


    @Override
    public boolean keepAlive() {
        boolean result = lockFile != null && lockFile.keepAlive();
        LOG.trace("keepAlive result: " + result + (name != null ? ", name: " + name : ""));
        return result;
    }


    @Override
    public void doStop(ServiceStopper stopper) throws Exception {
        if (lockFile != null) {
            lockFile.unlock();
            lockFile = null;
        }
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    @Override
    public void configure(PersistenceAdapter persistenceAdapter) throws IOException {
        this.setDirectory(persistenceAdapter.getDirectory());
        if (name == null) {
            name = getDirectory().toString();
        }
    }
}

2. LockFile.java

package org.apache.activemq.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.Date;

/**
 * Used to lock a File.
 *
 * @author chirino
 */
public class LockFile {

    private static final boolean DISABLE_FILE_LOCK = Boolean.getBoolean("java.nio.channels.FileLock.broken");
    final private File file;
    private long lastModified;

    private FileLock lock;
    private RandomAccessFile randomAccessLockFile;
    private int lockCounter;
    private final boolean deleteOnUnlock;
    private volatile boolean locked;
    private String lockSystemPropertyName = "";

    private static final Logger LOG = LoggerFactory.getLogger(LockFile.class);

    public LockFile(File file, boolean deleteOnUnlock) {
        this.file = file;
        this.deleteOnUnlock = deleteOnUnlock;
    }

    /**
     * @throws IOException
     */
    synchronized public void lock() throws IOException {
        if (DISABLE_FILE_LOCK) {
            return;
        }

        if (lockCounter > 0) {
            return;
        }

        IOHelper.mkdirs(file.getParentFile());
        synchronized (LockFile.class) {
            lockSystemPropertyName = getVmLockKey();
            if (System.getProperty(lockSystemPropertyName) != null) {
                throw new IOException("File '" + file + "' could not be locked as lock is already held for this jvm. Value: " + System.getProperty(lockSystemPropertyName));
            }
            System.setProperty(lockSystemPropertyName, new Date().toString());
        }
        try {
            if (lock == null) {
                randomAccessLockFile = new RandomAccessFile(file, "rw");
                IOException reason = null;
                try {
                    lock = randomAccessLockFile.getChannel().tryLock(0, Math.max(1, randomAccessLockFile.getChannel().size()), false);
                } catch (OverlappingFileLockException e) {
                    reason = IOExceptionSupport.create("File '" + file + "' could not be locked.", e);
                } catch (IOException ioe) {
                    reason = ioe;
                }
                if (lock != null) {
                    //track lastModified only if we are able to successfully obtain the lock.
                    randomAccessLockFile.writeLong(System.currentTimeMillis());
                    randomAccessLockFile.getChannel().force(true);
                    lastModified = file.lastModified();
                    lockCounter++;
                    System.setProperty(lockSystemPropertyName, new Date().toString());
                    locked = true;
                } else {
                    // new read file for next attempt
                    closeReadFile();
                    if (reason != null) {
                        throw reason;
                    }
                    throw new IOException("File '" + file + "' could not be locked.");
                }

            }
        } finally {
            synchronized (LockFile.class) {
                if (lock == null) {
                    System.getProperties().remove(lockSystemPropertyName);
                }
            }
        }
    }

    /**
     */
    synchronized public void unlock() {
        if (DISABLE_FILE_LOCK) {
            return;
        }

        lockCounter--;
        if (lockCounter != 0) {
            return;
        }

        // release the lock..
        if (lock != null) {
            try {
                lock.release();
            } catch (Throwable ignore) {
            } finally {
                if (lockSystemPropertyName != null) {
                    System.getProperties().remove(lockSystemPropertyName);
                }
                lock = null;
            }
        }
        closeReadFile();

        if (locked && deleteOnUnlock) {
            file.delete();
        }
    }

    private String getVmLockKey() throws IOException {
        return getClass().getName() + ".lock." + file.getCanonicalPath();
    }

    private void closeReadFile() {
        // close the file.
        if (randomAccessLockFile != null) {
            try {
                randomAccessLockFile.close();
            } catch (Throwable ignore) {
            }
            randomAccessLockFile = null;
        }
    }

    /**
     * @return true if the lock file's last modified does not match the locally
     * cached lastModified, false otherwise
     */
    private boolean hasBeenModified() {
        boolean modified = false;

        //Create a new instance of the File object so we can get the most up to date information on the file.
        File localFile = new File(file.getAbsolutePath());

        if (localFile.exists()) {
            if (localFile.lastModified() != lastModified) {
                LOG.info("Lock file " + file.getAbsolutePath() + ", locked at " + new Date(lastModified) + ", has been modified at " + new Date(localFile.lastModified()));
                modified = true;
            }
        } else {
            //The lock file is missing
            LOG.info("Lock file " + file.getAbsolutePath() + ", does not exist");
            modified = true;
        }

        return modified;
    }

    public boolean keepAlive() {
        locked = locked && lock != null && lock.isValid() && !hasBeenModified();
        return locked;
    }


}

参考文献:
1. https://github.com/apache/activemq/blob/master/activemq-broker/src/main/java/org/apache/activemq/store/SharedFileLocker.java
2. https://github.com/apache/activemq/blob/master/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java