At Clever we help 1 in 6 schools in the country sync data on an hourly basis from their student information systems (SISes) to the ed tech apps that their teachers and students use. These 20,000 schools sync about 50 GB of data in aggregate each hour – over a terabyte of data per day. While Node.js has served us well in building the API that lets ed tech apps access this data, it broke down when we tried to scale up our pipeline for pulling and processing data from SISes, specifically when performing large in-memory joins. Luckily, we figured out a way to take advantage of powerful Unix utilities from within Node.js processes to scale Node.js beyond its memory limits.
The Problem
When we load new data from a district’s SIS, we receive what essentially amounts to a database dump – a set of CSV files representing the different entities in a district: schools, students, teachers, and sections (classes). We also receive a CSV of enrollments, which represents a many-to-many relationship between sections and the students enrolled in them.
If you look at the Clever API docs, you may notice that we provide a slightly denormalized view of this data. Specifically, the list of students enrolled in a section is represented by a list of student ids stored with the section – enrollments aren’t a separate entity.
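To make that concrete, here's a tiny hypothetical illustration (the record shapes and field names are invented for this example, not taken from our actual schema): a couple of parsed enrollment rows get folded into a list of student ids on their section.
# Hypothetical parsed records – not real Clever data.
section = {id: 'sec1', name: 'Algebra I'}
enrollments = [
  {section: 'sec1', student: 'stu1'}
  {section: 'sec1', student: 'stu2'}
]

# The denormalized shape we want to produce:
# {id: 'sec1', name: 'Algebra I', students: ['stu1', 'stu2']}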
In order to transform the normalized enrollments data we receive from an SIS into this denormalized format, we need to aggregate the enrollments for each section. If you’re familiar with traditional relational database operators, you might recognize that this can be accomplished with a join.
In SQL, that join might look something like this (if you’re not familiar with SQL joins, check out Wikipedia or this visual guide):
SELECT *
FROM Section LEFT OUTER JOIN Enrollment
ON Section.id = Enrollment.section
We needed a version of this join logic for raw CSV files instead of database tables. When Clever first started, our CSV-processing logic was implemented using Node.js, the same platform as our API. We implemented the join logic we needed using a simple in-memory hash join, avoiding premature optimization.
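For reference, here's a minimal sketch of what an in-memory hash join looks like over already-parsed records (a simplified illustration rather than our original implementation; the field handling is hypothetical):
# Build a hash table of the right-hand rows keyed by the join field,
# then probe it once for each left-hand row (a left outer hash join).
hash_join = (left_rows, left_key, right_rows, right_key) ->
  table = {}
  for row in right_rows
    key = row[right_key]
    table[key] ?= []
    table[key].push row
  results = []
  for row in left_rows
    matches = table[row[left_key]] ? [null]  # null when nothing matches
    for match in matches
      results.push {left: row, right: match}
  # Note: both inputs and the hash table have to fit in memory at once.
  results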
However, as more and more districts began relying on Clever, it quickly became apparent that in-memory joins were a huge bottleneck. Plus, Node.js processes tend to conk out when they reach their 1.7 GB memory limit, a threshold we were starting to get uncomfortably close to. Once some of the country’s largest districts started using Clever, we realized that loading all of a district’s data into memory at once simply wouldn’t scale.
Optimizing Joins
We considered a few different optimization tactics:
- Using a language with a higher memory limit
- Importing the data into a SQL database and using SQL joins
- Not loading all of the data into memory at once
Reluctant to sink time into rewriting a large piece of code from scratch in a new language, and wary of the infrastructure and complexity costs of setting up a new database, we decided to first explore ways to avoid loading all the data into memory at once.
There’s another common join algorithm besides hash join called sort-merge join. The sort-merge join algorithm is pretty simple at a high level: first sort the two tables you want to join by the field you want to join on, and then merge the two sorted tables by traversing both together. It’s conceptually similar to the main insight of merge sort – once you have two sorted sequences, you only need to traverse them once to produce an ordering. In the sort-merge join algorithm, instead of producing an ordering, you’re matching up the items from each sequence that have the same value.
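Here's a minimal sketch of the merge step over two arrays that are already sorted by their join keys (a simplified, in-memory, inner-join illustration rather than the streaming version we actually needed):
# Merge two inputs already sorted on their join keys, pairing up rows
# whose keys match. Because both sides are sorted, one forward pass over
# each is enough. (Results are collected in an array for simplicity;
# a streaming version would emit them instead.)
merge_join = (left_rows, left_key, right_rows, right_key) ->
  results = []
  i = 0
  j = 0
  while i < left_rows.length and j < right_rows.length
    left_val = left_rows[i][left_key]
    right_val = right_rows[j][right_key]
    if left_val < right_val
      i++
    else if left_val > right_val
      j++
    else
      # Pair this left row with every right row sharing the same key.
      k = j
      while k < right_rows.length and right_rows[k][right_key] is left_val
        results.push {left: left_rows[i], right: right_rows[k]}
        k++
      i++
  results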
Node.js already supported streams – in fact, we had already implemented our in-memory hash-join to operate on streams – so all we needed to do was implement a way to sort and then merge two streams, and we’d have a memory-efficient join for CSV files.
Leveraging Unix
At Clever, we’re big fans of trying to use the best tool for the job, where “best” is defined by the constraints of the problem at hand. In this case, the important criteria were speed, reliability, and, of course, correctness. When we asked ourselves where we could find a fast, battle-tested implementation of stream sorting and merging, we realized we needed to look no further than our operating system – Unix.
Writing simple, general programs that consume and produce text streams is part of the Unix philosophy. Simplicity lets each program focus on doing one thing well, generality means programs can be reused in many different contexts, and the common text stream interface allows programs to be easily composed.
The Unix commands sort, which sorts text streams, and join, which merges them, are perfect examples of this philosophy. They each do one thing well: sort uses an external merge-sort algorithm to sort large inputs quickly without holding all the data in memory, and join walks two inputs that are already sorted on a common field, matching them up line by line. In the spirit of the principles of reuse and composition, we decided to pipe sort and join together to implement a sort-merge join.
To wrap sort and join in a Node.js stream interface, we first created a stream interface around processes. In Node.js, streams can implement the Writable interface (meaning they consume data), the Readable interface (meaning they produce data), or both (known as a Duplex stream).
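As a quick illustration using standard Node modules (an added example, not code from our pipeline), a Readable file stream can be piped through a gzip compressor, which is a Duplex (Transform) stream, into a Writable file stream:
fs = require 'fs'
zlib = require 'zlib'

# createReadStream produces data (Readable), createGzip consumes and
# produces it (Duplex/Transform), createWriteStream consumes it (Writable).
fs.createReadStream('some-data.csv')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('some-data.csv.gz'))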
The ChildProcess class, which allows you to spawn and communicate with other processes from a Node.js process, exposes a Writable stream, process.stdin, that represents the child process’s stdin, and a Readable stream, process.stdout, that represents the child process’s stdout. We created a Duplex stream, ProcessStream, that wraps a ChildProcess instance, using its stdin and stdout to implement the Writable and Readable interfaces respectively:
{Duplex} = require 'stream'

# Wrap a spawned child process as a Duplex stream: data piped in is sent
# to the process's stdin, and piping out reads from the process's stdout.
class ProcessStream extends Duplex
  constructor: (@process, options = {}) ->
    super options
    @on 'pipe', (source) => source.unpipe(@).pipe @process.stdin

  pipe: (dest, options) -> @process.stdout.pipe dest, options
We used ProcessStream to wrap a spawned sort process and used it to sort a CSV file:
{spawn} = require 'child_process'
fs = require 'fs'

csv_in_stream = fs.createReadStream 'some-data.csv'
sort_stream = new ProcessStream spawn 'sort', [
  '-k', '1,1' # sort on the first column
  '-t', ','   # use comma as the column delimiter
]
csv_out_stream = fs.createWriteStream 'some-data-sorted.csv'
csv_in_stream.pipe(sort_stream).pipe(csv_out_stream)
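For comparison, driving the join command directly on two files that have already been sorted looks roughly like this. This is only a sketch: the file names and column positions are hypothetical (it assumes a section's id is the first CSV column of one file and an enrollment's section id is the second column of the other), and it is not how our pipeline actually invokes join.
{spawn} = require 'child_process'
fs = require 'fs'

# Hypothetical left outer join of two pre-sorted CSV files.
join_process = spawn 'join', [
  '-t', ','  # comma is the column delimiter
  '-1', '1'  # join on column 1 of the first file
  '-2', '2'  # join on column 2 of the second file
  '-a', '1'  # also print unmatched lines from the first file (left outer)
  'sections-sorted.csv'
  'enrollments-sorted.csv'
]
join_process.stdout.pipe fs.createWriteStream 'sections-with-enrollments.csv'
In our pipeline, though, the inputs arrive as streams rather than files on disk, which is what the wrappers described next take care of.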
Using this approach, we created open-source stream wrappers of sort and join: unix-sort and unix-join. They consume and produce JSON objects, since we parse CSVs to JSON earlier in our pipeline. Using unix-sort and unix-join, we implemented a sort-merge join:
unix_sort = require 'unix-sort'
unix_join = require 'unix-join'

sort_merge_join = (type, left_stream, left_key, right_stream, right_key) ->
  # Sort each input stream on its join key.
  left_sorted = left_stream.pipe unix_sort [left_key]
  right_sorted = right_stream.pipe unix_sort [right_key]
  join_options = type: type, on: {}
  join_options.on[left_key] = right_key
  # unix_join is a function that takes two streams and returns
  # a stream that produces the results of the join.
  unix_join left_sorted, right_sorted, join_options
We swapped this in for our previous hash join implementation when joining sections and enrollments:
sections = ... # a stream of JSON objects with an `id` field
enrollments = ... # a stream of JSON objects with a `section` field
sections_with_students = sort_merge_join 'left', sections, 'id',
  enrollments, 'section'
Conclusion
Once we switched to a sort-merge join, our bottleneck disappeared. A bonus benefit: we’ve been able to reuse these efficient joins to provide our API consumers with a create/update/delete event stream. Each time we sync data from an SIS, we do a full outer join with the existing data in our system to compute the diff.
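As a rough, hypothetical sketch of that idea (not our actual event code; the old/new pairing and the field names are assumptions made for illustration): after a full outer join of the previously synced records against the freshly pulled ones, the event type falls out of which side of each pair is missing.
# Hypothetical: classify one joined pair of records into an event.
classify_event = (pair) ->
  if not pair.old?
    {type: 'created', record: pair.new}
  else if not pair.new?
    {type: 'deleted', record: pair.old}
  else if JSON.stringify(pair.old) isnt JSON.stringify(pair.new)
    # Naive change detection, purely for illustration.
    {type: 'updated', record: pair.new}
  else
    null  # unchanged – no event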
While we could have invested time and energy in major language or infrastructure changes, we instead solved our problem quickly and simply using the tools already at our disposal.
Think it’s a hacky solution? Made similar tradeoffs between engineering effort and elegance? Let us know in the comments on Hacker News.