Category Archives: Distributed Computing


Simple Key-Value Java Client for Riak

Riak 2.x’s new client is powerful, but can be pretty overwhelming if all one wants is to get(), put(), and delete() key/value pairs from a distributed store. This article covers a special, but useful, case of Riak usage and the code and configuration required to achieve it. I hope it saves someone all the time it took me to work it out (thanks to engineers at Basho for their comments.)

The first step is to establish the requirements for our store and the configuration needed to meet them. For this example, I assume the default bucket setup uses 3 replicas (n_val=3) and achieves consistency through writes (w=all, which is 3 here). Note that you can choose any value for w, including 1. Read-repair is triggered only after a value has been returned to the client. For that reason we will usually want r > 1 (here r=2), so that a single stale replica cannot hand us an out-of-date value before conflicts have had a chance to be reconciled.
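The reasoning about n_val, w and r follows the standard quorum-intersection rule. When r + w > n_val, every read quorum shares at least one replica with the most recent successful write quorum. The class below is a hypothetical helper, not part of Riak or its client, that only checks this arithmetic. It assumes strict quorums. Riak's sloppy quorums and fallback replicas can weaken the guarantee during failures.

```java
/** Hypothetical helper illustrating quorum intersection; not part of the Riak client. */
final class QuorumCheck {

    private QuorumCheck() {}

    /**
     * True when every set of r replicas must overlap every set of w replicas out of nVal,
     * i.e. a read always contacts at least one replica that acknowledged the latest
     * successful write (assuming strict, not sloppy, quorums).
     */
    static boolean readOverlapsWrite(int nVal, int r, int w) {
        if (nVal < 1 || r < 1 || w < 1 || r > nVal || w > nVal) {
            throw new IllegalArgumentException("require 1 <= r, w <= nVal");
        }
        return r + w > nVal;
    }

    /** Smallest r that still overlaps every write quorum of size w. */
    static int minimumR(int nVal, int w) {
        return Math.max(1, nVal - w + 1);
    }

    public static void main(String[] args) {
        // The settings used in this article: n_val=3, w=3 (all), r=2
        System.out.println(readOverlapsWrite(3, 2, 3)); // true
        // w=1 with r=2 does not guarantee overlap
        System.out.println(readOverlapsWrite(3, 2, 1)); // false
        System.out.println(minimumR(3, 1));             // 3
    }
}
```

By this arithmetic alone, w=3 already makes r=1 overlap. Choosing r=2 adds a margin for the read-repair timing issue described above.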

Besides n_val, w, and r, there are a few more bucket and global settings we need to adjust:

  • We do not want Riak to generate “siblings”, so we need to set ‘allow_mult = false’; this lets Riak resolve conflicts for us (see next item)
  • We do want Riak to use ‘causal context’ (a combination of vector clocks and timestamps) to reconcile inconsistent values, so we set ‘last_write_wins = false’ to prevent timestamps from being the only criterion
  • Because we don’t want a deletion to mask the existence of a value, we have to turn off the optimization that counts a replica’s not-found reply as a successful read: ‘notfound_ok = false’
  • To offset the resulting performance hit, we set ‘basic_quorum = true’; this lets a fetch return not-found as soon as a majority of replicas report the key missing, instead of waiting for every replica
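A toy model can show how notfound_ok and basic_quorum affect a read. This is a simplification written for illustration, not Riak's actual get coordinator. It ignores errors, timeouts and read-repair, and it only counts how many replica replies a read waits for. It applies two documented rules. With notfound_ok=true, a not-found reply counts toward r. With basic_quorum=true (which only matters when notfound_ok=false), a read stops once a majority of replicas report not-found. The class and enum names are hypothetical.

```java
import java.util.List;

/**
 * Toy model (not Riak's get FSM): how many replica replies a read waits for,
 * depending on notfound_ok and basic_quorum. Errors and timeouts are ignored.
 */
final class ReadWaitModel {

    enum Reply { FOUND, NOT_FOUND }

    private ReadWaitModel() {}

    /** @param replies replica replies in arrival order (size == n_val) */
    static int repliesAwaited(List<Reply> replies, int r, boolean notfoundOk, boolean basicQuorum) {
        int nVal = replies.size();
        int majority = nVal / 2 + 1;
        int found = 0;
        int notFound = 0;
        for (int i = 0; i < nVal; i++) {
            if (replies.get(i) == Reply.FOUND) {
                found++;
            } else {
                notFound++;
            }
            int seen = i + 1;
            if (notfoundOk) {
                // not-found replies count toward r, so any r replies settle the read
                if (seen >= r) return seen;
            } else {
                // only replies carrying a value count toward r
                if (found >= r) return seen;
                // basic_quorum: stop early once a majority report not-found
                if (basicQuorum && notFound >= majority) return seen;
            }
        }
        return nVal; // had to wait for every replica
    }

    public static void main(String[] args) {
        // n_val=3, r=2: two replicas lack the key, the one holding it answers last
        List<Reply> replies = List.of(Reply.NOT_FOUND, Reply.NOT_FOUND, Reply.FOUND);
        System.out.println(repliesAwaited(replies, 2, true, false));  // 2
        System.out.println(repliesAwaited(replies, 2, false, false)); // 3
        System.out.println(repliesAwaited(replies, 2, false, true));  // 2
    }
}
```

In the first and third cases the read stops after two not-found replies, even though one replica holds a value. basic_quorum accepts that (hopefully rare) risk in exchange for faster answers on keys that are genuinely missing.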

The default bucket properties in riak.conf would look like this:

# Default bucket properties
buckets.default.n_val = 3
buckets.default.r = 2
buckets.default.w = 3
buckets.default.allow_mult = false
buckets.default.last_write_wins = false
buckets.default.notfound_ok = false
buckets.default.basic_quorum = true

Finally, to prevent certain edge cases from causing spooky resurrection of deleted keys, we set delete_mode to immediate in advanced.config:

[
  {riak_kv,
    [
      %% Delete mode
      {delete_mode, immediate},
      %% Dotted version vectors
      {dvv_enabled, true}
    ]
  }
].

Dotted version vectors are a nice-to-have, but they don’t affect the approach in this article. They are simply an enhancement of the vector clocks used by older versions of Riak.
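The client below fetches the causal context in put() and delete() before writing, and that context is what lets Riak tell a successor write from a concurrent one. The following is a deliberately minimal vector clock with hypothetical names, for illustration only. Riak's real clocks (and dotted version vectors) are opaque to the client, which only passes back the VClock it received. The server increments them.

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal vector clock for illustration only; not Riak's VClock implementation. */
final class VectorClockSketch {

    private final Map<String, Long> counters = new HashMap<>();

    /** Record one event (e.g. a coordinated write) by the given actor. */
    VectorClockSketch increment(String actor) {
        counters.merge(actor, 1L, Long::sum);
        return this;
    }

    VectorClockSketch copy() {
        VectorClockSketch c = new VectorClockSketch();
        c.counters.putAll(counters);
        return c;
    }

    /** True if this clock has seen every event the other clock has seen. */
    boolean descends(VectorClockSketch other) {
        for (Map.Entry<String, Long> e : other.counters.entrySet()) {
            if (counters.getOrDefault(e.getKey(), 0L) < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Neither clock descends from the other: the updates are concurrent. */
    boolean concurrentWith(VectorClockSketch other) {
        return !descends(other) && !other.descends(this);
    }

    public static void main(String[] args) {
        VectorClockSketch stored = new VectorClockSketch().increment("nodeA");
        // A write that carries the fetched context supersedes the stored value...
        VectorClockSketch withContext = stored.copy().increment("nodeB");
        System.out.println(withContext.descends(stored));  // true
        // ...while a write without context starts from an empty clock and conflicts.
        VectorClockSketch blind = new VectorClockSketch().increment("nodeB");
        System.out.println(blind.concurrentWith(stored));  // true
    }
}
```

With allow_mult = false, a concurrent write like the second one does not produce siblings, because Riak resolves the conflict itself. Passing the fetched context back makes our update a clear successor rather than a conflict for Riak to settle.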

OK, enough prologue; let’s get to the code. Here is a RiakKVClient that handles causal context correctly for updates and deletes, and that avoids fetching values when only metadata is needed:

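import java.io.IOException;
import java.util.concurrent.ExecutionException;

import com.basho.riak.client.api.cap.VClock;
import com.basho.riak.client.api.commands.kv.DeleteValue;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import com.basho.riak.client.core.query.RiakObject;
import com.basho.riak.client.core.util.BinaryValue;

public class RiakKVClient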
{
 
    /**
     * Constructor. Use RiakProvider.getStoreClient(name) instead.
     * @param name bucket name
     * @param client low-level Riak Java client instance
     */
    protected RiakKVClient(final String name, final com.basho.riak.client.api.RiakClient client)
    {
        this.name = name;
        this.namespace = new Namespace(name);
        this.client = client;
    }
 
    /**
     * Get a value associated with a key
     * @param key
     * @return the value associated with the given key
     * @throws IOException
     */
    public byte[] get(final String key) throws IOException {
        if (key == null)
        {
            throw new IllegalArgumentException("Key is required");
        }
 
        try {
            final FetchValue.Response response = fetchValue(key);
            if (response.isNotFound())
            {
                return null;
            }
            final RiakObject riakObject = response.getValue(RiakObject.class);
            return riakObject.getValue().getValue();
        } catch (ExecutionException e) {
            throw new IOException("Riak failed to retrieve object from bucket: " + name + " with key: " + key, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }
 
    /**
     * Insert a value associated with a key. If a value already exists, update it.
     * @param key the key
     * @param value the value to store
     * @throws IOException
     */
    public void put(final String key, final byte[] value) throws IOException {
        if (key == null || value == null) {
            throw new IllegalArgumentException("All parameters are required");
        }
 
        try {
            // fetch in order to get the causal context
            final FetchValue.Response response = fetchMetadata(key);
            final RiakObject storeObject = new RiakObject().setValue(BinaryValue.create(value)).setContentType("binary/octet-stream");
            StoreValue.Builder builder = new StoreValue.Builder(storeObject).withLocation(new Location(namespace, key));
            final VClock vectorClock = response.getVectorClock();
            if (vectorClock != null) {
                builder = builder.withVectorClock(vectorClock);
            }
            final StoreValue storeValue = builder.build();
            client.execute(storeValue);
        } catch (ExecutionException e) {
            throw new IOException("Riak failed to store object in bucket: " + name + " with key: " + key, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
 
    /**
     * Delete the value associated with the given key.
     * @param key the key to delete
     * @return true if the value existed; false otherwise (in which case the method has no effect)
     * @throws IOException
     */
    public boolean delete(final String key) throws IOException {
        if (key == null)
        {
            throw new IllegalArgumentException("Key is required");
        }
 
        try {
            // fetch in order to get the causal context
            final FetchValue.Response response = fetchMetadata(key);
            if (response.isNotFound())
            {
                return false;
            }
            DeleteValue.Builder builder = new DeleteValue.Builder(new Location(namespace, key));
            final VClock vectorClock = response.getVectorClock();
            if (vectorClock != null) {
                builder = builder.withVClock(vectorClock);
            }
            final DeleteValue deleteValue = builder.build();
            client.execute(deleteValue);
            // the not-found case returned early above, so the key existed
            return true;
        } catch (ExecutionException e) {
            throw new IOException("Riak failed to delete object from bucket: " + name + " with key: " + key, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }
 
    private FetchValue.Response fetchMetadata(final String key) throws ExecutionException, InterruptedException
    {
        return fetchResponse(key, true);
    }
 
    private FetchValue.Response fetchValue(final String key) throws ExecutionException, InterruptedException
    {
        return fetchResponse(key, false);
    }
 
    private FetchValue.Response fetchResponse(final String key, boolean headOnly) throws ExecutionException, InterruptedException {
        Location loc = new Location(namespace, key);
        FetchValue.Builder builder = new FetchValue.Builder(loc);
        if (headOnly) {
            builder.withOption(FetchValue.Option.HEAD, true);
        }
        FetchValue fetch = builder.build();
        return client.execute(fetch);
 
    }
 
    private final Namespace namespace;
    private final String name;
    private final com.basho.riak.client.api.RiakClient client;
}

The only thing missing is the RiakProvider class for creating RiakKVClient instances. Also, the code may be difficult to read given the formatting of this blog, so here’s the full Gist.

If you have questions on why something is the way it is, please leave a non-anonymous comment.

References:

Understanding Riak’s Configurable Behaviors: Part 4

A Distributed Reentrant Read-Write Lock Using a Hazelcast Data Grid

Most Java programmers are familiar with the java.util.concurrent package and some of the handy things in it like ReentrantReadWriteLock. To recap, a ReadWriteLock solves the “Readers-Writers Problem” in computer science.

A read-write lock allows for a greater level of concurrency in accessing shared data than that permitted by a mutual exclusion lock. It exploits the fact that while only a single thread at a time (a writer thread) can modify the shared data, in many cases any number of threads can concurrently read the data (hence reader threads). In theory, the increase in concurrency permitted by the use of a read-write lock will lead to performance improvements over the use of a mutual exclusion lock. – JavaDocs, JSE 7

A ReentrantReadWriteLock also allows a thread that already holds the lock to acquire it again.

This lock allows both readers and writers to reacquire read or write locks in the style of a ReentrantLock. Non-reentrant readers are not allowed until all write locks held by the writing thread have been released.

Additionally, a writer can acquire the read lock, but not vice-versa. Among other applications, reentrancy can be useful when write locks are held during calls or callbacks to methods that perform reads under read locks. If a reader tries to acquire the write lock it will never succeed. – JavaDocs, JSE 7

It would be nice to have the same semantics available to control concurrent access to resources in a distributed application, but the only implementations I’m aware of are heavyweight (e.g. the Shared Reentrant Read Write Lock recipe for Apache ZooKeeper).

Hazelcast’s distributed in-memory data grid, on the other hand, is lightweight and easy to use. Drop a jar file into your application and off you go. Hazelcast includes distributed implementations of Maps, Lists, Queues, etc. as well as Semaphores, Locks and AtomicLongs. It seems we should be able to implement the synchronization we want using some of these distributed collections and concurrency primitives.

This blog post provides a good starting point for how to implement a ReadWrite lock using two semaphores. Though it explains the concept simply, it has two deficiencies. First, writers may starve while readers hog the lock. Second, it isn’t re-entrant. The first problem can be solved, as the blog author notes, using a third semaphore. This article (PDF) illustrates how to solve the “writers-preference” problem using a third semaphore. To make it work, though, we’ll have to replace those counters with distributed AtomicLongs.
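The following sketch makes the writer-preference idea concrete. It is minimal and non-reentrant, and it uses local java.util.concurrent primitives as stand-ins for Hazelcast's ISemaphore and IAtomicLong. It follows a common textbook structure: a readTry gate that the first waiting writer closes, a resource semaphore, and mutex-guarded reader and writer counts. It is not necessarily the exact algorithm in the linked article, and the class name is hypothetical. InterruptedException is simply propagated here, which can leave the counts inconsistent, as the June 2014 update at the end of this post explains.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

/** Non-reentrant readers-writers lock with writer preference (hypothetical sketch). */
final class WriterPreferenceLock {

    private final Semaphore readTry = new Semaphore(1);   // closed while writers wait or write
    private final Semaphore resource = new Semaphore(1);  // held by one writer or by the reader group
    private final Semaphore readerMutex = new Semaphore(1);
    private final Semaphore writerMutex = new Semaphore(1);
    private final AtomicLong readers = new AtomicLong();  // a distributed counter in a grid version
    private final AtomicLong writers = new AtomicLong();

    void lockRead() throws InterruptedException {
        readTry.acquire();                  // blocks while any writer is waiting or writing
        readerMutex.acquire();
        if (readers.incrementAndGet() == 1) {
            resource.acquire();             // first reader locks writers out
        }
        readerMutex.release();
        readTry.release();
    }

    void unlockRead() throws InterruptedException {
        readerMutex.acquire();
        if (readers.decrementAndGet() == 0) {
            resource.release();             // last reader lets writers in
        }
        readerMutex.release();
    }

    void lockWrite() throws InterruptedException {
        writerMutex.acquire();
        if (writers.incrementAndGet() == 1) {
            readTry.acquire();              // first writer stops new readers from entering
        }
        writerMutex.release();
        resource.acquire();
    }

    void unlockWrite() throws InterruptedException {
        resource.release();
        writerMutex.acquire();
        if (writers.decrementAndGet() == 0) {
            readTry.release();              // last writer reopens the gate for readers
        }
        writerMutex.release();
    }
}
```

The counters only change while their mutex is held, so plain longs would do inside a single JVM. The AtomicLongs mirror the distributed counters a Hazelcast version needs. Semaphores have no owner, so the readTry permit taken by the first writer can be returned by whichever writer leaves last.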

That’s great for a non-reentrant ReadWrite lock, but what about a reentrant one? An algorithm to prevent deadlock and allow strongly reentrant usage (writers can acquire nested read locks) is explained in “Reentrant Readers-Writers” (PDF). It requires the use of a monitor–which in Java means a Lock and associated Condition–and involves keeping a per-thread count of nested locks. For the latter, I stash the count in a ThreadLocal. There are a couple of other niggly bits involving a distributed counter and a distributed boolean (which we’ll fake with an AtomicLong.)
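The per-thread nesting count can be sketched separately from the full reentrant readers-writers algorithm, which also needs the Lock and Condition monitor. In this hypothetical sketch a local Semaphore stands in for a distributed, non-reentrant lock. Only the outermost lock() and unlock() calls touch the shared lock.

```java
import java.util.concurrent.Semaphore;

/**
 * Adds per-thread reentrancy to a non-reentrant lock by counting nested
 * acquisitions in a ThreadLocal (hypothetical sketch).
 */
final class ReentrantWrapper {

    private final Semaphore underlying = new Semaphore(1); // stand-in for the shared lock
    private final ThreadLocal<Integer> holds = ThreadLocal.withInitial(() -> 0);

    void lock() throws InterruptedException {
        int count = holds.get();
        if (count == 0) {
            underlying.acquire();   // only the outermost acquisition touches the shared lock
        }
        holds.set(count + 1);       // not reached if acquire() was interrupted
    }

    void unlock() {
        int count = holds.get();
        if (count == 0) {
            throw new IllegalMonitorStateException("current thread does not hold the lock");
        }
        if (count == 1) {
            holds.remove();         // clear the slot so pooled threads don't carry stale counts
            underlying.release();   // outermost unlock releases the shared lock
        } else {
            holds.set(count - 1);
        }
    }

    int getHoldCount() {
        return holds.get();
    }

    /** True if another thread could acquire the underlying lock right now. */
    boolean isFree() {
        return underlying.availablePermits() == 1;
    }
}
```

Because the count lives in a ThreadLocal, it is private to one thread on one node, which is the scope reentrancy should have. In a distributed version, only the underlying lock would be shared across nodes.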

The end result is two types of distributed locks, implemented using Hazelcast’s ISemaphore, ILock, ICondition and IAtomicLong. The only further complication is the desire to abstract the grid implementation being used (mostly useful for testing, since I know of no other grid technology that provides the data structures required here.) I use a DistributedDataStructureFactory and a DistributedLockFactory to solve those problems, as well as some helper interfaces and wrapper classes to compensate for the fact that in java.util.concurrent, Semaphore and AtomicLong are concrete classes.

Assuming you have a HazelcastInstance, usage is identical to usage of java.util.concurrent.locks.ReentrantReadWriteLock, with the exception of creation of a new lock instance.

// This can be a singleton, but additional instances aren't a problem.
DistributedLockFactory lockFactory =
    new DistributedLockFactory(new HazelcastDataStructureFactory(hazelcastInstance));
 
ReadWriteLock lock = lockFactory.getReentrantReadWriteLock("myLock");
lock.readLock().lock();
try {
    // do some stuff
}
finally {
    lock.readLock().unlock();
}

The full package, with both types of locks, helper classes, unit and integration tests has been released under the Apache 2.0 license by kind permission of my employer ThoughtWire Corporation. You can find it on GitHub here: https://github.com/ThoughtWire/hazelcast-locks

 
Update: June 13, 2014.
Shortly after releasing the first version of this package, I discovered an additional wrinkle: each lock operation must handle thread interruption very carefully, so as not to leave any data structures in a state that could deadlock other threads or nodes. Specifically, every operation that calls a blocking method (such as Semaphore.acquire() or Condition.await()) must catch any thrown InterruptedException and restore the lock’s original state before setting the thread’s interrupted status and returning. In practice, this is quite messy to do (!), and a worthy improvement would be to find a way to tidy it up. For the gory details on proper handling of task cancellation, see Goetz et al., “Java Concurrency in Practice” (specifically, Chapter 7).
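The interruption rule can be illustrated with a single blocking step. In this hypothetical sketch, a shared counter is updated before a blocking Semaphore.acquire(). If the thread is interrupted while waiting, the update is rolled back and the interrupted status is restored, in the spirit of the cancellation guidance in Java Concurrency in Practice. A local AtomicLong and Semaphore stand in for their Hazelcast counterparts.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: undo shared state if a blocking call is interrupted. */
final class InterruptSafeReaderEntry {

    private final AtomicLong registeredReaders = new AtomicLong(); // readers waiting or inside
    private final Semaphore gate;

    InterruptSafeReaderEntry(Semaphore gate) {
        this.gate = gate;
    }

    /** @return true if the gate was acquired; false if interrupted (state rolled back). */
    boolean enter() {
        registeredReaders.incrementAndGet();     // shared state changed before blocking
        try {
            gate.acquire();                      // blocking call that may be interrupted
        } catch (InterruptedException e) {
            registeredReaders.decrementAndGet(); // roll back so other threads aren't misled
            Thread.currentThread().interrupt();  // preserve the interruption for callers
            return false;
        }
        return true;
    }

    void exit() {
        gate.release();
        registeredReaders.decrementAndGet();
    }

    long registeredReaders() {
        return registeredReaders.get();
    }
}
```

The same pattern has to be repeated around every blocking call in every lock operation, which is why the real code gets messy.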

Gruple moved to Codehaus

As promised, Gruple has moved from Googlecode to Codehaus. You’ll find its new home here: http://gruple.codehaus.org.

All the documentation has been ported. There are new mailing lists. The source is now in a Git repository. And the 1.1.1 distribution is available from the distro site. Everything is (or should be) linked to from the main page.

Thanks to everyone who commented or helped.

Gruple 1.1.1 with Transactions released

I had almost given up on Gruple because I had no idea if anyone was using it. But it turns out I need it so badly myself that I got a second-wind and implemented transactions. I released v.1.1 yesterday and then realized with horror that it had serious bugs. After a frantic morning, I’ve got Gruple v.1.1.1 out and I believe (pray) those bugs are addressed.

That is not to say that I’m completely confident there are no bugs in Gruple as it stands! I have noticed occasional bad behaviour, the sure sign of a concurrency time-bomb somewhere. I’ll keep doing my best to track it down, starting with adding concurrent unit tests with the help of GroboUtils. If you use Gruple and have any problems please do let me know.

Finally, Gruple will be moving to Codehaus fairly soon (it’s approved, but there is work to do.) This will give it greater exposure. I’ll be switching to a git repo, because that just seems to be the thing to do (and I hate SVN with a passion anyway.)

Now if only I could get someone at Terracotta and SpringSource to work on supporting Groovy in the Terracotta product, I’d be laughing. And so would you.

Loosely-Coupled Actors: In Brief

Actors are a popular way to write concurrent & distributed programs. Immutable messages are passed between actors which do the required processing, avoiding the difficulties inherent in sharing data among threads/processes.

Tuplespaces (best exemplified in current times by JavaSpaces) are a way to decouple cooperating processes by using pattern-matching to share immutable and persistent data objects.

Many distributed algorithms can be modelled as a flow of objects between participants.

JavaSpaces(TM) Principles, Patterns, and Practice

Or, if you prefer, substitute “parallel” for “distributed” and “messages” for “objects”.

So what do you get if you combine the pattern-based communication of tuplespaces with the message processing paradigm of actors? Loosely-coupled actors.

Actually, as I see it, you get two things:

  • An easier way to write tuplespace programs: the actor DSL provides a formalism for describing reading, writing, and consuming of tuples and the behaviour associated with these operations.
  • Pattern-based communication between actors with no explicit reference to each other: actors are decoupled in both (address)space and time (since messages are persistent objects.)

In both cases, a (hopefully) better way to write concurrent programs.

To the best of my knowledge, no one has tried this yet.

Note: I wrote a longer piece on this topic yesterday with more explanation and references. For those wanting a little more…

Loosely-Coupled Actors

Recently the growing concern with effective use of multicore processors and the subsequent popularity of the actor model came together with my desire to do a project in Groovy. Adding to the mix, I had an in-JVM tuplespace library in Java I’d never released because it seemed thread co-ordination was just not enough of a big deal. Now, with the need to simplify and promote good multi-threaded programming practices, I thought it might be worth translating it into Groovy as a learning exercise (I’m new to Groovy) and releasing it as open source.

But the actor model and tuplespaces kept getting mixed up in my head. Like chocolate and peanut butter, I just wanted them both at the same time. What I really wanted was loosely-coupled actors.

Actors

The actor model uses the passing of immutable messages to avoid the problem of shared state in concurrent and parallel programs. This is a gross oversimplification, but a deeper discussion is beyond the scope of this post. If you want a better explanation, this two-part article (part 1, part 2) by Alex Miller really helped me out. (Don’t let the Erlang stuff in Part 1 turn you off; he gets to Scala, Java, and Groovy eventually.)

Tuplespaces

In a tuplespace–again grossly simplified–there are no shared variables, but there is a shared data space filled with immutable objects. These objects are accessed not by address, but by their contents. A tuplespace is an associative shared data store. These objects, or tuples, can be thought of as messages. They are matched by templates inserted into the space by other components or processes, rather than delivered to a named end-point. A tuplespace is a realization of the “generative communication model” because messages are first-class, shared objects. But in this model communication is uncoupled rather than point-to-point.
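A toy, in-JVM tuplespace can show the associative matching described above. This is a hypothetical sketch, not Gruple's API. Tuples are Object arrays. A template matches a tuple of the same length whose non-null fields are equal, so null acts as a wildcard. read() copies a matching tuple and leaves it in the space, while take() removes it. Both block until a match appears.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-JVM tuplespace (hypothetical sketch; not Gruple's API). */
final class TinyTupleSpace {

    private final List<Object[]> tuples = new ArrayList<>();

    /** Add a copy of the tuple to the space and wake any blocked readers. */
    synchronized void write(Object... tuple) {
        tuples.add(tuple.clone());
        notifyAll();
    }

    /** Block until a matching tuple exists; return a copy, leaving it in the space. */
    synchronized Object[] read(Object... template) throws InterruptedException {
        Object[] t;
        while ((t = find(template)) == null) {
            wait();
        }
        return t.clone();
    }

    /** Block until a matching tuple exists; remove and return it. */
    synchronized Object[] take(Object... template) throws InterruptedException {
        Object[] t;
        while ((t = find(template)) == null) {
            wait();
        }
        tuples.remove(t); // arrays use identity equality, so this removes exactly t
        return t;
    }

    /** First tuple (in insertion order) matching the template, or null. */
    private Object[] find(Object[] template) {
        for (Object[] t : tuples) {
            if (matches(template, t)) {
                return t;
            }
        }
        return null;
    }

    /** Same length, and every non-null template field equals the tuple's field. */
    private static boolean matches(Object[] template, Object[] tuple) {
        if (template.length != tuple.length) {
            return false;
        }
        for (int i = 0; i < template.length; i++) {
            if (template[i] != null && !template[i].equals(tuple[i])) {
                return false;
            }
        }
        return true;
    }
}
```

Two components that never reference each other can now communicate. One writes ("temperature", "lab", 21), and another blocks on take("temperature", null, null) until such a tuple exists.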

Digression: Assembly by Diffusion

A passage in Manuel DeLanda’s Intensive Science & Virtual Philosophy helps explain why I am so attracted to spaces as a communication mechanism. It’s not an exact analogy, by any means, but it gives a flavour of the “open combinatorial spaces” that the generative communication model creates. Here’s an excerpt:

[Biological assembly] permits the transport processes not to be rigidly channelled, using simple diffusion through a fluid medium to bring the different parts together. Components may float around and randomly collide, using a lock-and-key mechanism to find matching patterns without the need for exact positioning.

I think that’s a pretty fair description of “loose coupling”: no rigid channels; no exact positioning; parts brought together by pattern matching. Tuplespaces would seem to act like the aforementioned fluid medium, allowing components to work together in an almost organic manner.

ActorSpaces

Agha and Callsen describe the idea of an ActorSpace. Have I finally found what I’ve been looking for? Well… not quite. Agha and Callsen do recognize the importance of “pattern-based communication between processes which have no explicit reference to each other” for open distributed programming. However, they chose to base their pattern matching on attributes of actors themselves. This introduces a new element to the actor model–attributes–and it is not entirely clear what these attributes might be or how to use them to abstract communication. Interesting paper, but it doesn’t quite scratch my personal itch.

Tuplespaces + Actors

Many distributed algorithms can be modelled as a flow of objects between participants.

JavaSpaces(TM) Principles, Patterns, and Practice

We could as well substitute “parallel” for “distributed” and “messages” for “objects”.

Considering the quote above, the actor DSL provides a way to define the “participants”, describe their behaviour, and manage their lifecycle. The tuplespace operations provide the pattern-directed flow of messages.

The result is:

  • An easier way to write tuplespace programs: the actor DSL provides a formalism for describing reading, writing, and consuming of tuples and the behaviour associated with these operations.
  • Pattern-based communication between actors with no explicit reference to each other: actors are decoupled in both (address)space and time (since messages are persistent objects.)

In both cases, a (hopefully) better way to write parallel programs.

So the bottom line of all this is that my little Groovy tuplespace project is trying to grow into something more: using a tuplespace paradigm to provide pattern-matching based on messages and processing of these messages by actors: loosely-coupled actors.
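Here is a minimal sketch of loosely-coupled actors, under heavy simplifying assumptions. A synchronized list plays the tuplespace. Each actor is a daemon thread that repeatedly takes tuples matching its template and writes its reply back into the space. All names are hypothetical, and this is plain Java rather than a Groovy actor DSL. The two actors in main() are linked only by the shape of the tuples they exchange.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: actors that communicate only through pattern-matched tuples. */
final class LooselyCoupledActors {

    /** Minimal space: null fields in a template are wildcards. */
    static final class Space {
        private final List<Object[]> tuples = new ArrayList<>();

        synchronized void write(Object... tuple) {
            tuples.add(tuple.clone());
            notifyAll();
        }

        synchronized Object[] take(Object... template) throws InterruptedException {
            while (true) {
                for (Object[] t : tuples) {
                    if (matches(template, t)) {
                        tuples.remove(t); // safe: we return before iterating further
                        return t;
                    }
                }
                wait();
            }
        }

        private static boolean matches(Object[] template, Object[] tuple) {
            if (template.length != tuple.length) return false;
            for (int i = 0; i < template.length; i++) {
                if (template[i] != null && !template[i].equals(tuple[i])) return false;
            }
            return true;
        }
    }

    /** Start an actor that reacts to every tuple matching the template. */
    static Thread actor(Space space, Object[] template, Function<Object[], Object[]> behaviour) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Object[] message = space.take(template);
                    space.write(behaviour.apply(message));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly when asked
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Space space = new Space();
        // Neither actor references the other; they are linked only by tuple shapes.
        actor(space, new Object[] {"number", null}, m -> new Object[] {"doubled", (Integer) m[1] * 2});
        actor(space, new Object[] {"doubled", null}, m -> new Object[] {"result", (Integer) m[1] + 1});
        space.write("number", 20);
        System.out.println(space.take("result", null)[1]); // 41
    }
}
```

Either actor could be replaced, duplicated, or started later without changing the other, which is the decoupling in space and time described above.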

The Web Bores Me

I have to face the facts: I’m completely bored of writing web apps. I’m not bored by the architecture of the Web, which I believe should be leveraged more than it currently is; but sometimes I really don’t think I can face the grind and hassle of assembling what should be a simple web application or web service. Let’s face it: it’s so freaking dull. And so much harder than it should be. I read somewhere (unfortunately, I can’t remember where) that the most complex aspect of any enterprise web development project is AJAX. I can believe it easily. Add a pile of enterprise middleware suckage plus associated crappy tools and it’s ten times more disheartening.

So I’m learning Cocoa. Gonna see what I can do with desktop apps that speak Web. At least it’ll be different. And cross-platform capability be damned (for the moment, at least.) Then who knows? Maybe it’s time to take distributed computing to the iPhone :)

I suppose it’s no wonder I’m more comfortable writing middleware. It’s hard, but at least it’s not hard and terminally boring. (YMMV.)

Amazon Web Services Redux

It seems my earlier post “The Long Tail of Web Services” is getting some traffic from links here and here. At least someone is willing to put their money where my mouth is :)

Since that original post, Amazon has come out with yet another service (still in limited beta) called Amazon SimpleDB. This is a simple but apparently powerful service to query structured data. Although I note some complaining from the database community about it not really being a database, that’s just a semantic issue. If they renamed it, the complaints would probably go away. I think I would describe SimpleDB as something like a content-addressable DHT.

BTW, I can’t help wondering if this new service is related to Amazon’s Dynamo. (This is total speculation on my part. Perhaps closer inspection will tell. Or maybe Amazon will, eventually…)