Building a scalable and high-performance big data system
Choosing the right tools for building a scalable big data analysis system

Some time ago, I had the opportunity to work on a very exciting project. We needed a way to expose REST APIs for Google Analytics data, but we ran into some big problems with the free Google Analytics plan.

First of all, the retention period. At the time of writing, Google Analytics (free plan) has a retention period of 26 months. Of course, you can extend that retention period, but doing so has some downsides (read more here).

Another huge problem is that the free plan of the Google Analytics Reporting API has a limit of 50,000 requests per project per day and 10 queries per second per IP address (source here).

These problems made it impossible to use the Reporting API as the standard way of querying Google Analytics, as it wouldn't scale once we reached a significant number of users.

So we needed a way to store that data on our own servers and make it available whenever we want, without any rate or retention limits.

#Our Constraints

We still wanted to use Google Analytics as our main analytics system. We've been using it for the past year and it has everything we need in order to improve our users' experience. We just needed to do the simplest thing possible in order to expose that data to our Next.js frontend.

So, we needed a simple scheduled job that calls Google Analytics, normalizes the resulting data, and stores it on our server, in order to expose it as aggregated data later on:

[Diagram: a scheduled job calls Google Analytics, normalizes the resulting data, and stores it in our database]

One thing to keep in mind is that we want our data to be as fresh as possible. That means that the scheduled job, the data normalization process, and the database batch update operation must be as fast as possible.

#Data Normalization

During the past few months, we changed the URL structure of some resources in our webapp. That means we need to add support for legacy URLs, such as:

Legacy URL:

https://www.example.com/users/AJH829SKLJ29/john-doe

                              ^ old ID

New URL:

https://www.example.com/users/john-doe-0cc175b9c0f1

                                       ^ new ID

As you can see, we are exposing an ID to the client, but it has changed over time. So we also need to transform the old ID into the new one before inserting it into our database.

We also need to perform other transformations on our data, but those don't require access to our master database (a PostgreSQL instance).
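
To make the ID transformation concrete, here's a minimal sketch (written in Elixir, the language we eventually ended up choosing; more on that later). The UrlNormalizer module and the IdMapping.lookup/1 function are hypothetical names, and the real lookup is backed by our PostgreSQL master database:

defmodule UrlNormalizer do
  # Legacy paths look like /users/AJH829SKLJ29/john-doe,
  # new paths look like /users/john-doe-0cc175b9c0f1.
  def normalize_path("/users/" <> rest) do
    case String.split(rest, "/") do
      # Legacy format: translate the old ID and rebuild the new-style path.
      [legacy_id, slug] -> "/users/#{slug}-#{IdMapping.lookup(legacy_id)}"

      # Already in the new format: nothing to do.
      [_slug_with_new_id] -> "/users/" <> rest
    end
  end
end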

#Building a Proof of Concept

We started building a proof of concept with Node.js (TypeScript) as the primary scripting language and CouchDB as the main database. We soon realized that we needed to add Redis as a caching layer, because calling the database to transform the old IDs into the new ones was really time consuming. Redis also helps a lot with caching our REST API responses.

Our initial infrastructure looked like this:

[Diagram of the initial infrastructure: a Node.js scheduled job with Redis as caching layer, CouchDB as main database, and PostgreSQL as master database]

Some thoughts on our first proof of concept:

1) Node.js is great for prototyping, but even with TypeScript, it still doesn't seem to be the right choice for intensive data manipulation. DISCLAIMER: this is my personal opinion. I still love Node.js and I use it in my everyday job, but we really wanted to choose the right tool for the job.

2) In order to make our queries more efficient, we wrote some CouchDB views using the map/reduce programming model. While this makes our queries incredibly efficient, writing well-optimized map/reduce queries requires some experience and skill... and we began to miss SQL a lot.

3) This solution would scale well vertically (adding more power to our server), but would it scale horizontally (adding more nodes to our cluster)? I have no doubts about CouchDB, but what about the Node.js server and its in-memory cache (which is used intensively)?

4) Is Node.js fast enough? I mean, Node.js performance is absolutely outstanding, but the data normalization process was taking a very long time. In order to improve its performance, we eventually refactored parts of the codebase, removing some higher-order functions in favor of more imperative patterns, and that decreased our productivity and software maintainability.

#Changing Approach

After running this system in a testing environment for some time, we began to spot some issues in the architectural decisions we made for our proof of concept. CouchDB is still an awesome database, but we realized that maybe we didn't have the skills and knowledge required to write well-optimized queries. Writing map/reduce views became really painful for such a small team, which ended up spending more time searching for the right Stack Overflow answer than writing actual code. We also weren't taking advantage of any of the most loved CouchDB features (data synchronization, REST APIs and so on). That said, maybe CouchDB wasn't the best solution for our problem.

Another problem we had was the Node.js server's performance. Let's investigate the following scenario:

1) Node.js calls the Google Analytics Reporting API

2) The Reporting API responds after about 0.8 seconds with thousands of records

3) For each reporting row (it's actually an array of objects), we need to parse the data and call Redis/PostgreSQL to convert the old IDs into the new ones

4) We insert the normalized data into CouchDB. If the data already exists, we need to fetch its CouchDB _rev ID and use it to update the document instead of inserting it

5) After inserting the data into CouchDB, we need to run a CouchDB query in order to update some of the data inside our PostgreSQL database

The whole process takes about 4 seconds to complete, which is a lot of time. We needed to fix all these problems, which can be summarized as:

1) Increasing the performance of the data normalization process

2) Increasing developer productivity when working with our NoSQL database

3) Getting rid of all the imperative and low-level TypeScript code, in order to write better unit tests and more maintainable software

After looking into different technologies (Go, Python, Cassandra, MongoDB), we eventually came up with the idea that our software should respect some important rules:

1) It must scale horizontally

2) It has to be written in a high-level language

3) We should be able to produce unit tests and documentation with ease

4) If the Google APIs take 0.8s to respond, it would be awesome to normalize and insert the data in about 200ms, so that the whole process takes about 1s to complete

5) Calling Redis is OK, but it would be awesome to have a sort of distributed in-memory caching mechanism, just to avoid calling an external service for caching the relation between an old ID and the new one

Eventually, we came up with just a couple of technologies that could help us achieve the goals above: Elixir + Cassandra.

#Why Elixir

Elixir is a functional programming language that compiles to Erlang bytecode. In fact, it runs on the famous BEAM virtual machine, which is well known for its reliability and performance:

"The AXD301 has achieved a NINE nines reliability (yes, you read that right, 99.9999999%). Let’s put this in context: 5 nines is reckoned to be good (5.2 minutes of downtime/year). 7 nines almost unachievable... but we did 9. Why is this? No shared state, plus a sophisticated error recovery model." (Joe Armstrong, the author of the Erlang Programming Language)

Famous frameworks such as Scala's Akka take huge inspiration from Erlang's actor model in order to build massively scalable and resilient software.

Thanks to Elixir, we could actually solve all of the problems we previously mentioned.

#1) It must scale horizontally

Elixir is fast. Really, really fast. And even when some functions take a bit more time to execute, Elixir code is incredibly easy to run concurrently. In fact, Erlang and Elixir have concurrency primitives built into the language itself.

Let's make a very trivial example:

defmodule Example do
  def factorial(0) do
    1
  end

  def factorial(limit) when limit > 0 do
    limit * factorial(limit - 1)
  end
end

As you can see, we're defining a module called Example, with a function called factorial which recursively computes the factorial of its argument. If the syntax reminds you of Ruby... well, that's because Elixir was created by José Valim, a former Ruby on Rails core team member!

By the way, we're able to spawn the factorial function asynchronously in a separate process (and, with the Node module, even on different machines!) by typing:

iex> spawn(Example, :factorial, [10])
#PID<0.114.0>

As you can see, it returns the #PID (process identifier) of the spawned process... and pay attention: I've intentionally used the word "process" instead of "thread"!

"Processes in the Erlang VM are lightweight and run across all CPUs. While they may seem like native threads, they’re simpler and it’s not uncommon to have thousands of concurrent processes in an Elixir application" (source)

#2) It has to be written in a high level language

Elixir, just like Erlang, Java, PHP, and Ruby, is a high-level, garbage-collected programming language. It offers a lot of useful data structures and higher-order functions that can really help when writing complex algorithms.

Once again, let's make a trivial example by writing a quicksort in C++ (taken from https://www.softwaretestinghelp.com/quick-sort):

#include <iostream>
using namespace std;

void swap(int* a, int* b) 
{ 
    int t = *a; 
    *a = *b; 
    *b = t; 
} 

int partition (int arr[], int low, int high) 
{ 
    int pivot = arr[high];
    int i = (low - 1);   

    for (int j = low; j <= high - 1; j++) 
    { 
        if (arr[j] <= pivot) 
        { 
            i++;
            swap(&arr[i], &arr[j]); 
        } 
    } 
    swap(&arr[i + 1], &arr[high]); 
    return (i + 1); 
} 

void quickSort(int arr[], int low, int high) 
{ 
    if (low < high) 
    {
        int pivot = partition(arr, low, high); 

        quickSort(arr, low, pivot - 1); 
        quickSort(arr, pivot + 1, high); 
    } 
} 

void displayArray(int arr[], int size) 
{ 
    int i; 
    for (i = 0; i < size; i++) 
        cout << arr[i] << " "; 
    cout << endl; 
} 

int main() 
{ 
    int arr[] = {9, 4, 7, 1, 3}; 
    int n = sizeof(arr) / sizeof(arr[0]); 
    quickSort(arr, 0, n - 1); 
    displayArray(arr, n); 
    return 0; 
}

Here is the same algorithm written in Elixir:

defmodule Quicksort do
  def sort([]), do: []
  def sort([pivot|t]) do
    sort(for x <- t, x < pivot, do: x)
    ++ [pivot] ++
    sort(for x <- t, x >= pivot, do: x)
  end
end

What about performance? Well, C++ is faster, and I'm not really surprised about that! But Elixir is fast enough to justify its usage.

Higher-level languages improve developer productivity, and we needed to find the right language for handling massive amounts of data. Elixir seemed to be the right choice for that kind of work!

Elixir also introduces tons of syntactic sugar that really helps us write concise and straightforward code. Just take the pipe operator as an example:

def make_it_scream(sentence) do
  sentence
  |> String.upcase()
  # then/2 pipes the value into an anonymous function
  |> then(fn words -> "#{words}!!!" end)
end

Or the shorthand for anonymous functions:

def make_it_scream(sentence) do
  sentence
  |> String.upcase()
  |> then(&("#{&1}!!!")) # "&N" refers to the Nth argument of the anonymous function
end

And if a function returns a boolean value? Just add the ? character to its name to make that explicit!

def odd?(number) do
  rem(number, 2) != 0
end

Once you get used to this syntactic sugar, you'll never go back.

#3) We should be able to produce unit tests and documentation with ease

This is probably one of the most awesome things about Elixir. In fact, Elixir treats documentation and tests as first-class citizens. Let's say I need to write some documentation for the factorial function above:

defmodule Example do
  @moduledoc """
  The Example module is a module written for making
  some tests on this awesome article.
  """

  @moduledoc since: "1.0.0"

  @doc """
  This function is used only when the passed argument is equal to 0,
  and it always returns 1.

  ## Examples
    iex> Example.factorial(0)
    1
  """
  def factorial(0) do
    1
  end

  @doc """
  This function recursively computes the factorial of a given number when the
  passed argument is greater than 0.

  ## Examples
    iex> Example.factorial(10)
    3628800
  """
  def factorial(limit) when limit > 0 do
    limit * factorial(limit - 1)
  end
end

As you can see, we're writing comments that look like Javadoc/JSDoc and so on. But unlike Java and JavaScript, these doc attributes are part of the Elixir language itself, and we can easily generate the documentation using the ex_doc package, made by the Elixir team itself! Another great thing about ex_doc is that it supports Markdown natively, so you can write your documentation the same way you'd write the markdown files inside your repo!

But the best is yet to come. As you can see, we're writing a code example inside our documentation:

  @doc"""
  This function recursively computes the factorial of a given number when the
  passed argument is greater than 0.

  ## Examples
    iex> Example.factorial(10)
    3628800
  """

This code example will run as part of your unit test suite (as a doctest), so you're always sure that your documentation actually behaves as expected!
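
For the record, wiring doctests into the test suite takes a single line. Here's a minimal sketch, assuming a standard Mix project layout:

# test/example_test.exs
defmodule ExampleTest do
  use ExUnit.Case, async: true

  # Runs every iex> example found in Example's @doc blocks as a test.
  doctest Example
end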

#4) Speeding up the Data Normalization Process

As we've said before, Elixir itself is really, really fast. But thanks to its concurrency capabilities, we're able to run our code concurrently with ease, treating our data as a stream.

We're using a lot of trivial examples today, so here's another one. Let's say we have a list of 100,000 integers, and we want to multiply each integer by 3, filter out all the odd ones, and sum all the remaining numbers in the list.

1..100_000 
    |> Enum.map(&(&1 * 3))
    |> Enum.filter(&(rem(&1, 2) == 0))
    |> Enum.sum

Not used to the & shorthand yet? Here's the extended anonymous function form:

1..100_000 
    |> Enum.map(fn x -> x * 3 end)
    |> Enum.filter(fn x -> rem(x, 2) == 0 end)
    |> Enum.sum

By the way, how do we convert this into a stream? Just use the Stream module instead of Enum (enumerable)!

1..100_000 
    |> Stream.map(&(&1 * 3))
    |> Stream.filter(&(rem(&1, 2) == 0))
    |> Enum.sum

The Stream module supports lazy operations. That means that a function won't be evaluated until it is strictly necessary. This is incredibly useful if you're working with massive amounts of data!
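
And when laziness alone isn't enough, the same pipeline can be made concurrent. Here's a minimal sketch of how our normalization step could look, assuming a hypothetical Normalizer.normalize_row/1 function that performs the per-row transformation:

# Task.async_stream processes the rows concurrently (one lightweight BEAM
# process per row, bounded by :max_concurrency) while keeping the stream lazy.
reporting_rows
|> Task.async_stream(&Normalizer.normalize_row/1,
  max_concurrency: System.schedulers_online() * 2,
  timeout: 5_000
)
|> Enum.map(fn {:ok, normalized_row} -> normalized_row end)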

Also, in order to improve the data normalization performance, we changed our caching mechanism, but we'll see that in the next section.

#5) Saying goodbye to Redis

I know, Redis is awesome. Seriously, it's probably one of the most beautiful pieces of engineering out there... but thanks to the Erlang VM, we just don't need it.

In fact, the open source distribution of Erlang ships with the Open Telecom Platform (OTP), an outstanding collection of modules and tools that can be natively called from Elixir and Erlang. It includes:

  • the Erlang VM itself (called BEAM)
  • an Erlang Compiler
  • a static analysis tool (Dialyzer)
  • Yecc (a LALR-1 Parser Generator, similar to YACC)
  • Leex (a lexical analyzer generator for Erlang)
  • Observer (tools for tracing and investigation of distributed systems)
  • Mnesia (a distributed, transactional database)
  • ETS (an in-memory key-value term storage)

and many other incredibly useful and battle-tested modules.

You can think of the BEAM as an operating system, which allows you to use its built-in tools such as databases, compilers, lexers, HTTP/WebSocket interfaces (yes, the BEAM supports them natively) and so on.

By the way, we want to concentrate on ETS. As said before, Erlang Term Storage (ETS) is "an interface to the Erlang built-in term storage BIFs. These provide the ability to store very large quantities of data in an Erlang runtime system, and to have constant access time to the data." (source).

ETS can be easily used as a built-in caching mechanism (let's see an example using the Erlang built-in REPL):

1> T = ets:new(t,[ordered_set]), ets:insert(T, {"555-1234", "John Smith"}).
true

2> ets:match(T,{[$5,$5,$5,$- |'$1'],'$2'}).
[["1234","John Smith"]]

I won't dig deep into the Erlang syntax right now, but I can promise you that building a caching layer with Elixir using ETS is incredibly easy.
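
Just to give an idea, here's a minimal sketch of an ETS-backed cache for our legacy-ID → new-ID mapping; IdCache and the IdMapping.fetch_from_db/1 fallback are hypothetical names:

defmodule IdCache do
  @table :id_cache

  # :named_table lets any process refer to the table by name;
  # :read_concurrency optimizes for many concurrent readers.
  def init do
    :ets.new(@table, [:set, :public, :named_table, read_concurrency: true])
  end

  def lookup(legacy_id) do
    case :ets.lookup(@table, legacy_id) do
      [{^legacy_id, new_id}] ->
        new_id

      [] ->
        new_id = IdMapping.fetch_from_db(legacy_id)
        :ets.insert(@table, {legacy_id, new_id})
        new_id
    end
  end
end

In a real application you'd own the table from a supervised process, since an ETS table is destroyed when its owner process dies.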

Do you want to make the caching layer distributed? Well, Mnesia actually wraps both ETS and DETS (Disk Erlang Term Storage, basically a disk-persistent version of ETS), adding a distributed transaction layer to our cache!

Want to have a more detailed look at the differences between ETS/Mnesia and Redis? Here's an amazing article about that: https://rollout.io/blog/elixir-ets-vs-redis

#Data Storage

As said at the beginning of the article, we started our first proof of concept using CouchDB as the master data storage. While it worked really well, we felt it wasn't the best solution for our team. We had some experience with Cassandra in the past, and its CQL query language brought us back to the joy of writing SQL-like queries.

That's not the only reason we chose Cassandra. We store our data at a daily granularity, but we keep updating that data every few seconds to keep it as fresh as possible. With CouchDB, in order to update the data, we needed to run a query to check if the data already existed, take its _rev ID, and then issue an update query. It's not technically difficult, but it wastes a lot of time. With Cassandra, we can just issue an insert query, and if the data already exists, it will simply be updated (inserts are upserts). Also, after some benchmarks, Cassandra's inserts seem to be a bit faster than CouchDB's (for the kind of data we're working with, of course).
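
To illustrate the upsert behavior, here's a minimal sketch using the Xandra driver from Elixir; the analytics.page_views table and its columns are hypothetical:

{:ok, conn} = Xandra.start_link(nodes: ["127.0.0.1:9042"])

prepared =
  Xandra.prepare!(
    conn,
    "INSERT INTO analytics.page_views (day, path, views) VALUES (?, ?, ?)"
  )

# In Cassandra an INSERT is an upsert: if a row with the same primary key
# already exists, it is simply overwritten. No read-before-write is needed.
Xandra.execute!(conn, prepared, ["2020-01-01", "/users/john-doe-0cc175b9c0f1", 42])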

Another thing we thought about a lot was... "do we really need CouchDB?" I mean, it has everything we need (big data storage capabilities, high performance at scale)... but is it the right choice?

  • Are we using its built-in REST APIs? No.
  • Are we using its powerful sync capabilities? No.
  • Do we need a schema-free database? No.

So what do we need?

  • Horizontally scalable database. Cassandra ✅
  • No single point of failure. Cassandra ✅
  • Multi datacenter replication. Cassandra ✅
  • Non-schema-free database. Cassandra ✅
  • SQL-like syntax is a plus. Cassandra ✅
  • map/reduce is a plus. Cassandra ✅

So the choice seemed pretty obvious! And before you ask: yes, CouchDB is written in Erlang. Sadly, it doesn't expose a native Erlang driver, so we'd have the same latency that we'd have connecting to any other database.

Another wise question would be: "why aren't you just using Mnesia?" Well, that's a good question. We could use Mnesia, of course... and maybe we will in the future! There's also an Ecto (the Elixir database toolkit) adapter for Mnesia, but we'll need to investigate more to see whether that's the right choice for us.

#REST APIs and GraphQL layer

One last thing that I'd like to talk about is how we handle REST APIs. Over the past years, many web frameworks have come to life, and most of them were really life-changing.

One of the most loved frameworks ever is surely Ruby on Rails. I can't even name all the startups that are using (or have used) RoR as the web framework for bootstrapping their services quickly and efficiently: Twitter, GitHub and Basecamp are just a few of them.

Elixir has its own MVC framework, called Phoenix, which has its roots in Ruby on Rails but improves on its performance, maintainability and so on. I've personally never heard a single Phoenix developer complain about that framework.

As said above, the Erlang VM already has support for HTTP, WebSockets and PubSub, but having an MVC framework will really help us write critical tasks such as request authorization more easily.

Given that this project's frontend is entirely fed by GraphQL, we can also add support for that query language with the amazing Elixir Absinthe library: https://github.com/absinthe-graphql/absinthe.
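
To give an idea of what that looks like, here's a minimal Absinthe schema sketch; the field, object and resolver names are hypothetical:

defmodule AnalyticsWeb.Schema do
  use Absinthe.Schema

  query do
    field :page_views, list_of(:page_view) do
      arg :path, non_null(:string)

      # Hypothetical resolver: fetches the aggregated data stored by the scheduled job.
      resolve fn %{path: path}, _resolution ->
        {:ok, Analytics.page_views_for(path)}
      end
    end
  end

  object :page_view do
    field :day, :string
    field :views, :integer
  end
end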

#Some thoughts about this architecture

I personally fell in love with Elixir about four years ago. While it's not a widely used language yet, I'm absolutely sure it will grow in popularity over the next few years.

That's because the internet landscape is changing a lot, and traditional technologies are not always enough. Just take a look at Scala: why the heck should I ever add tons of libraries and frameworks just to get something, such as the actor model, that is already built into the Erlang VM? Of course, the actor model is incredibly useful and solves a lot of problems, but it doesn't fit the way the JVM was originally built.

Why should I add Redis (again, which is awesome) as a caching layer if I already have Mnesia and ETS? Why should I add ejabberd, RabbitMQ or Kafka if I already have everything I need inside the BEAM? Why should I write a domain-specific language in C/Java/Go/Rust/whatever if the BEAM gives me Yecc and Leex, making it possible to compile it down to Erlang bytecode and integrate it into my app?

The Erlang programming language came to life in the '80s. And it is still solving tons of problems in modern web development.

We've just scratched the surface of this awesome ecosystem, and I can promise you that once you see how Erlang can really solve your problems, you'll never forget it.