How fast can you read a very large csv file and compute min, max, average aggregation out of it (Part 2)

Paul Bares
6 min read · Oct 20, 2021


This is the second part of the series “How fast can you compute the average, min and max of values read from a very large csv file”. In the first part, we saw how some quick tips and tricks could save us time when reading and computing values from a large csv file.

In this second part, we are going to explore two new ways to improve the performance:

  • Branch prediction elimination
  • Multi-threading

Branch prediction

To get a good understanding of what branch prediction is, I recommend reading this very popular SO question. I also recommend this article written by Daniel Lemire, a computer scientist and author of the simdjson and fast_double_parser libraries.

I run the SO code snippet:

import java.util.Arrays;
import java.util.Random;

public class Main
{
  public static void main(String[] args)
  {
    // Generate data
    int arraySize = 32768;
    int data[] = new int[arraySize];

    Random rnd = new Random(0);
    for (int c = 0; c < arraySize; ++c)
      data[c] = rnd.nextInt() % 256;

    // !!! With this, the next loop runs faster
    Arrays.sort(data);

    // Test
    long start = System.nanoTime();
    long sum = 0;
    for (int i = 0; i < 100000; ++i)
    {
      for (int c = 0; c < arraySize; ++c)
      { // Primary loop
        if (data[c] >= 128)
          sum += data[c];
      }
    }

    System.out.println((System.nanoTime() - start) / 1000000000.0);
    System.out.println("sum = " + sum);
  }
}

On my machine (MacBook Pro 19-inch, 2019), it is 5 times faster when the array is sorted. To make sure branches were indeed mispredicted, I use Xcode’s Instruments.

  • Select Counters
  • Go to File > Recording Options
  • Select “Sample by: Events”
  • Choose BR_MISP_RETIRED.CONDITIONAL and add it to the list as shown in the screenshot below
Xcode’s Instruments

Select the process for which you want to measure branch prediction and start recording.

The result is clear:

Sorted array
Unsorted array

355,347,217 mispredictions when the array is unsorted against 75,819 when it is sorted.

In our case, the condition under which aggregation is performed is year ≥ 2005, which is a good candidate for replacement with bitwise operations to eliminate the branch.

int t = (year - 2005) >> 31;

We can replace the sum operations and the condition by:

sumMileage += ~t & mileage;

This works because if year is greater than or equal to 2005, t equals zero and ~t is an integer with all bits set to 1, therefore ~t & mileage = mileage. If year is lower than 2005, t is an integer with all bits set to 1, ~t is zero, therefore ~t & mileage = 0.
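
For example, with year = 1998, year - 2005 = -7, whose sign bit is 1, so t = (year - 2005) >> 31 has all bits set to 1, ~t = 0 and the mileage is masked out of the sum. With year = 2010, year - 2005 = 5, the sign bit is 0, so t = 0, ~t has all bits set to 1 and the mileage passes through unchanged.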

Using another trick, we can replace the min operation and the condition by:

minMileage = Math.min(minMileage, (t >>> 1) | mileage);

If year is greater than or equal to 2005, t >>> 1 equals zero and the minimum is computed as usual; otherwise t >>> 1 is equal to Integer.MAX_VALUE and the minimum remains the same.

A similar trick is used to compute maximums.
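
One possible version of that trick, as a sketch, assuming mileage is never negative and maxMileage is initialized to zero so that a masked-out row cannot win the comparison:

// mileage when year >= 2005, 0 otherwise (assumes mileage >= 0 and maxMileage starts at 0)
maxMileage = Math.max(maxMileage, ~t & mileage);

When year is greater than or equal to 2005, ~t & mileage is simply mileage and the maximum is computed as usual; otherwise it is 0 and the running maximum is left untouched.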

Unfortunately, the results are not as good as in the previous (SO) example: 1,823,857 mispredictions initially with the if condition against 1,251,691 without it.

The overall result is only barely better once the branches are removed.

Average test time: 6121.64ms

Source code: Step5.java

Multi-threading

Monitoring the CPU usage while benchmarking shows that the code is not very efficient and does not leverage all the cores available on the machine.

Ideally, the CPU usage should be 100%. To get as close as possible to that figure, we need to change the code to read the file and do the computation in parallel.

Problem: How to read a csv file in parallel?

Idea:

  • Split the file into pieces, i.e. split a huge array of bytes into smaller chunks of bytes
  • For each piece, compute an intermediate aggregate result
  • Once all pieces have been read and intermediate results computed, merge the result into the final one

The biggest challenge here is to split the file into pieces where each piece runs from the beginning of a line to the end of another line. To do that, we can split the file into chunks of approximately the same size. It is very likely that the beginning and the end of a chunk fall in the middle of a line. We then move the beginning of the chunk to the start of the next line and move the end of the chunk to the end of the current line, as illustrated below.

Original split

Dots represent characters. Letters mark the chunk boundaries that will be “moved” to resize each chunk so that it starts at the beginning of a line and ends at the end of a line.

New split

Once the chunks are determined, we can start reading them and computing intermediate results.
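
Here is a minimal sketch of that boundary adjustment, assuming the whole file is available as a byte[] named bytes and that lines end with '\n'; the helper name alignToNextLine is illustrative, not the exact code of the repository:

// Moves a tentative boundary forward to the first byte after the next '\n',
// so that a chunk always starts at the beginning of a line.
static int alignToNextLine(byte[] bytes, int tentativeBoundary) {
  int i = tentativeBoundary;
  while (i < bytes.length && bytes[i] != '\n') {
    i++;
  }
  return Math.min(i + 1, bytes.length);
}

Chunk k then runs from alignToNextLine(bytes, k * chunkSize) (or 0 for the very first chunk) to alignToNextLine(bytes, (k + 1) * chunkSize) exclusive, which is exactly the “move the boundary” rule illustrated above.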

I use ForkJoinTask to do a parallel recursive decomposition of the work. The algorithm does a divide-by-two recursive decomposition to break the work into small pieces (see ParsingTask). Even when the work could be split into individual calls, tree-based techniques are usually preferable to directly forking leaf tasks, because they reduce inter-thread communication and improve load balancing.

In my code, a parent ParsingTask is launched and creates/forks other tasks if the number of bytes to read is too big. When a ParsingTask is “small” enough, it starts the reading and computation. Once all leaf tasks are done, results are merged in a MapReduce style.

Choosing the value that determines what is “too big” is not easy and there is no universal formula. The value I have chosen (35MB) has been determined empirically and may be different on another machine, but it should take into account the desired level of parallelism, i.e. the number of available threads.
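
Below is a condensed sketch of what such a task could look like. It assumes a Result class that holds the partial aggregates and can merge itself with another one, a processChunk method that does the sequential reading and computation, and the alignToNextLine helper sketched earlier; these names are illustrative, the real implementation is ParsingTask in the repository.

import java.util.concurrent.RecursiveTask;

class ParsingTask extends RecursiveTask<Result> {
  // ~35MB, determined empirically; below this size the task parses sequentially
  static final int THRESHOLD = 35 * 1024 * 1024;

  final byte[] bytes;
  final int start;
  final int end; // chunk boundaries, already aligned to line breaks

  ParsingTask(byte[] bytes, int start, int end) {
    this.bytes = bytes;
    this.start = start;
    this.end = end;
  }

  @Override
  protected Result compute() {
    if (end - start <= THRESHOLD) {
      // Leaf task: read the chunk and compute the intermediate aggregates
      return processChunk(bytes, start, end);
    }
    // Divide-by-two decomposition, keeping the split on a line boundary
    int middle = alignToNextLine(bytes, start + (end - start) / 2);
    ParsingTask left = new ParsingTask(bytes, start, middle);
    ParsingTask right = new ParsingTask(bytes, middle, end);
    left.fork();                           // run the left half asynchronously
    Result rightResult = right.compute();  // compute the right half in the current thread
    return left.join().merge(rightResult); // merge partial results, MapReduce style
  }
}

Invoking it then boils down to something like ForkJoinPool.commonPool().invoke(new ParsingTask(bytes, 0, bytes.length)).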

The result is quite amazing:

Average test time: 1748.21ms

It is 3.5 times faster than before! The CPU usage is now satisfactory and close to 100%.

CPU usage

Source code: Step6.java

Spark

Just for fun, I tried with Apache Spark. I am not a Spark connoisseur, I have only used it once or twice before, so there might be some optimizations I did not apply, but the result is disappointing: 31,135ms.

The code is quite simple though:

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.min;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .config("spark.master", "local")
    .getOrCreate();
StructType schema = new StructType()
    .add("year", "int")
    .add("mileage", "int")
    .add("price", "double")
    .add("brand", "string")
    .add("transmission", "string")
    .add("fuel type", "string");
Dataset<Row> df = spark.read()
    .schema(schema)
    .option("charset", "US-ASCII")
    .option("delimiter", ",")
    .csv(CsvGenerator.FILE_PATH);
Dataset<Row> agg = df.select("year", "mileage", "price")
    .filter(new Column("year").$greater$eq(2005))
    .agg(
        min("year"),
        min("mileage"),
        min("price"),
        max("year"),
        max("mileage"),
        max("price"),
        avg("mileage"),
        avg("price"));
agg.show();

Source code: StepSpark.java

I run the same benchmarks with GraalVM, the Java VM and JDK based on HotSpot/OpenJDK, implemented in Java by Oracle. Surprisingly, every step runs 10% faster.

Version: graalvm-ee-java17-21.3.0.

abscissa: step number; ordinate: time in ms

With the different optimizations applied, we could make the reading 7.6 times faster than it initially was. I am sure there must still be room for improvement, but I will let you look for it if, like Alice, you aren't afraid to go down the rabbit hole.

Claps and shares are very much appreciated!

Full code is available here: https://github.com/paulbares/fast-calculation. Note that the code snippets in this article can differ a little from the code in the repository.

Written by Paul Bares

I'm an enthusiast of computer hardware and programming. I specialize in high performance and parallel computing. Co-Creator of SquashQL. GitHub: squashql
