运行环境:JDeveloper 11.1.2.2.0 + Coherence 3.7.1 + Oracle Database 10g Express Edition 10.2.0.1。
在前面的实验中,我们感受了Coherence的缓存特性,在本实验中我们将体验Coherence的并行计算特性。
EntryProcessor是执行计算的代理,它可以对每个Entry执行诸如增加、修改、删除以及计算的操作。
如果同一个Entry上有多个EntryProcessors等待执行,那么其上的操作将排队执行。
Coherence的缓存实现了InvocableMap接口,其中包含如下方法:
(1)Object invoke(Object oKey, InvocableMap.EntryProcessor processor),针对某个Key执行Entry Processor,并返回结果。
(2)Map invokeAll(Collection keys, InvocableMap.EntryProcessor processor),针对集合中的Key执行Entry Processor,并返回结果。
(3)Map invokeAll(Filter filter, InvocableMap.EntryProcessor processor),针对满足Filter的Entries执行Entry Processor,并返回结果。
1. 代码
(1)Employee.java 可序列化的员工对象
package com.oracle.coherence.handson;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.util.ExternalizableHelper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
/**
 * Employee value object that is stored in the cache.
 *
 * <p>Implements {@link ExternalizableLite}; the state is written and read in the
 * fixed order {@code empId}, {@code surname}, {@code firstname}, {@code salary}.
 * {@link #equals(Object)} and {@link #hashCode()} are both based on all four fields.
 */
public class Employee implements ExternalizableLite {
    private int empId;
    private String surname;
    private String firstname;
    private double salary;

    /** No-argument constructor; state is filled in later by setters or {@link #readExternal}. */
    public Employee() {
    }

    /**
     * Creates a fully populated employee.
     *
     * @param empId1     employee identifier
     * @param surname1   family name, may be {@code null}
     * @param firstname1 given name, may be {@code null}
     * @param salary1    current salary
     */
    public Employee(int empId1, String surname1, String firstname1,
                    double salary1) {
        this.empId = empId1;
        this.surname = surname1;
        this.firstname = firstname1;
        this.salary = salary1;
    }

    public void setEmpId(int param) {
        this.empId = param;
    }

    public int getEmpId() {
        return empId;
    }

    public void setSurname(String param) {
        this.surname = param;
    }

    public String getSurname() {
        return surname;
    }

    public void setFirstname(String param) {
        this.firstname = param;
    }

    public String getFirstname() {
        return firstname;
    }

    public void setSalary(double param) {
        this.salary = param;
    }

    public double getSalary() {
        return salary;
    }

    /**
     * Two employees are equal when id, surname, firstname and salary all match.
     * Null names are only equal to null names.
     */
    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (!(object instanceof Employee)) {
            return false;
        }
        final Employee other = (Employee) object;
        if (empId != other.empId) {
            return false;
        }
        if (!(surname == null ? other.surname == null : surname.equals(other.surname))) {
            return false;
        }
        if (!(firstname == null ? other.firstname == null : firstname.equals(other.firstname))) {
            return false;
        }
        // Double.compare (rather than ==) keeps NaN equal to itself and stays
        // consistent with the doubleToLongBits-based hash below.
        if (Double.compare(salary, other.salary) != 0) {
            return false;
        }
        return true;
    }

    /**
     * Hash over the same four fields that {@link #equals(Object)} compares.
     * The id is included so that employees differing only by id do not collide.
     */
    @Override
    public int hashCode() {
        final int PRIME = 37;
        int result = 1;
        result = PRIME * result + empId;
        result = PRIME * result + ((surname == null) ? 0 : surname.hashCode());
        result = PRIME * result + ((firstname == null) ? 0 : firstname.hashCode());
        long temp = Double.doubleToLongBits(salary);
        result = PRIME * result + (int) (temp ^ (temp >>> 32));
        return result;
    }

    /**
     * Restores the state in the order written by {@link #writeExternal}.
     *
     * @param dataInput stream to read from
     * @throws IOException if reading from the stream fails
     */
    public void readExternal(DataInput dataInput) throws IOException {
        this.empId = ExternalizableHelper.readInt(dataInput);
        this.surname = ExternalizableHelper.readSafeUTF(dataInput);
        this.firstname = ExternalizableHelper.readSafeUTF(dataInput);
        // The salary travels as a BigDecimal on the wire; convert back to double.
        this.salary = ExternalizableHelper.readBigDecimal(dataInput).doubleValue();
    }

    /**
     * Writes the state as empId, surname, firstname, salary.
     *
     * <p>NOTE: the salary is encoded through {@code new BigDecimal(double)}, which
     * throws {@link NumberFormatException} for NaN or infinite values, so such
     * salaries cannot be serialized. The encoding is kept as-is so the wire
     * format stays compatible with {@link #readExternal}.
     *
     * @param dataOutput stream to write to
     * @throws IOException if writing to the stream fails
     */
    public void writeExternal(DataOutput dataOutput) throws IOException {
        ExternalizableHelper.writeInt(dataOutput, this.empId);
        ExternalizableHelper.writeSafeUTF(dataOutput, this.surname);
        ExternalizableHelper.writeSafeUTF(dataOutput, this.firstname);
        ExternalizableHelper.writeBigDecimal(dataOutput, new BigDecimal(this.salary));
    }
}
(2)RaiseSalary.java 为员工涨薪10%的EntryProcessor
package com.oracle.coherence.handson;
import com.tangosol.util.processor.AbstractProcessor;
import com.tangosol.util.InvocableMap.Entry;
import java.util.Map;
/**
 * Entry processor that raises the salary of the {@link Employee} held by a
 * cache entry by 10% and writes the updated employee back into the entry.
 */
public class RaiseSalary extends AbstractProcessor {
    public RaiseSalary() {
    }

    /**
     * Applies the 10% raise to the employee stored in {@code entry}.
     *
     * <p>An entry that is not present, or whose value is {@code null}, is left
     * untouched instead of failing with a {@link NullPointerException}; this can
     * happen when the processor is invoked for a key that is not in the cache.
     *
     * @param entry the cache entry to process
     * @return always {@code null}; the processor produces no result
     */
    public Object process(Entry entry) {
        if (!entry.isPresent()) {
            return null;
        }
        Employee emp = (Employee) entry.getValue();
        if (emp == null) {
            return null;
        }
        emp.setSalary(emp.getSalary() * 1.10);
        // Store the modified employee back into the entry.
        entry.setValue(emp);
        return null;
    }
}
(3)InvokeTest.java 执行RaiseSalary Processor
package com.oracle.coherence.handson;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.AlwaysFilter;
/**
 * Stores two employees in the "employees" cache, runs {@link RaiseSalary}
 * against every entry, then reads both employees back and prints their salaries.
 */
public class InvokeTest {
    public InvokeTest() {
    }

    public static void main(String[] args) {
        final NamedCache cache = CacheFactory.getCache("employees");

        // Seed the cache, keyed by employee id.
        final Employee tim = new Employee(1, "Middleton", "Tim", 5000);
        final Integer timKey = Integer.valueOf(tim.getEmpId());
        cache.put(timKey, tim);

        final Employee chris = new Employee(2, "Jones", "Chris", 10000);
        final Integer chrisKey = Integer.valueOf(chris.getEmpId());
        cache.put(chrisKey, chris);

        // AlwaysFilter selects every entry, so the processor runs for all employees.
        cache.invokeAll(AlwaysFilter.INSTANCE, new RaiseSalary());

        // Re-read both employees from the cache to observe the updated salaries.
        final Employee raisedTim = (Employee) cache.get(timKey);
        final Employee raisedChris = (Employee) cache.get(chrisKey);

        System.out.println("Salary for emp 1 is now: " + raisedTim.getSalary());
        System.out.println("Salary for emp 2 is now: " + raisedChris.getSalary());
    }
}
(4)SayHelloProcessor.java 查看数据到底是保存在哪个Cache节点上
package com.oracle.coherence.handson;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
/**
 * Entry processor that prints a greeting containing the first name and surname
 * of the {@link Employee} held by a cache entry. The output goes to the standard
 * output of whichever JVM executes the processor.
 */
public class SayHelloProcessor extends AbstractProcessor {
    public SayHelloProcessor() {
    }

    /**
     * Prints the greeting for the employee stored in {@code entry}.
     *
     * <p>An entry that is not present, or whose value is {@code null}, is skipped
     * silently instead of failing with a {@link NullPointerException}.
     *
     * @param entry the cache entry to process
     * @return always {@code null}; the processor produces no result
     */
    public Object process(Entry entry) {
        if (!entry.isPresent()) {
            return null;
        }
        Employee emp = (Employee) entry.getValue();
        if (emp == null) {
            return null;
        }
        System.out.println("\nHello from " + emp.getFirstname() + " " + emp.getSurname() + "\n");
        return null;
    }
}
(5)WhereAreMyEmployees.java 执行SayHelloProcessor
package com.oracle.coherence.handson;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.filter.AlwaysFilter;
/**
 * Runs {@link SayHelloProcessor} against every entry of the "employees" cache.
 */
public class WhereAreMyEmployees {
    public WhereAreMyEmployees() {
    }

    public static void main(String[] args) {
        final NamedCache employees = CacheFactory.getCache("employees");
        final SayHelloProcessor greeter = new SayHelloProcessor();

        // AlwaysFilter matches every entry, so each cached employee is greeted.
        employees.invokeAll(AlwaysFilter.INSTANCE, greeter);
    }
}
2. 运行
(1)选择运行DefaultCacheServer Profile,作为存储数据的Cache。
(2)选择NoLocalStorage Profile,并运行InvokeTest,会发现所有员工的工资涨了10%。
(3)选择运行DefaultCacheServer Profile,再增加一个Cache节点。
(4)选择NoLocalStorage Profile,并运行WhereAreMyEmployees,会发现在两个DefaultCacheServer Console中都有员工信息的输出。
Project 下载:CoherenceApp(Lab7).7z
没有评论:
发表评论