2012年9月24日星期一

Coherence_007:Coherence入门指南之七:并行计算

运行环境:JDeveloper 11.1.2.2.0 + Coherence3.7.1 + Oracle Database 10g Express Edition 10.2.0.1。

在前面的实验中,我们感受了Coherence的缓存特性,在本实验中我们将体验Coherence的并行计算特性。

EntryProcessor是执行计算的代理,它可以对每个Entry执行诸如增加、修改、删除以及计算的操作。
如果同一个Entry上有多个EntryProcessors等待执行,那么其上的操作将排队执行。
Coherence的缓存实现了InvocableMap Interface,包含如下方法:
(1)Object invoke(Object oKey, InvocableMap.EntryProcessor processor),针对某个Key执行Entry Processor,并返回结果。
(2)Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor),针对集合中的Key执行Entry Processor,并返回结果。
(3)Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor),针对满足Filter的Entries执行Entry Processor,并返回结果。

1. 代码
(1)Employee.java 可序列化的员工对象
package com.oracle.coherence.handson;

import com.tangosol.io.ExternalizableLite;
import com.tangosol.util.ExternalizableHelper;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import java.math.BigDecimal;


public class Employee implements  ExternalizableLite {
    private int empId;
    private String surname;
    private String firstname;
    private double salary;
 
     
    public Employee() {
    }

    public Employee(int empId1, String surname1, String firstname1,
                    double salary1) {
        super();
        this.empId = empId1;
        this.surname = surname1;
        this.firstname = firstname1;
        this.salary = salary1;
    }

    public void setEmpId(int param) {
        this.empId = param;
    }

    public int getEmpId() {
        return empId;
    }

    public void setSurname(String param) {
        this.surname = param;
    }

    public String getSurname() {
        return surname;
    }

    public void setFirstname(String param) {
        this.firstname = param;
    }

    public String getFirstname() {
        return firstname;
    }

    public void setSalary(double param) {
        this.salary = param;
    }

    public double getSalary() {
        return salary;
    }

    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (!(object instanceof Employee)) {
            return false;
        }
        final Employee other = (Employee)object;
        if (empId != other.empId) {
            return false;
        }
        if (!(surname == null ? other.surname == null : surname.equals(other.surname))) {
            return false;
        }
        if (!(firstname == null ? other.firstname == null : firstname.equals(other.firstname))) {
            return false;
        }
        if (Double.compare(salary, other.salary) != 0) {
            return false;
        }
        return true;
    }

    @Override
    public int hashCode() {
        final int PRIME = 37;
        int result = 1;
        result = PRIME * result + ((surname == null) ? 0 : surname.hashCode());
        result = PRIME * result + ((firstname == null) ? 0 : firstname.hashCode());
        long temp = Double.doubleToLongBits(salary);
        result = PRIME * result + (int) (temp ^ (temp >>> 32));
        return result;
    }
 
    public void readExternal(DataInput dataInput) throws IOException {
        this.empId = ExternalizableHelper.readInt(dataInput);  
        this.surname = ExternalizableHelper.readSafeUTF(dataInput);
        this.firstname = ExternalizableHelper.readSafeUTF(dataInput);
        this.salary  = ExternalizableHelper.readBigDecimal(dataInput).doubleValue();

    }

    public void writeExternal(DataOutput dataOutput) throws IOException {
        ExternalizableHelper.writeInt(dataOutput, this.empId);
        ExternalizableHelper.writeSafeUTF(dataOutput, this.surname);
        ExternalizableHelper.writeSafeUTF(dataOutput, this.firstname);
        ExternalizableHelper.writeBigDecimal(dataOutput, new BigDecimal(this.salary));
     
    }
}

(2)RaiseSalary.java 为员工涨10%的Processor操作

package com.oracle.coherence.handson;

import com.tangosol.util.processor.AbstractProcessor;
import com.tangosol.util.InvocableMap.Entry;

import java.util.Map;

public class RaiseSalary extends AbstractProcessor {
    public RaiseSalary() {
    }
 
    public Object process(Entry entry ) {
        Employee emp = (Employee)entry.getValue();
        emp.setSalary(emp.getSalary() * 1.10);
        entry.setValue(emp);
        return null;
    }
}

(3)InvokeTest.java 执行RaiseSalary Processor

package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.AlwaysFilter;

public class InvokeTest {
    public InvokeTest() {
    }

    public static void main(String[] args) {
     
        NamedCache empCache = CacheFactory.getCache("employees");
     
        Employee e1 = new Employee(1,"Middleton","Tim",5000);
        empCache.put(e1.getEmpId(), e1);
     
        Employee e2 = new Employee(2,"Jones","Chris",10000);
        empCache.put(e2.getEmpId(), e2);
     
        empCache.invokeAll(AlwaysFilter.INSTANCE, new RaiseSalary());
     
        e1 = (Employee)empCache.get(e1.getEmpId());
        e2 = (Employee)empCache.get(e2.getEmpId());
     
        System.out.println("Salary for emp 1 is now: " + e1.getSalary());
        System.out.println("Salary for emp 2 is now: " + e2.getSalary());
     
    }
}

(4)SayHelloProcessor.java 查看数据到底是保存在哪个Cache节点上
package com.oracle.coherence.handson;

import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;

public class SayHelloProcessor extends AbstractProcessor {
    public SayHelloProcessor() {

    }

    public Object process(Entry entry) {
        Employee emp = (Employee)entry.getValue();
        System.out.println("\nHello from " + emp.getFirstname() + " " + emp.getSurname() + "\n");
        return null;
    }
}

(5)WhereAreMyEmployees.java 执行SayHelloProcessor
package com.oracle.coherence.handson;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.AlwaysFilter;

public class WhereAreMyEmployees {
    public WhereAreMyEmployees() {
    }

    public static void main(String[] args) {

        NamedCache empCache = CacheFactory.getCache("employees");

        empCache.invokeAll(AlwaysFilter.INSTANCE, new SayHelloProcessor());
    }
}

2. 运行
(1)选择运行DefaultCacheServer Profile,作为存储数据的Cache。
(2)选择NoLocalStorage Profile,并运行InvokeTest,会发现所有员工的工资涨了10%。
(3)选择运行DefaultCacheServer Profile,再增加一个Cache节点。
(4)选择NoLocalStorage Profile,并运行WhereAreMyEmployees,会发现在两个DefaultCacheServer Console中都有员工信息的输出。

Project 下载:CoherenceApp(Lab7).7z

没有评论: