运行环境:JDeveloper 11.1.2.2.0 + Coherence3.7.1 + Oracle Database 10g Express Edition 10.2.0.1。
1. MapListener
Coherence的缓存提供了ObservableMap接口用于监听数据,当数据发生变化时,可以采取相应操作。
监听的方法有三个:
(1)void addMapListener(MapListener listener),监听所有的数据对象。
(2)void addMapListener(MapListener listener, Filter filter, boolean fLite),监听满足Filter的数据对象。
(3)void addMapListener(MapListener listener, Object oKey, boolean fLite),监听指定Key的数据对象。
其中,第三个参数fLite表示是否使用“轻量”事件:fLite=true时,MapEvent中可以不包含新值和旧值(便于优化传输);fLite=false时,MapEvent中包含新值和旧值。
监听的事件类型可以通过MapListener来定义:
// Register an anonymous MapListener; each callback handles one kind of cache event.
namedCache.addMapListener(new MapListener() {
    // Callback for entries that were added to the cache.
    public void entryInserted(MapEvent mapEvent) {
        //TODO... handle inserted event
    }

    // Callback for entries whose value was replaced.
    public void entryUpdated(MapEvent mapEvent) {
        //TODO... handle updated event
    }

    // Callback for entries that were removed from the cache.
    public void entryDeleted(MapEvent mapEvent) {
        //TODO... handle deletion event
    }
});
1.1 监听器例子代码
(1)ListenForNewPerson.java
package com.oracle.coherence.handson;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Console program that prints a line for every entry inserted into the
 * "person" cache, and stays alive until a line is read from standard input.
 */
public class ListenForNewPerson {

    public ListenForNewPerson() {
    }

    /**
     * Registers the insert listener and then blocks on standard input.
     *
     * @param args ignored
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        // connect to named cache
        NamedCache personCache = CacheFactory.getCache("person");

        // MapListener requires all three callbacks; only inserts are acted upon.
        // (Extending AbstractMapListener would allow overriding just one of them.)
        MapListener insertPrinter = new MapListener() {
            public void entryInserted(MapEvent mapEvent) {
                Person added = (Person) mapEvent.getNewValue();
                String fullName = added.getFirstname() + " " + added.getSurname();
                System.out.println("New person added: " + fullName);
            }

            public void entryUpdated(MapEvent mapEvent) {
                // ignore
            }

            public void entryDeleted(MapEvent mapEvent) {
                // ignore
            }
        };
        personCache.addMapListener(insertPrinter);

        System.out.println("waiting for events");

        // Block until a line is entered so the process keeps receiving events.
        new BufferedReader(new InputStreamReader(System.in)).readLine();
    }
}
说明:为了不让程序进程退出,这里使用了一个“土招”:监听控制台的输入。
如果没有输入,则一直不退出;如果在控制台输入了字符,则立即退出。
注意,为了能够运行时出现Input输入框,需要在NoLocalStorage Profile中,选中Allow Program Input选项。
(2)ListenForUpdatedPerson.java
package com.oracle.coherence.handson;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.MapEventFilter;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Console program that prints the old and new name of every updated entry in
 * the "person" cache whose gender equals {@code Person.MALE}, and stays alive
 * until a line is read from standard input.
 */
public class ListenForUpdatedPerson {

    public ListenForUpdatedPerson() {
    }

    /**
     * Registers the filtered update listener and then blocks on standard input.
     *
     * @param args ignored
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        // connect to named cache
        NamedCache personCache = CacheFactory.getCache("person");

        // Only update events for entries matching the gender filter are delivered.
        EqualsFilter malesOnly = new EqualsFilter("getGender", Person.MALE);
        MapEventFilter updatesOfMales = new MapEventFilter(MapEventFilter.E_UPDATED, malesOnly);

        // AbstractMapListener lets us override just the callback we care about.
        AbstractMapListener updatePrinter = new AbstractMapListener() {
            public void entryUpdated(MapEvent mapEvent) {
                Person before = (Person) mapEvent.getOldValue();
                Person after = (Person) mapEvent.getNewValue();
                // better to implement toString() on the person object to display it.. :)
                String beforeName = before.getFirstname() + " " + before.getSurname();
                System.out.println("Old person is " + beforeName);
                String afterName = after.getFirstname() + " " + after.getSurname();
                System.out.println("New person is " + afterName);
            }
        };

        // false = not a lite event, so old and new values are carried by the event
        personCache.addMapListener(updatePrinter, updatesOfMales, false);

        System.out.println("waiting for events");

        // Block until a line is entered so the process keeps receiving events.
        new BufferedReader(new InputStreamReader(System.in)).readLine();
    }
}
(3)PersonEventTester.java
package com.oracle.coherence.handson;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
/**
 * Driver that produces one insert and one update on the "person" cache so
 * that listeners registered on that cache have events to react to.
 */
public class PersonEventTester {

    public PersonEventTester() {
    }

    /**
     * Puts a new person into the cache, reads it back, changes the first name
     * and puts it again under the same key.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NamedCache personCache = CacheFactory.getCache("person");

        // First put under this key.
        Person created =
            new Person(1, "Middleton", "Tim", "Level 2, 66 Kings Park Road, West Perth", 39, Person.MALE);
        System.out.println("put person");
        personCache.put(created.getId(), created);

        // Second put under the same key, with a modified first name.
        Person fetched = (Person) personCache.get(created.getId());
        fetched.setFirstname("Timothy");
        System.out.println("Update person");
        personCache.put(fetched.getId(), fetched);
    }
}
1.2 运行监听器例子
(1)选择运行DefaultCacheServer Profile,作为存储数据的Cache。
(2)选择NoLocalStorage Profile,并运行ListenForNewPerson。
(3)选择NoLocalStorage Profile,并运行ListenForUpdatedPerson。
(4)选择NoLocalStorage Profile,并运行PersonEventTester。
2. MapTrigger
MapListener与MapTrigger的区别在于:MapListener是事后行为,即数据发生变化之后的操作。而MapTrigger是事前行为,即数据发生变化之前的操作。
比如我们可以使用触发器来自动把小写改成大写。
2.1 触发器例子代码
(1)UppercaseMapTrigger.java
package com.oracle.coherence.handson;
import com.tangosol.util.MapTrigger;
public class UppercaseMapTrigger implements MapTrigger {
public UppercaseMapTrigger() {
}
public void process(MapTrigger.Entry entry) {
Person person = (Person)entry.getValue();
String sName = person.getSurname();
String sNameUC = sName.toUpperCase();
if (!sNameUC.equals(sName)) {
person.setSurname(sNameUC);
System.out.println("Changed last name of [" + sName + "] to [" + person.getSurname() + "]");
entry.setValue(person);
}
}
// ---- hashCode() and equals() must be implemented
public boolean equals(Object o) {
return o != null && o.getClass() == this.getClass();
}
public int hashCode() {
return getClass().getName().hashCode();
}
}
(2)RunMapTrigger.java
package com.oracle.coherence.handson;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapTrigger;
import com.tangosol.util.MapTriggerListener;
/**
 * Driver that registers {@code UppercaseMapTrigger} on the "person" cache,
 * stores a person with a lower-case surname and prints what is read back.
 */
public class RunMapTrigger {

    public RunMapTrigger() {
    }

    /**
     * Registers the trigger, puts one person and prints the stored id and surname.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NamedCache personCache = CacheFactory.getCache("person");

        // A trigger is registered by wrapping it in a MapTriggerListener.
        MapTrigger upperCaser = new UppercaseMapTrigger();
        MapTriggerListener registration = new MapTriggerListener(upperCaser);
        personCache.addMapListener(registration);

        Person submitted = new Person(1, "jones", "Tom", "Address1", 60, Person.MALE);
        personCache.put(submitted.getId(), submitted);

        // Read the entry back to see the value that was actually stored.
        Person stored = (Person) personCache.get(submitted.getId());
        System.out.println("Person id = " + stored.getId() + ", surname= " + stored.getSurname());
    }
}
2.2 运行触发器例子
(1)选择运行DefaultCacheServer Profile,作为存储数据的Cache。
(2)UppercaseMapTrigger没有main方法,无需单独运行;只需确保该类已编译,并且位于Cache Server的classpath中。
(3)选择NoLocalStorage Profile,并运行RunMapTrigger。
3. 一个简单的聊天室例子
通过MapListener,我们可以实现一个简单的聊天室。
基本设计如下:
(1)创建一个Cache,用来监听哪些用户进入或离开了聊天室。
(2)创建另一个Cache,用来监听用户输入的信息。
当然,这样实现的聊天室是一个公共的聊天室,每个人的发言都可以被聊天室中的所有人看到。
你可以在这个例子的基础上,增加聊天用户列表和私聊的功能。
3.1 聊天室代码
(1)ChatMessage.java
package com.oracle.coherence.handson.chat;
import java.io.Serializable;
/**
 * A single chat line: who sent it, what was said, and when the message
 * object was created (milliseconds since the epoch).
 */
public class ChatMessage implements Serializable {

    private String from;
    private String message;
    private long entryTime;

    /**
     * Creates an empty message; all fields keep their default values.
     */
    public ChatMessage() {
    }

    /**
     * Creates a message stamped with the current system time.
     *
     * @param from    sender name
     * @param message message text
     */
    public ChatMessage(String from, String message) {
        this.entryTime = System.currentTimeMillis();
        this.from = from;
        this.message = message;
    }

    /** @return the sender name */
    public String getFrom() {
        return this.from;
    }

    /** @param from the sender name */
    public void setFrom(String from) {
        this.from = from;
    }

    /** @return the creation time in milliseconds since the epoch */
    public long getEntryTime() {
        return this.entryTime;
    }

    /** @param entryTime the creation time in milliseconds since the epoch */
    public void setEntryTime(long entryTime) {
        this.entryTime = entryTime;
    }

    /** @return the message text */
    public String getMessage() {
        return this.message;
    }

    /** @param message the message text */
    public void setMessage(String message) {
        this.message = message;
    }
}
(2)ChatClient.java
package com.oracle.coherence.handson.chat;
import com.tangosol.net.*;
import java.io.*;
import com.tangosol.util.UUID;
import com.tangosol.util.filter.*;
import com.tangosol.util.MapEvent;
import com.tangosol.util.AbstractMapListener;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
public class ChatClient {
public static void main(String[] args) {
String userName = null;
String message;
NamedCache chatmembers = null;
try {
System.out.println("Welcome to the Coherence Chat Client");
System.out.println("------------------------------------");
BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
System.out.print("User Name:");
userName = console.readLine();
// join the chatroom named cache as storage enabled = true
NamedCache cache = CacheFactory.getCache("chatroom");
chatmembers = CacheFactory.getCache("chatmembers");
chatmembers.put(userName, userName);
// register a listener to display the messages
cache.addMapListener(new AbstractMapListener() {
public void entryInserted(MapEvent event) {
ChatMessage msg = (ChatMessage)event.getNewValue();
System.out.println("From: " + msg.getFrom());
System.out.println("Time: " + new Date(msg.getEntryTime()));
System.out.println("Mesg: " + msg.getMessage());
System.out.println();
}
}, new MapEventFilter(MapEvent.ENTRY_INSERTED, new NotEqualsFilter("getFrom", userName)), false);
chatmembers.addMapListener(new AbstractMapListener() {
public void entryDeleted(MapEvent event) {
String who = (String)event.getOldValue();
System.out.println(who + " has left the chat");
}
public void entryInserted(MapEvent event) {
String who = (String)event.getNewValue();
System.out.println(who + " has entered the chat");
}
});
do {
System.out.print("\nEnter message or bye to quit: ");
message = console.readLine();
if ("bye".equals(message))
break;
// else add this to the chat
else if ("help".equals(message)) {
System.out.println("HELP:");
System.out.println("bye - quit");
System.out.println("who - list of users in the chat\n");
} else if ("who".equals(message)) {
System.out.println("Current chat memebers");
System.out.println("=====================");
Set s = chatmembers.entrySet();
for (Iterator
Map.Entry entry = entries.next();
String member = (String)entry.getValue();
System.out.println(member);
}
} else {
cache.put(new UUID(), new ChatMessage(userName, message));
}
} while (true);
System.out.println("Bye");
} catch (Exception e) {
e.printStackTrace();
} finally {
chatmembers.remove(userName);
}
}
}
3.2 运行聊天室例子
(1)选择运行DefaultCacheServer Profile,作为存储数据的Cache。
(2)选择NoLocalStorage Profile,并运行ChatClient。
(3)选择NoLocalStorage Profile,再启动一个ChatClient。
(4)分别在第1和第2个Console输入姓名,然后输入聊天内容,开始聊天。
Project 下载:
CoherenceApp(Lab6).7z