2012年9月24日星期一

Coherence_005:Coherence入门指南之五:查询与统计

运行环境:JDeveloper 11.1.2.2.0 + Coherence3.7.1 + Oracle Database 10g Express Edition 10.2.0.1。

1. 查询
Coherence的缓存提供了QueryMap Interface用于查询,它提供了如下方法:
(1)Set entrySet(Filter filter), 返回Map中满足Filter条件的Entries。
(2)addIndex(ValueExtractor extractor, boolean fOrdered, Comparator comparator) ,增加索引。
被索引的字段不会被序列化,因此可以提高访问速度。
(3)Set keySet(Filter filter) ,与(1)类似,只不过返回的是Keys,而不是values。
说明:Filter是作用在Cache Entry Owner上的,即作用在主分区上,这意味着可以Filter操作并行运行在多个Coherence缓存节点。

2. 统计
Coherence的缓存提供了aggregate方法用于统计,它提供了如下方法:
(1)Object aggregate(Collection keys, InvocableMap.entryAggregator agg),根据集合中的Keys做统计。
(2)Object aggregate(Filter filter, InvocableMap.entryAggregator agg),针对满足Filter条件的Entries做统计。
本实验使用的是(2)。
说明:aggregate操作可以并行运行在多个Coherence缓存节点。

3. 代码
(1)PopulatePeople.java,向Cache中随机存储1000个People对象(不同的年龄、性别)。

package com.oracle.coherence.handson;


import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.extractor.ReflectionExtractor;

import java.io.IOException;

import java.util.Random;


public class PopulatePeople {
    public PopulatePeople() {
    }

    public static void main(String[] args) throws IOException {

        System.setProperty("tangosol.coherence.distributed.localstorage", "false");
        NamedCache person = CacheFactory.getCache("person");

        // add indexes
        person.addIndex(new ReflectionExtractor("getGender"), true, null);
        person.addIndex(new ReflectionExtractor("getAgeDouble"), false, null);

        Random generator = new Random();

        for (int i = 1; i <= 10000; i++) {
            Person p =
                new Person(i, "Surname" + i, "Firstname" + i, "Address" + i, generator.nextInt(100) + 1, (generator.nextInt(2) ==
                                                                                                          1 ?
                                                                                                          Person.FEMALE :
                                                                                                          Person.MALE));
            person.put(p.getId(), p);

        }

        System.out.println(person.entrySet().size());

    }
}

(2)QueryExample.java

package com.oracle.coherence.handson;


import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.AndFilter;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.GreaterEqualsFilter;

import java.util.Set;

public class QueryExample {
    public QueryExample() {
    }

    public static void main(String[] args) {

        NamedCache person = CacheFactory.getCache("person");

        // get a set of all males
        Set males = person.entrySet(new EqualsFilter("getGender", Person.MALE));

        Set malesOver35 =
            person.entrySet(new AndFilter(new EqualsFilter("getGender", Person.MALE), new GreaterEqualsFilter("getAge",
                                                                                                              35)));

        System.out.println("Total number of males is " + males.size());
        System.out.println("Total number of males > 35 " + malesOver35.size());
    }
}

(3)AggregationExample.java

package com.oracle.coherence.handson;


import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.Filter;
import com.tangosol.util.aggregator.DoubleAverage;
import com.tangosol.util.aggregator.DoubleMax;
import com.tangosol.util.filter.AlwaysFilter;
import com.tangosol.util.filter.EqualsFilter;

public class AggregationExample {
    public AggregationExample() {
    }

    public static void main(String[] args) {
        Double averageAgeMales = null;
        Double averageAgeFemales = null;
        Double maxAge = null;
        Double averageAge = null;
        int max = 100;

        NamedCache person = CacheFactory.getCache("person");

        // create a new query

        averageAgeMales =
                (Double)person.aggregate(new EqualsFilter("getGender", Person.MALE), new DoubleAverage("getAge"));

        averageAgeFemales =
                (Double)person.aggregate(new EqualsFilter("getGender", Person.FEMALE), new DoubleAverage("getAge"));

        maxAge = (Double)person.aggregate((Filter)null, // new AlwaysFilter()
                    new DoubleMax("getAge"));

        averageAge = (Double)person.aggregate(new AlwaysFilter(), new DoubleAverage("getAge"));


        System.out.println("Average age of males is " + averageAgeMales);
        System.out.println("Average age of females is " + averageAgeFemales);
        System.out.println("Max age is " + maxAge);
        System.out.println("Average age  " + averageAge);
    }
}

说明:如果想要Filter所有对象,可以使用new AlwaysFilter(),或让Filter=null。

4. 运行
(1)选择运行DefaultCacheServer Profile,作为存储数据的Cache。
(2)选择NoLocalStorage Profile,并运行PopulatePerson。
(3)选择NoLocalStorage Profile,并运行 QueryExample 。
(4)选择NoLocalStorage Profile,并运行 AggregationExample。
(5)选择 DefaultCacheServer Profile,再启动一个Cache节点。
(6)选择NoLocalStorage Profile,再次运行 AggregationExample,统计时间应该快了很多。
注意,如果你的机器是双核CPU,运行(6)才能看出变化比较明显。

Project 下载:CoherenceApp(Lab5).7z

没有评论: