Distributed Fork Join
Being able to process large amounts of data is becoming important for a growing number of people and organizations. Processing data can mean a variety of things and differs per case; in the most basic sense, however, it means ‘doing something’ with your data.
Processing these large amounts of data is not really a problem until you add a time constraint to the mix. When your data has to be processed within a certain time frame, a growing data set can become a problem. A number of solutions instantly come to mind when you face time-constrained data processing, for example:
- Code optimization
- Scaling up your hardware
- Database clustering
The above solutions are somewhat traditional, but they are used a lot. We can also identify other solutions: running code in parallel, and distributed computing. In this article we will focus on these two concepts.
Fork Join
Ever since processors have had multiple cores, we have been trying to optimize our code for them, and lately the number of cores per processor has been growing fast. How often haven’t you heard someone say: “the system is slow because modern software doesn’t handle multi-core processing very well”? Hardware vendors try to work around this by boosting the clock speed of a single core when needed (Turbo Boost, Turbo Core), so software that stresses one core heavily gets a small increase in processor speed. It would be much better, though, if developers could make use of all the available cores. In Java 7, developers have access to the fork/join framework from JSR-166. It lets the developer define a workload, split that workload into smaller pieces that can execute independently, and then combine their results into one result.
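To make the split, fork, compute and join steps concrete, here is a minimal sketch that sums an array with the fork/join framework. The class name SumTask and the THRESHOLD value of 1,000 are illustrative assumptions, not part of the article's solution; it only shows the general split-then-combine structure that the Counter class later in the article also follows.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums the slice [start, end) of an array by recursively splitting it in half.
class SumTask extends RecursiveTask<Long> {
    // Illustrative value: below this many elements, splitting costs more than it saves.
    private static final int THRESHOLD = 1_000;

    private final long[] values;
    private final int start;
    private final int end;

    SumTask(long[] values, int start, int end) {
        this.values = values;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: do the work directly.
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += values[i];
            }
            return sum;
        }
        // Too big: split the slice into two halves.
        int mid = start + (end - start) / 2;
        SumTask left = new SumTask(values, start, mid);
        SumTask right = new SumTask(values, mid, end);
        left.fork();                      // schedule the left half asynchronously
        long rightSum = right.compute();  // compute the right half in this thread
        long leftSum = left.join();       // wait for the left half
        return leftSum + rightSum;        // combine the partial results
    }

    static long parallelSum(long[] values) {
        // A pool with one worker thread per available core (Java 7 style).
        return new ForkJoinPool().invoke(new SumTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        long[] values = new long[100_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(parallelSum(values)); // 5000050000
    }
}
```

Computing the right half in the current thread instead of forking it as well keeps that thread busy rather than idle while it waits in join().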

Distributed Tasks
In distributed computing you attempt to leverage the processing capabilities of multiple nodes. Provided the problem allows for such an approach, each of several machines can be tasked with a small portion of the problem. Using a technology like GigaSpaces, which allows easy implementations of the Map-Reduce pattern, a problem can be split into smaller sub-tasks that are then sent to multiple machines. The results from each of the machines then have to be combined to form the full result.
Graphically, splitting up the work into smaller sub-tasks could look like this:

Once each of the nodes has calculated its result, the results have to be combined, which could be displayed as:

With a technology like GigaSpaces (other products offer this as well, of course), we can achieve the above with something called a DistributedTask.
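GigaSpaces takes care of shipping the task to the nodes and collecting their results. To show only the shape of that scatter/gather pattern, here is a local sketch in which every "node" is just a thread from an ExecutorService. It is not the GigaSpaces API, and the names ScatterGather, processPartition and reduce are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Local stand-in for a distributed task: every "node" is just a thread here.
class ScatterGather {

    // Map step: the work one node performs on the data it holds.
    static long processPartition(int[] partition) {
        long sum = 0;
        for (int value : partition) {
            sum += value;
        }
        return sum;
    }

    // Reduce step: combine the partial results of all nodes into the full result.
    static long reduce(List<Long> partials) {
        long total = 0;
        for (long partial : partials) {
            total += partial;
        }
        return total;
    }

    static long run(List<int[]> partitions) {
        ExecutorService nodes = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (final int[] partition : partitions) {
                // Send one sub-task to each simulated node.
                futures.add(nodes.submit(new Callable<Long>() {
                    @Override
                    public Long call() {
                        return processPartition(partition);
                    }
                }));
            }
            List<Long> partials = new ArrayList<>();
            for (Future<Long> future : futures) {
                partials.add(future.get()); // wait for this node's result
            }
            return reduce(partials);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a node", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a node failed", e.getCause());
        } finally {
            nodes.shutdown();
        }
    }

    public static void main(String[] args) {
        List<int[]> partitions = Arrays.asList(
                new int[] {1, 2, 3}, new int[] {4, 5, 6}, new int[] {7, 8, 9});
        System.out.println(run(partitions)); // 45
    }
}
```

In a real cluster the partitions would already live on the nodes, so only the task and the small partial results travel over the network.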
The problem
In our scenario we have a reasonably large list of files, each holding 20,000 lines of comma-separated values. Every line must be processed. Processing, in our case, means that for each individual line we extract the ‘id’ value and count how many times that id occurs in the entire data set. While you will most likely not come across this exact problem in your work, it is easy to imagine many similar cases that you will come across.
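Before parallelizing anything it helps to have a sequential baseline. The sketch below counts ids in a list of lines, taking the id to be the text before the first comma, as the article's own code does. The class name SequentialIdCount is hypothetical, and treating a line without a comma as an id on its own is an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SequentialIdCount {
    // Counts how often each id (the text before the first comma) occurs.
    static Map<String, Integer> countIds(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            int comma = line.indexOf(',');
            // Assumption: a line without a comma consists of the id alone.
            String id = comma < 0 ? line : line.substring(0, comma);
            Integer previous = counts.get(id);
            counts.put(id, previous == null ? 1 : previous + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,1,x", "b,2,y", "a,3,z");
        // a occurs twice and b once; HashMap iteration order is unspecified.
        System.out.println(countIds(lines));
    }
}
```

A baseline like this is also handy for checking that the parallel and distributed versions produce the same counts.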
A solution
It would be very nice if you could combine the power of a fork/join framework with the power of multiple nodes. So our basic setup will be: distribute all the files evenly across our cluster, then on each node process all the files available there, line by line, while making maximum use of the processors. In my solution I used GigaSpaces with Java 7. First there is the ‘client’: a class that gets a reference to the cluster, fires a DistributedTask into the cluster and then prints out the result:
List<Result> results = gigaSpace.getClustered().execute(new CounterTask()).get();
for (Result r : results) {
    System.out.println("Result " + r.getId() + " occurred " + r.getOccurrences() + " times");
}
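The client receives one combined list even though every node returns its own Result[]. The article does not show how CounterTask combines them; in GigaSpaces that step belongs in the distributed task's reduce method, which receives the results of all nodes. The plain-Java sketch below only illustrates what such a merge could do: mergeNodeResults and the result(...) factory are hypothetical, and the Result class is a minimal stand-in with just the methods the article's snippets call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for the article's Result class (only the methods its snippets use).
class Result {
    private String id;
    private int occurrences;

    String getId() { return id; }
    void setId(String id) { this.id = id; }
    int getOccurrences() { return occurrences; }
    void setOccurrences(int occurrences) { this.occurrences = occurrences; }
    void increment(int amount) { occurrences += amount; }
}

class ResultReducer {
    // Hypothetical reduce step: merges the Result[] returned by every node into one
    // list, adding up the counts of ids that were found on more than one node.
    static List<Result> mergeNodeResults(List<Result[]> perNode) {
        Map<String, Result> merged = new LinkedHashMap<>(); // keeps first-seen order
        for (Result[] nodeResults : perNode) {
            for (Result r : nodeResults) {
                Result existing = merged.get(r.getId());
                if (existing == null) {
                    // Copy so the node's own Result objects are not modified.
                    merged.put(r.getId(), result(r.getId(), r.getOccurrences()));
                } else {
                    existing.increment(r.getOccurrences());
                }
            }
        }
        return new ArrayList<>(merged.values());
    }

    // Convenience factory for the example.
    static Result result(String id, int occurrences) {
        Result r = new Result();
        r.setId(id);
        r.setOccurrences(occurrences);
        return r;
    }

    public static void main(String[] args) {
        Result[] node1 = { result("a", 2), result("b", 1) };
        Result[] node2 = { result("a", 3) };
        for (Result r : mergeNodeResults(Arrays.asList(node1, node2))) {
            System.out.println("Result " + r.getId() + " occurred " + r.getOccurrences() + " times");
        }
        // prints "Result a occurred 5 times" then "Result b occurred 1 times"
    }
}
```

Merging by id matters because files containing the same id can be stored on different nodes, so each node only sees part of an id's total.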
Let’s take a look at the CounterTask class, which is a distributed task. It executes on each of the nodes, where a subset of all the data is present.
The following code snippet shows what CounterTask’s execute() method does:
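Map<String, Result> nodeResults = new HashMap<>();
ForkJoinPool pool = new ForkJoinPool(); // one pool for all files, sized to the available cores
Data[] allData = gigaSpace.readMultiple(new Data(), Integer.MAX_VALUE);
for (Data data : allData) {
    // readAllLines(...) is a helper (not shown) that splits the file contents into lines
    List<String> lines = readAllLines(data.getContents());
    String[] lineArray = lines.toArray(new String[0]);
    Map<String, Result> fileResults =
        pool.invoke(new Counter(lineArray, 0, lineArray.length));
    // merge per-file counts so an id that occurs in several files is reported once per node
    for (Result r : fileResults.values()) {
        Result existing = nodeResults.get(r.getId());
        if (existing == null) {
            nodeResults.put(r.getId(), r);
        } else {
            existing.increment(r.getOccurrences());
        }
    }
}
return nodeResults.values().toArray(new Result[0]);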
So, for each of the files (represented by the Data class), all lines inside the file are read into a list, and a ForkJoinPool spreads the workload over the available processing power. For this the Counter class is used, so let’s take a look at that class. It implements a single method, compute(), which returns a map from each id to the number of times it occurs. Since we want to maximize our processing power, we need to divide the work into smaller parts. At some point, however, splitting the task further only creates more overhead, and we should simply do the processing. The THRESHOLD variable determines whether we split the task or simply process the lines we were given:
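// compare the size of this task's slice, not lines.length, which never shrinks
if (end - start >= THRESHOLD) {
    int mid = start + (end - start) / 2;
    Counter left = new Counter(lines, start, mid);
    Counter right = new Counter(lines, mid, end);
    left.fork();                                    // run the left half asynchronously
    Map<String, Result> rightAns = right.compute(); // compute the right half in this thread
    Map<String, Result> leftAns = left.join();      // wait for the left half
    return merge(leftAns, rightAns);
. . .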
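Notice that compute() is recursive: fork() schedules the left half to run asynchronously, the right half is computed directly in the current thread by calling compute(), and join() waits for the left half’s result before the two maps are merged.
If the slice is small enough to process directly, its lines are iterated and the number of times each ‘id’ value occurs is counted: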
. . .
} else {
    Map<String, Result> results = new HashMap<>(10000);
    // iterate only over this task's slice [start, end), not the whole array
    for (int i = start; i < end; i++) {
        String line = lines[i];
        String id = line.substring(0, line.indexOf(","));
        Result previousResult = results.get(id);
        if (previousResult == null) {
            Result result = new Result();
            result.setId(id);
            result.setOccurrences(1);
            results.put(id, result);
        } else {
            previousResult.increment(1); // the map already holds this object, no put needed
        }
    }
    return results;
}
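The snippets above leave out the fields, the constructor and the merge() helper. A self-contained sketch of the whole Counter could look as follows. It assumes every line contains a comma, uses plain Integer counts instead of the article's Result objects, and picks an illustrative THRESHOLD of 1,000 because the article does not state its value; the merge() body is a hypothetical implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Fork/join id counter over the slice [start, end) of an array of CSV lines.
class Counter extends RecursiveTask<Map<String, Integer>> {
    // Illustrative value; it must be at least 2 so that splitting always shrinks the slice.
    static final int THRESHOLD = 1_000;

    private final String[] lines;
    private final int start;
    private final int end;

    Counter(String[] lines, int start, int end) {
        this.lines = lines;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Map<String, Integer> compute() {
        if (end - start >= THRESHOLD) {
            int mid = start + (end - start) / 2;
            Counter left = new Counter(lines, start, mid);
            Counter right = new Counter(lines, mid, end);
            left.fork();                                       // left half runs asynchronously
            Map<String, Integer> rightAns = right.compute();   // right half in this thread
            Map<String, Integer> leftAns = left.join();        // wait for the left half
            return merge(leftAns, rightAns);
        }
        // Small slice: count the ids directly (assumes every line contains a comma).
        Map<String, Integer> counts = new HashMap<>();
        for (int i = start; i < end; i++) {
            String id = lines[i].substring(0, lines[i].indexOf(','));
            Integer previous = counts.get(id);
            counts.put(id, previous == null ? 1 : previous + 1);
        }
        return counts;
    }

    // Hypothetical merge: adds the counts of the smaller map into the larger one.
    static Map<String, Integer> merge(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> big = a.size() >= b.size() ? a : b;
        Map<String, Integer> small = big == a ? b : a;
        for (Map.Entry<String, Integer> e : small.entrySet()) {
            Integer previous = big.get(e.getKey());
            big.put(e.getKey(), previous == null ? e.getValue() : previous + e.getValue());
        }
        return big;
    }

    public static void main(String[] args) {
        String[] lines = new String[20_000];
        for (int i = 0; i < lines.length; i++) {
            lines[i] = "id" + (i % 3) + ",value" + i;
        }
        Map<String, Integer> counts = new ForkJoinPool().invoke(new Counter(lines, 0, lines.length));
        System.out.println(counts.get("id0")); // 6667
    }
}
```

Merging the smaller map into the larger one keeps the combine step cheap, and because both maps are freshly created by subtasks, modifying one of them in place is safe.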
Conclusion
The example application shows how we can distribute the workload across an entire cluster of machines using a DistributedTask, while the fork/join framework maximizes the use of processing power on each node. By no means should you always use this approach; I used it purely to demonstrate how work can be divided and results retrieved. Before you go all out with frameworks like this, always check first whether the problem you’re solving can be distributed, split into sub-tasks, or, as in our example, both.
