2014年8月18日星期一

JDG_006:JDG 6.3 Quick Start 例子学习:Clustered Cache

运行环境:JBoss Data Grid 6.3.0

Clustered Cache 是Infinispan Quick Start中的例子,运行在Library mode下。
代码地址:https://github.com/infinispan/infinispan-quickstart/tree/master/clustered-cache。

下载后,进入clustered-cache目录,然后

1. 编译
mvn clean compile dependency:copy-dependencies -DstripVersion

2. 运行在replication模式下
 (1)java -cp "target/classes:target/dependency/*" org.infinispan.quickstart.clusteredcache.Node -r A
 (2)java -cp "target/classes:target/dependency/*" org.infinispan.quickstart.clusteredcache.Node -r B
 (3)java -cp "target/classes:target/dependency/*" org.infinispan.quickstart.clusteredcache.Node -r C

可以继续启动多个节点,各个节点之间彼此全复制所有的缓存项目。


3. 运行在distribution模式下
 (1)java -cp "target/classes:target/dependency/*" org.infinispan.quickstart.clusteredcache.Node -d A
 (2)java -cp "target/classes:target/dependency/*" org.infinispan.quickstart.clusteredcache.Node -d B
 (3)java -cp "target/classes:target/dependency/*" org.infinispan.quickstart.clusteredcache.Node -d C
可以继续启动多个节点,以分布式的方式在各个节点上存储数据,每个缓存项目数据有2份。

4. Node.java
package org.infinispan.quickstart.clusteredcache;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.quickstart.clusteredcache.util.LoggingListener;
import org.infinispan.util.logging.BasicLogFactory;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class Node {

    private static final BasicLogger log = Logger.getLogger(Node.class);

    private final boolean useXmlConfig;
    private final String cacheName;
    private final String nodeName;
    private volatile boolean stop = false;

    public Node(boolean useXmlConfig, String cacheName, String nodeName) {
        this.useXmlConfig = useXmlConfig;
        this.cacheName = cacheName;
        this.nodeName = nodeName;
    }

    public static void main(String[] args) throws Exception {
        boolean useXmlConfig = false;
        String cache = "repl";
        String nodeName = null;

        for (String arg : args) {
            if ("-x".equals(arg)) {
                useXmlConfig = true;
            } else if ("-p".equals(arg)) {
                useXmlConfig = false;
            } else if ("-d".equals(arg)) {
                cache = "dist";
            } else if ("-r".equals(arg)) {
                cache = "repl";
            } else {
                nodeName = arg;
            }
        }
        new Node(useXmlConfig, cache, nodeName).run();
    }

    public void run() throws IOException, InterruptedException {
        EmbeddedCacheManager cacheManager = createCacheManager();
        final Cache cache = cacheManager.getCache(cacheName);
        System.out.printf("Cache %s started on %s, cache members are now %s\n", cacheName, cacheManager.getAddress(),
                cache.getAdvancedCache().getRpcManager().getMembers());

        // Add a listener so that we can see the puts to this node
        cache.addListener(new LoggingListener());

        printCacheContents(cache);

        Thread putThread = new Thread() {
            @Override
            public void run() {
                int counter = 0;
                while (!stop) {
                    try {
                        cache.put("key-" + counter, "" + cache.getAdvancedCache().getRpcManager().getAddress() + "-" + counter);
                    } catch (Exception e) {
                        log.warnf("Error inserting key into the cache", e);
                    }
                    counter++;

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }
        };
        putThread.start();

        System.out.println("Press Enter to print the cache contents, Ctrl+D/Ctrl+Z to stop.");
        while (System.in.read() > 0) {
            printCacheContents(cache);
        }

        stop = true;
        putThread.join();
        cacheManager.stop();
        System.exit(0);
    }

    /**
     * {@link org.infinispan.Cache#entrySet()}
     *
     * @param cache
     */
    private void printCacheContents(Cache cache) {
        System.out.printf("Cache contents on node %s\n", cache.getAdvancedCache().getRpcManager().getAddress());

        ArrayList> entries = new ArrayList>(cache.entrySet());
        Collections.sort(entries, new Comparator>() {
            @Override
            public int compare(Map.Entry o1, Map.Entry o2) {
                return o1.getKey().compareTo(o2.getKey());
            }
        });
        for (Map.Entry e : entries) {
            System.out.printf("\t%s = %s\n", e.getKey(), e.getValue());
        }
        System.out.println();
    }

    private EmbeddedCacheManager createCacheManager() throws IOException {
        if (useXmlConfig) {
            return createCacheManagerFromXml();
        } else {
            return createCacheManagerProgrammatically();
        }
    }

    private EmbeddedCacheManager createCacheManagerProgrammatically() {
        System.out.println("Starting a cache manager with a programmatic configuration");
        DefaultCacheManager cacheManager = new DefaultCacheManager(
                GlobalConfigurationBuilder.defaultClusteredBuilder()
                .transport().nodeName(nodeName).addProperty("configurationFile", "jgroups.xml")
                .build(),
                new ConfigurationBuilder()
                .clustering()
                .cacheMode(CacheMode.REPL_SYNC)
                .build()
        );
        // The only way to get the "repl" cache to be exactly the same as the default cache is to not define it at all
        cacheManager.defineConfiguration("dist", new ConfigurationBuilder()
                .clustering()
                .cacheMode(CacheMode.DIST_SYNC)
                .hash().numOwners(2)
                .build()
        );
        return cacheManager;
    }

    private EmbeddedCacheManager createCacheManagerFromXml() throws IOException {
        System.out.println("Starting a cache manager with an XML configuration");
        System.setProperty("nodeName", nodeName);
        return new DefaultCacheManager("infinispan.xml");
    }

}
 

5. infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">
    <!--
        Declarative counterpart of Node.createCacheManagerProgrammatically():
        Node.createCacheManagerFromXml() loads this file when the node is started with -x.
    -->
    <global>
        <!--
            ${nodeName} refers to the "nodeName" system property, which
            Node.createCacheManagerFromXml() sets right before loading this file.
        -->
        <transport nodeName="${nodeName}">
            <properties>
                <!-- Transport settings are read from jgroups.xml, same as in the programmatic setup -->
                <property name="configurationFile" value="jgroups.xml"/>
            </properties>
        </transport>
    </global>

    <default>
        <!-- Default cache: synchronous replication (REPL_SYNC in the programmatic setup) -->
        <clustering mode="replication">
            <sync/>
        </clustering>
    </default>

    <namedCache name="repl">
        <!-- Intentionally empty: "repl" uses the configuration of the default cache as it is -->
    </namedCache>

    <namedCache name="dist">
        <!--
            Synchronous distribution cache (DIST_SYNC in the programmatic setup).
            numOwners="2": per the walkthrough text, each entry is stored in 2 copies
            (NOTE(review): confirm exact semantics against the Infinispan docs).
        -->
        <clustering mode="distribution">
            <sync/>
            <hash numOwners="2"/>
        </clustering>
    </namedCache>

</infinispan>

没有评论: