Category Archives: Java

Java

Simple Key-Value Java Client for Riak

Riak 2.x’s new client is powerful, but can be pretty overwhelming if all one wants is to get(), put(), and delete() key/value pairs from a distributed store. This article covers a special, but useful, case of Riak usage and the code and configuration required to achieve it. I hope it saves someone all the time it took me to work it out (thanks to engineers at Basho for their comments.)

The first thing is to establish the requirements for our store and the configuration we’ll need to meet them. For the sake of this example, I am assuming our default bucket setup is 3 replicas (n_val=3) and that we achieve consistency by writing to all replicas (w=all). Note that you can choose any value for w, including 1. Because read-repair is triggered only after a value is returned to the client, we will usually want a read quorum greater than 1 (r=2) to make sure we don’t get back a stale value with no chance to reconcile conflicts.

Besides n_val, w, and r, there are a few more bucket and global settings we need to adjust:

  • We do not want Riak to generate “siblings”, so we need to set ‘allow_mult = false’; this lets Riak resolve conflicts for us (see next item)
  • We do want Riak to use ‘causal context’ (a combination of vector clocks and time stamps) to reconcile inconsistent values, so we set ‘last_write_wins = false’ to prevent timestamps from being the only criterion
  • Because we don’t want a deletion to mask the existence of a value, we have to turn off this optimization: ‘notfound_ok = false’
  • As a result of setting ‘notfound_ok = false’, we can alleviate the resulting performance hit with ‘basic_quorum = true’; this lets a read return not-found once a quorum of replicas report it, rather than waiting for every replica

The default bucket properties in riak.conf would look like this:

# Default bucket properties
buckets.default.n_val = 3
buckets.default.r = 2
buckets.default.w = 3
buckets.default.allow_mult = false
buckets.default.last_write_wins = false
buckets.default.notfound_ok = false
buckets.default.basic_quorum = true

Finally, in order to prevent certain edge cases from causing spooky resurrection of deleted keys, we will set the delete_mode to immediate in advanced.config:

[
  {riak_kv,
    [
       %% Delete mode
        {delete_mode, immediate},
       %% Dotted version vectors 
        {dvv_enabled, true}
    ]
  }
].

The ‘dotted version vectors’ are a nice-to-have but don’t affect this article. They are simply an enhancement of the original vector clocks used by older versions of Riak.

OK, enough prologue; let’s get to the code. Here is a RiakKVClient with proper handling of causal context, updates, and deletes, as well as efficient metadata-only fetches when we don’t actually need the value:

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import com.basho.riak.client.api.cap.VClock;
import com.basho.riak.client.api.commands.kv.DeleteValue;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.RiakObject;
import com.basho.riak.client.core.util.BinaryValue;

public class RiakKVClient
{
 
    /**
     * Constructor. Use RiakProvider.getStoreClient(name) instead.
     * @param name bucket name
     * @param client low-level Riak Java client instance
     */
    protected RiakKVClient(final String name, final com.basho.riak.client.api.RiakClient client)
    {
        this.name = name;
        this.namespace = new Namespace(name);
        this.client = client;
    }
 
    /**
     * Get a value associated with a key
     * @param key
     * @return the value associated with the given key
     * @throws IOException
     */
    public byte[] get(final String key) throws IOException {
        if (key == null)
        {
            throw new IllegalArgumentException("Key is required");
        }
 
        try {
            final FetchValue.Response response = fetchValue(key);
            if (response.isNotFound())
            {
                return null;
            }
            final RiakObject riakObject = response.getValue(RiakObject.class);
            return riakObject.getValue().getValue();
        } catch (ExecutionException e) {
            throw new IOException("Riak failed to retrieve object from bucket: " + name + " with key: " + key, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }
 
    /**
     * Insert a value associated with a key. If a value already exists, update it.
     * @param key the key
     * @param value the value to store
     * @throws IOException
     */
    public void put(final String key, final byte[] value) throws IOException {
        if (key == null || value == null) {
            throw new IllegalArgumentException("All parameters are required");
        }
 
        try {
            // fetch in order to get the causal context
            final FetchValue.Response response = fetchMetadata(key);
            final RiakObject storeObject = new RiakObject().setValue(BinaryValue.create(value)).setContentType("binary/octet-stream");
            StoreValue.Builder builder = new StoreValue.Builder(storeObject).withLocation(new Location(namespace, key));
            final VClock vectorClock = response.getVectorClock();
            if (vectorClock != null) {
                builder = builder.withVectorClock(vectorClock);
            }
            final StoreValue storeValue = builder.build();
            client.execute(storeValue);
        } catch (ExecutionException e) {
            throw new IOException("Riak failed to store object in bucket: " + name + " with key: " + key, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
 
    /**
     * Delete the value associated with the given key.
     * @param key the key to delete
     * @return true if the value existed; false otherwise (in which case the method has no effect)
     * @throws IOException
     */
    public boolean delete(final String key) throws IOException {
        if (key == null)
        {
            throw new IllegalArgumentException("Key is required");
        }
 
        try {
            // fetch in order to get the causal context
            final FetchValue.Response response = fetchMetadata(key);
            if (response.isNotFound())
            {
                return false;
            }
            DeleteValue.Builder builder = new DeleteValue.Builder(new Location(namespace, key));
            final VClock vectorClock = response.getVectorClock();
            if (vectorClock != null) {
                builder = builder.withVClock(vectorClock);
            }
            final DeleteValue deleteValue = builder.build();
            client.execute(deleteValue);
            return true; // a value existed; the not-found case already returned false above
        } catch (ExecutionException e) {
            throw new IOException("Riak failed to store object in bucket: " + name + " with key: " + key, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }
 
    private FetchValue.Response fetchMetadata(final String key) throws ExecutionException, InterruptedException
    {
        return fetchResponse(key, true);
    }
 
    private FetchValue.Response fetchValue(final String key) throws ExecutionException, InterruptedException
    {
        return fetchResponse(key, false);
    }
 
    private FetchValue.Response fetchResponse(final String key, boolean headOnly) throws ExecutionException, InterruptedException {
        Location loc = new Location(namespace, key);
        FetchValue.Builder builder = new FetchValue.Builder(loc);
        if (headOnly) {
            builder.withOption(FetchValue.Option.HEAD, true);
        }
        FetchValue fetch = builder.build();
        return client.execute(fetch);
 
    }
 
    private final Namespace namespace;
    private final String name;
    private final com.basho.riak.client.api.RiakClient client;
}

The only thing missing is the RiakProvider class for creating RiakKVClient instances. Also, the code may be difficult to read given the formatting of this blog, so here’s the full Gist.
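Since it isn’t shown here, a rough sketch of what a RiakProvider might look like follows. The caching and connection details are illustrative assumptions, not necessarily what the Gist’s version does; it simply shares one low-level RiakClient across all buckets and hands out per-bucket RiakKVClient instances. It must live in the same package as RiakKVClient, since that constructor is protected.

import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RiakProvider
{
    private final com.basho.riak.client.api.RiakClient client;
    private final Map<String, RiakKVClient> clients = new ConcurrentHashMap<>();

    public RiakProvider(final String... hosts) throws UnknownHostException
    {
        // one low-level client (and its connection pool) shared by all buckets
        this.client = com.basho.riak.client.api.RiakClient.newClient(hosts);
    }

    /**
     * Return a (cached) key-value client for the given bucket.
     */
    public RiakKVClient getStoreClient(final String name)
    {
        return clients.computeIfAbsent(name, n -> new RiakKVClient(n, client));
    }

    public void shutdown()
    {
        client.shutdown();
    }
}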

If you have questions on why something is the way it is, please leave a non-anonymous comment.

References:

Understanding Riak’s Configurable Behaviors: Part 4

A Distributed Reentrant Read-Write Lock Using a Hazelcast Data Grid

Most Java programmers are familiar with the java.util.concurrent package and some of the handy things in it like ReentrantReadWriteLock. To recap, a ReadWriteLock solves the “Readers-Writers Problem” in computer science.

A read-write lock allows for a greater level of concurrency in accessing shared data than that permitted by a mutual exclusion lock. It exploits the fact that while only a single thread at a time (a writer thread) can modify the shared data, in many cases any number of threads can concurrently read the data (hence reader threads). In theory, the increase in concurrency permitted by the use of a read-write lock will lead to performance improvements over the use of a mutual exclusion lock. – JavaDocs, JSE 7

Additionally, a ReentrantReadWriteLock allows any thread to acquire the same lock more than once.

This lock allows both readers and writers to reacquire read or write locks in the style of a ReentrantLock. Non-reentrant readers are not allowed until all write locks held by the writing thread have been released.

Additionally, a writer can acquire the read lock, but not vice-versa. Among other applications, reentrancy can be useful when write locks are held during calls or callbacks to methods that perform reads under read locks. If a reader tries to acquire the write lock it will never succeed. – JavaDocs, JSE 7

It would be nice to have the same semantics available to control concurrent access to resources in a distributed application, but the only implementations I’m aware of are heavyweight (e.g., the Apache ZooKeeper Shared Reentrant Read Write Lock recipe).

Hazelcast’s distributed in-memory data grid, on the other hand, is lightweight and easy to use. Drop a jar file into your application and off you go. Hazelcast includes distributed implementations of Maps, Lists, Queues, etc. as well as Semaphores, Locks and AtomicLongs. It seems we should be able to implement the synchronization we want using some of these distributed collections and concurrency primitives.

This blog post provides a good starting point for how to implement a ReadWrite lock using two semaphores. Though it explains the concept simply, it has two deficiencies. First, writers may starve while readers hog the lock. Second, it isn’t re-entrant. The first problem can be solved, as the blog author notes, using a third semaphore. This article (PDF) illustrates how to solve the “writers-preference” problem using a third semaphore. To make it work, though, we’ll have to replace those counters with distributed AtomicLongs.
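To make that concrete before moving on, here is a minimal, non-reentrant sketch of the two-semaphore scheme built from Hazelcast primitives. The class and member names are mine, it gives readers preference rather than writers preference, and (as the update at the end of this post explains) its interruption handling is naive:

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.ISemaphore;

// A bare-bones, readers-preference, non-reentrant read-write lock made from
// two distributed semaphores and a distributed reader count.
public class SimpleDistributedReadWriteLock {

    private final ISemaphore mutex;     // guards the reader count
    private final ISemaphore writeSem;  // held by the writer, or by the reader group
    private final IAtomicLong readers;  // number of active readers

    public SimpleDistributedReadWriteLock(HazelcastInstance hz, String name) {
        mutex = hz.getSemaphore(name + "-mutex");
        writeSem = hz.getSemaphore(name + "-write");
        readers = hz.getAtomicLong(name + "-readers");
        mutex.init(1);      // init() has no effect if another node got there first
        writeSem.init(1);
    }

    public void lockRead() throws InterruptedException {
        mutex.acquire();
        try {
            if (readers.incrementAndGet() == 1) {
                // first reader locks out writers; if this acquire is interrupted
                // the counter is left inconsistent -- see the June 2014 update below
                writeSem.acquire();
            }
        } finally {
            mutex.release();
        }
    }

    public void unlockRead() throws InterruptedException {
        mutex.acquire();
        try {
            if (readers.decrementAndGet() == 0) {
                writeSem.release();   // last reader lets writers in again
            }
        } finally {
            mutex.release();
        }
    }

    public void lockWrite() throws InterruptedException {
        writeSem.acquire();
    }

    public void unlockWrite() {
        writeSem.release();
    }
}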

That’s great for a non-reentrant ReadWrite lock, but what about a reentrant one? An algorithm to prevent deadlock and allow strongly reentrant usage (writers can acquire nested read locks) is explained in “Reentrant Readers-Writers” (PDF). It requires the use of a monitor–which in Java means a Lock and associated Condition–and involves keeping a per-thread count of nested locks. For the latter, I stash the count in a ThreadLocal. There are a couple of other niggly bits involving a distributed counter and a distributed boolean (which we’ll fake with an AtomicLong.)
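The reentrancy bookkeeping itself is easier to see in isolation. Here is a sketch of the ThreadLocal hold count for the read side only; the two abstract methods stand in for the distributed acquire and release, and all the names are illustrative rather than taken from the actual implementation:

public abstract class ReentrantReadHolds
{
    // per-thread count of nested read-lock holds
    private final ThreadLocal<Integer> readHolds = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() { return 0; }
    };

    protected abstract void acquireDistributedRead() throws InterruptedException;

    protected abstract void releaseDistributedRead();

    public void lockRead() throws InterruptedException
    {
        final int holds = readHolds.get();
        if (holds == 0) {
            acquireDistributedRead(); // only the outermost acquire touches the grid
        }
        readHolds.set(holds + 1);     // nested acquires are purely thread-local
    }

    public void unlockRead()
    {
        final int holds = readHolds.get();
        if (holds == 0) {
            throw new IllegalMonitorStateException("read lock not held by this thread");
        }
        readHolds.set(holds - 1);
        if (holds == 1) {
            releaseDistributedRead(); // the last release frees the distributed lock
        }
    }
}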

The end result is two types of distributed locks, implemented using Hazelcast’s ISemaphore, ILock, ICondition and IAtomicLong. The only further complication is the desire to abstract the grid implementation being used (mostly useful for testing, since I know of no other grid technology that provides the data structures required here.) I use a DistributedDataStructureFactory and a DistributedLockFactory to solve those problems, as well as some helper interfaces and wrapper classes to compensate for the fact that in java.util.concurrent, Semaphore and AtomicLong are concrete classes.

Assuming you have a HazelcastInstance, usage is identical to usage of java.util.concurrent.locks.ReentrantReadWriteLock, with the exception of creation of a new lock instance.

// This can be a singleton, but additional instances aren't a problem.
DistributedLockFactory lockFactory =
    new DistributedLockFactory(new HazelcastDataStructureFactory(hazelcastInstance));
 
ReadWriteLock lock = lockFactory.getReentrantReadWriteLock("myLock");
lock.readLock().lock();
try {
    // do some stuff
}
finally {
    lock.readLock().unlock();
}

The full package, with both types of locks, helper classes, and unit and integration tests, has been released under the Apache 2.0 license by kind permission of my employer, ThoughtWire Corporation. You can find it on GitHub here: https://github.com/ThoughtWire/hazelcast-locks

 
Update: June 13, 2014.
Shortly after releasing the first version of this package, I discovered an additional wrinkle: each lock operation must deal very carefully with thread interruption so as not to leave any data structures in a state that could lead to deadlock of other threads or nodes. Specifically, all operations that call blocking methods (such as Semaphore.acquire() or Condition.await()) must catch any thrown InterruptedException and restore the lock’s original state before setting the thread’s interrupted status and returning. In practice, this is quite messy to do (!) and a worthy improvement would be to find a way to tidy it up. For the gory details on proper handling of task cancellation, see Goetz et al., “Java Concurrency in Practice” (specifically, Chapter 7).
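In sketch form, each blocking step ends up looking something like the following; the method name and the ISemaphore/IAtomicLong parameters are placeholders rather than the actual hazelcast-locks code:

// One blocking step inside a lock operation: if interrupted, undo the
// bookkeeping already done, restore the interrupt status, and report failure.
boolean acquireStep(ISemaphore semaphore, IAtomicLong bookkeeping) {
    bookkeeping.incrementAndGet();            // state change made before blocking
    try {
        semaphore.acquire();                  // the blocking call
        return true;
    } catch (InterruptedException e) {
        bookkeeping.decrementAndGet();        // restore the lock's original state
        Thread.currentThread().interrupt();   // then set the interrupted status
        return false;
    }
}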

Gruple: A Tuplespace for Groovy

I’d like to announce the first release of Gruple, a tuplespace implementation for Groovy (and Java, of course). Release 1.0 is an *in-process* space only, meaning that it can be used to co-ordinate and synchronize threads, but not separate processes. Its intent is to add one more tool to the toolkit for making concurrent programming simpler. (A remote space, allowing co-ordination among separate processes and/or nodes, is on the roadmap, but not near the top.)

You can find the project at http://gruple.googlecode.com

After downloading and building the source (there’s an ant buildfile to make it easy enough), please read the Demos page for some instant gratification. 😉

Please enjoy it, and I appreciate any feedback (including bug reports!)

Happy concurrent programming!

Face Time with JXTA and Shoal Engineers

The second day at the show was a little more of a grind than the first. Aching feet and all that. But the reaction was still overwhelmingly positive. I had more of a chance to have some chats with the JXTA team and also with one of the people responsible for the Shoal project.

It seems the JXTA team is pining for a new book on JXTA and at least one person suggested I write it! (Uh… yeah… in my plentiful spare time!) I must admit it’s tempting. Another temptation to consume time I don’t really have came from a Shoal engineer interested in a) integrating Shoal with Spring for declarative clustering of Spring apps, and b) using this method to cluster JXTA super-peers (that was actually my idea, since our super-peers are Spring apps.)

Finally, someone from Sun’s ISV Engineering team was very enthusiastic about the idea of us porting our app–or some variation on it–to SavaJe, and since we’ve been considering a mobile version, it’s definitely worth a look-see.

We’ve met a lot of our goals for this conference, so the last day should be pretty easy-going. As Leigh says, the last thing we’re hoping for is a visit from a Java Rock Star.

“Wow. Cool…”

We heard a fair bit of that yesterday here at the oponia “pod” in the Java Playground @ JavaOne 07. We’ve all become pretty slick at giving the demo and Leigh is mastering the art of snagging total strangers and forcing them to watch it. Most of them end up being glad they did. Another thing we’ve been hearing quite a bit is: “hey, this actually looks really useful.” Umm… yeah. We kinda planned it that way, but we’re glad you agree. 😉

We had some pretty stiff competition for attention from the singing, dancing Robosapiens, the world’s fastest robot, and a Java-powered submarine. (It is a playground, after all.) Many thanks to Mike Duigou and Henry Jen for their fabulous demos of oponia’s ucaster in the JXTA pod.

I didn’t get to look around too much myself, but the coolest thing I’ve seen so far (besides the Robosapiens) was the Sunspot programmable sensor technology. Maybe today Mark and I will go find out what Nokia and Motorola are doing here. Well, we’re due back on the floor in a few minutes so… more later.

oponia at JavaOne

I’m thrilled to announce that we (oponia networks) will be premiering our “hyper-simple” sharing and collaboration platform at JavaOne this year. You’ll find us in the Java Playground in the Pavilion from May 8 to 10.

More info on the product will be available before the show (our team is working frantically on the material now), and I’ll post again when it’s ready to share.

I’d like to say a very special “Thank you!” to Bernard Traversat and Stephanie Kaul of Sun for providing us with this opportunity to showcase our JXTA-based product in such a great venue. Hope to see you there!

Configuring a JXTA Peer in Spring

Important update: This article is old and out of date. This method will no longer work with the current version of the JXTA platform. I am looking into what changes might be required to the platform and/or the example. I’ll post the results in a new entry and link to it here when it’s ready. I notice many people searching for JXTA+Spring configuration and landing here. Sorry for the inconvenience. VW, 23/07/2008

The new net.jxta.platform.NetworkConfigurator class provides a clean and straightforward way to configure an instance of the JXTA platform without a UI. It can be used programmatically (see the main() method for an example of stand-alone use) or—because it is a Java Bean—declaratively deployed inside containers like Spring.

Here is a simple example of a Spring context file, applicationContext-rdv.xml, which configures a Rendezvous Server:


<beans>

    <bean id="netConfig" class="net.jxta.platform.NetworkConfigurator">
        <!-- JXTA home directory -->
        <property name="home"><value>.jxta</value></property>
        <!-- Configuration mode (bitmask for a rendezvous server) -->
        <property name="mode"><value>2752</value></property>
        <!-- This peer's ID -->
        <property name="peerId">
            <value>urn:jxta:uuid-XXXXXXXXXXXXXXXXXXXXXXXXXX...</value>
        </property>
        <!-- Infrastructure (net peer group) ID, name and description -->
        <property name="infrastructureId">
            <value>uuid-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX</value>
        </property>
        <property name="infrastructureName"><value>MyNetPG</value></property>
        <property name="infrastructureDescription"><value>My Infrastructure Group</value></property>
        <!-- This peer's name and description -->
        <property name="name"><value>com.mycompany RDV1</value></property>
        <property name="description"><value>Rendezvous Node</value></property>
        <!-- Credentials -->
        <property name="principal"><value>user</value></property>
        <property name="password"><value>pwd</value></property>
        <!-- Transport ports -->
        <property name="tcpPort"><value>9700</value></property>
        <property name="httpPort"><value>9701</value></property>
    </bean>

    <bean id="myJxtaBean" class="com.mycompany.MyJxtaBean">
        <property name="netConfig"><ref bean="netConfig"/></property>
    </bean>

</beans>

And here’s what MyJxtaBean.java might look like, in part:


import java.io.IOException;

import net.jxta.exception.PeerGroupException;
import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupFactory;
import net.jxta.platform.NetworkConfigurator;

public class MyJxtaBean {

    private NetworkConfigurator netConfig;
    
    private PeerGroup infrastructureGroup;
    
    public MyJxtaBean() {}
    
    public void setNetConfig(NetworkConfigurator netConfig) {
        this.netConfig = netConfig;
    }
    
    public synchronized void start() {

        /*
         * Simplest case: we don't overwrite an existing configuration, nor
         * do we load and alter/amend it. Only if there is no PlatformConfig
         * available do we construct a new one using the bean's properties.
         */
        if (!netConfig.exists()) {
            try {
                netConfig.save();
            } catch (IOException io) {
                System.out.println("Error saving PlatformConfig");
            }
        }
          
        try {
            infrastructureGroup = PeerGroupFactory.newNetPeerGroup();
        } catch (PeerGroupException pge) {
            System.out.println("Couldn't create NetPeerGroup");
            System.exit(1);
        }
    }
}

Now all your application has to do is get the MyJxtaBean from the context and call start() to start the JXTA platform.


    ApplicationContext applicationContext =
        new ClassPathXmlApplicationContext("applicationContext-rdv.xml");
    MyJxtaBean myPlatform = (MyJxtaBean) applicationContext.getBean("myJxtaBean");
    myPlatform.start();

There’s more to writing a JXTA application, of course, but I hope this example demonstrates how NetworkConfigurator takes some of the mystery out of JXTA platform configuration, while making it easy to combine JXTA and Spring.

Update: April 10, 2007 Christopher Marsh-Bourdon points out a flaw in this example when used with Spring 2.0.1. It’s related to the specification of the JXTA home directory. You can read Christopher’s analysis of Spring’s FileEditor Anomaly and his solution to the problem here. Thanks, Christopher.

All Feeds Lead to Rome

There’s probably no completely painless way to deal programmatically with the tangled mess of syndication formats, but the Rome project is looking promising. Unlike the Jakarta Commons FeedParser, Rome not only parses feeds but also provides:

  • RSS-specific, Atom-specific, and generalized object models–handy if you want to persist feeds after you’ve parsed them;
  • generators for all syndication formats;
  • conversion from any format to any other (see the sketch below).
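For a taste of what that looks like in practice, here is a minimal sketch using Rome’s SyndFeed API; the feed URL is just a placeholder, and the package names are the com.sun.syndication ones Rome currently ships:

import java.net.URL;

import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.io.SyndFeedInput;
import com.sun.syndication.io.SyndFeedOutput;
import com.sun.syndication.io.XmlReader;

public class FeedConverter {
    public static void main(String[] args) throws Exception {
        // parse any supported format (RSS 0.9x/1.0/2.0, Atom) into the
        // generalized SyndFeed model...
        SyndFeed feed = new SyndFeedInput()
                .build(new XmlReader(new URL("http://example.com/feed")));

        // ...then write it back out as a different format
        feed.setFeedType("rss_2.0");
        System.out.println(new SyndFeedOutput().outputString(feed));
    }
}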

Apparently, there’s some co-operation brewing between the FeedParser and Rome folks, which will no doubt be good news for Java feed hackers everywhere.