baweaver

Beyond Enumerable: Streaming and External Sort


Enumerable is great for processing data right up until you find out what an OOM is and watch your swap space start to eat itself alive. The algorithm might be perfectly correct, but once a file is large enough you have to fundamentally change the way you approach it.

Back at PlayStation we were processing near-petabyte scale logs across PSN with Spark, Kafka, and OpenTSDB. Ruby isn’t the production tool for that scale, and it doesn’t need to be. But most of the material on streaming and external sort lives in Scala or Python, and learning these ideas in a language you already think in builds intuition that transfers when you step into those frameworks. So let’s see what it looks like in Ruby.

A file bigger than the laptop

You’re looking through access logs. On your local machine it’s what, a few thousand lines? Counting 500s is straightforward enough to express:

File.readlines(path).count { |line| line.include?(" 500 ") }

Then you try to ship that same code, and find out the hard way that the real file is forty gigabytes, if not worse. It’s not going to fit into RAM, and File.readlines is going to try to load the entire thing at once, causing the OS to kill the process. So how do we handle this?

The answer is to stop holding the whole file.

Streaming what you can

When dealing with giant files we often only need a single line at a time. File.foreach does exactly this by reading in one line at a time, allowing the previous one to get collected:

File.foreach(path).count { |line| line.include?(" 500 ") }

With that one change the memory usage goes from 40 GB to about 23 MB (Ruby’s baseline), and that applies whether the file is three million or three billion lines long. It’ll still take a while though.

The same works for anything that condenses the stream to a single result. Sums, minimums, maximums, “does any line match”:

File.foreach(path).sum { |line| line.split(",").fetch(column).to_f }
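Folding into a small hash has the same shape, as long as the number of distinct keys stays small. Here is a minimal sketch, assuming a space-separated log where the status code is the second-to-last field. Both the log format and the count_statuses name are made up for illustration:

```ruby
require "tempfile"

# Hypothetical helper: count lines per status code in a single pass.
# Assumes each line looks like "<method> <path> <status> <bytes>".
def count_statuses(path)
  File.foreach(path).each_with_object(Hash.new(0)) do |line, counts|
    status = line.split[-2]
    counts[status] += 1 if status
  end
end

# Tiny stand-in for a real access log.
sample_log = Tempfile.new("access")
sample_log.puts("GET /a 200 512", "GET /b 500 0", "POST /c 500 0", "GET /d 404 17")
sample_log.flush

status_counts = count_statuses(sample_log.path)
```

Memory here is bounded by the number of distinct status codes, not by the number of lines.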

Do be careful though: map and select look like they should stream, but called eagerly they build a complete array of results, which brings us right back to the initial issue. That’s what lazy is for. It pushes each element through the whole chain one at a time, and nothing runs until a terminal method like first or to_a asks for values:

File.foreach(path).lazy.map { |line| line.upcase }.first(5)

On a million-line file the eager version touches all million lines. The lazy version touches five.

Note: lazy isn’t free. Each element pays a small per-step cost, so for data that fits in memory the eager versions are faster. The win is bounded memory and short-circuiting, not speed. Reach for it when the collection is large or infinite, not as a default.
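To see the short-circuiting for yourself, you can count how many elements each pipeline pulls. This is only a sketch: the touched_by helper and the thousand-element source are illustrative stand-ins for a real file.

```ruby
# Hypothetical helper: builds a counting source, hands it to the block,
# and returns how many elements the block's pipeline pulled from it.
def touched_by
  touched = 0
  source = Enumerator.new do |yielder|
    1.upto(1_000) do |n|
      touched += 1
      yielder << n
    end
  end

  yield source
  touched
end

eager_touched = touched_by { |source| source.map { |n| n * 2 }.first(5) }
lazy_touched  = touched_by { |source| source.lazy.map { |n| n * 2 }.first(5) }
```

The eager pipeline pulls every element before first(5) ever runs. The lazy one stops as soon as it has its five.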

Where streaming hits the wall

That works great, up until you need more than the next element to make a decision. sort and uniq, for example, will break our current techniques because they require the entire context. How does one sort something you can’t hold?

Cut it into pieces that fit

If you can’t hold all of it, hold what you can instead. By breaking this problem into smaller workable chunks we’re able to take on much larger files:

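require "tempfile"

def write_sorted_runs(input, chunk_size)
  runs = []

  # Only one chunk is held in memory at a time.
  input.each_slice(chunk_size) do |chunk|
    run = Tempfile.new("run")
    chunk.sort.each { |number| run.puts(number) }
    run.flush # make sure the sorted run is on disk before it gets read back
    runs << run
  end

  runs
end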

Note: each_slice takes a block here on purpose. If you chain it lazily with .map { ... }, the runs are never written, and you get back a lazy enumerator instead of your array of files. The block form forces the iteration while still holding only one chunk at a time.
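A small sketch of that pitfall, using a plain array in place of temp files so the effect is easy to inspect. The written and pending names are illustrative:

```ruby
written = []
numbers = [5, 3, 1, 4, 2, 6]

# Lazy chain: nothing runs yet, because no terminal method asks for a value.
pending = numbers.each_slice(2).lazy.map do |chunk|
  written << chunk.sort
  chunk.size
end

written_before_force = written.dup # still empty at this point

# Forcing the chain (or using the block form) is what actually does the work.
pending.force
```

Before the force call, written is still empty and pending is an Enumerator::Lazy. Afterwards written holds the three sorted chunks.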

Great, but now we have a bunch of 1 GB files to deal with. How do we merge them back together without creeping up on the full file size again?

Merging sorted things

Remember back to earlier where we were able to leverage reading only one line at a time? If we have a bunch of files which are already sorted, let’s say two, then we only ever need to deal with comparing two numbers at a time:

def merge_sorted_pair(left, right)
  merged = []
  left_index = 0
  right_index = 0

  while left_index < left.size && right_index < right.size
    if left[left_index] <= right[right_index]
      merged << left[left_index]
      left_index += 1
    else
      merged << right[right_index]
      right_index += 1
    end
  end

  merged.concat(left[left_index..])
  merged.concat(right[right_index..])
end

We only need to deal with the current front of whatever sorted files we’re dealing with, allowing us to offload a lot of these concerns to disk. Now we feel clever, riiiight up until we remember there were actually forty of those files, not two.
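The array version above still builds the whole result in memory. A streaming variant might look like the sketch below, which only ever looks at the front of each source. The merge_sorted_streams and front? names are hypothetical, and it assumes both inputs are already sorted:

```ruby
# Hypothetical streaming take on merge_sorted_pair: accepts any two sorted
# enumerables and yields merged values without building the full result.
def merge_sorted_streams(left, right)
  Enumerator.new do |yielder|
    cursors = [left.each, right.each]

    loop do
      # Keep only the cursors that still have a value at the front.
      live = cursors.select { |cursor| front?(cursor) }
      break if live.empty?

      # Emit the smaller front and advance only that cursor.
      yielder << live.min_by(&:peek).next
    end
  end
end

# True while the cursor has another element to offer.
def front?(cursor)
  cursor.peek
  true
rescue StopIteration
  false
end

merged = merge_sorted_streams([1, 4, 9], [2, 3, 10, 11]).to_a

# It also works on endless sources, since only the fronts are ever held.
odds  = (1..).lazy.map { |n| (2 * n) - 1 }
evens = (1..).lazy.map { |n| 2 * n }
first_six = merge_sorted_streams(odds, evens).first(6)
```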

Say it out loud, though, and remember back to earlier articles: we want the next smallest number, and after each one we’re going to add the next number from whatever file we just grabbed from. Sounds a lot like a heap, right?

class Heap
  def initialize(by: ->(item) { item })
    @items = []
    @key = by
  end

  def size = @items.size
  def empty? = @items.empty?
  def peek = @items.first

  def push(item)
    @items << item
    sift_up(@items.size - 1)
    self
  end

  def pop
    return nil if @items.empty?

    root = @items.first
    last = @items.pop
    unless @items.empty?
      @items[0] = last
      sift_down(0)
    end
    root
  end

  private

  def compare(a, b) = @key.call(a) <=> @key.call(b)

  def sift_up(pos)
    while pos.positive?
      parent = (pos - 1) / 2
      break if compare(@items[pos], @items[parent]) >= 0
      @items[pos], @items[parent] = @items[parent], @items[pos]
      pos = parent
    end
  end

  def sift_down(pos)
    count = @items.size
    loop do
      left = (2 * pos) + 1
      right = (2 * pos) + 2
      smallest = pos
      smallest = left if left < count && compare(@items[left], @items[smallest]).negative?
      smallest = right if right < count && compare(@items[right], @items[smallest]).negative?
      break if smallest == pos
      @items[pos], @items[smallest] = @items[smallest], @items[pos]
      pos = smallest
    end
  end
end

We hold exactly one element per source. From there we pop the smallest, emit it, then grab the next element from the source it came from. When a source runs out of elements it simply stops contributing, and we can wrap the whole thing in an Enumerator to give it a Ruby-like interface:

class KWayMerge
  Front = Struct.new(:value, :source)

  def self.merge(sorted_sources)
    Enumerator.new do |yielder|
      cursors = sorted_sources.map(&:each)
      frontier = Heap.new(by: ->(front) { front.value })

      cursors.each_with_index do |cursor, index|
        pull_next(cursor) { |value| frontier.push(Front.new(value, index)) }
      end

      until frontier.empty?
        smallest = frontier.pop
        yielder << smallest.value
        pull_next(cursors.fetch(smallest.source)) do |value|
          frontier.push(Front.new(value, smallest.source))
        end
      end
    end.lazy
  end

  def self.pull_next(cursor)
    yield cursor.next
  rescue StopIteration
    # source exhausted
  end
end

Now finding the minimum goes from scanning all forty fronts to an O(log k) heap operation per element. With k runs and N total records, the merge takes O(N log k) time while holding only k elements at once. The data pours through a fixed-size heap regardless of how large it is.
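To put rough numbers on that, here is a back-of-the-envelope estimate for twenty million records across forty runs, the largest case measured later in this post. It counts heap levels rather than exact comparisons, so treat it as an order-of-magnitude sketch and not a measurement:

```ruby
records = 20_000_000 # N
runs = 40            # k

# A heap of k fronts is about log2(k) levels deep, so each pop and push
# walks roughly that many levels.
levels_per_record = Math.log2(runs)
heap_steps = (records * levels_per_record).round

# Scanning every front instead costs k - 1 comparisons per record.
scan_steps = records * (runs - 1)
```

That works out to roughly a hundred million heap steps against 780 million scan comparisons. Each heap level can involve more than one comparison, so the real comparison count is a small multiple of heap_steps.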

Heaps become exceedingly useful for tasks like this, so if you’ve not read the previous article on heaps and priority queues definitely give it a read.

Spill to disk, then merge it back

Now let’s put them back together:

require "tempfile"

class ExternalSort
  # Returns [lazy merged stream, run files]. The caller keeps the run files
  # alive until the stream is drained, then closes them.
  def self.sort(input, chunk_size:)
    run_files = write_sorted_runs(input, chunk_size)
    sources = run_files.map { |file| read_integers(file.path) }
    [KWayMerge.merge(sources), run_files]
  end

  def self.write_sorted_runs(input, chunk_size)
    runs = []

    input.each_slice(chunk_size) do |chunk|
      run = Tempfile.new("run")
      chunk.sort.each { |number| run.puts(number) }
      run.flush
      runs << run
    end

    runs
  end

  # Streams a run back one line at a time, parsing each line as an integer.
  def self.read_integers(path)
    File.foreach(path).lazy.map { |line| Integer(line) }
  end
end

That’s an external merge sort. Unix sort does the same thing when a file exceeds RAM, and so do databases when sorting more rows than fit in their buffers.

One caveat: the caller has to keep the run file array alive until the stream is drained. If those Tempfile objects get reaped the files vanish out from under the merge. Easiest fix is to return both the stream and the array, and let the caller close the files explicitly when it’s done consuming.
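One way to make that cleanup hard to forget is to scope the run files to a block. The sketch below is only an illustration of the pattern: with_sorted_runs is a hypothetical helper, not part of the ExternalSort class above, and it uses Tempfile#close! to close and delete each file.

```ruby
require "tempfile"

# Hypothetical wrapper: writes one sorted run per chunk, yields the run files,
# and removes them afterwards even if the block raises.
def with_sorted_runs(chunks)
  runs = chunks.map do |chunk|
    Tempfile.new("run").tap do |run|
      chunk.sort.each { |number| run.puts(number) }
      run.flush
    end
  end

  yield runs
ensure
  runs&.each(&:close!)
end

run_paths = nil
first_values = with_sorted_runs([[3, 1, 2], [9, 7, 8]]) do |runs|
  run_paths = runs.map(&:path)
  # Drain everything you need inside the block, while the files still exist.
  runs.map { |run| File.foreach(run.path).first.to_i }
end
```

The trade-off is that the merged stream has to be fully consumed inside the block. Returning a lazy stream out of it would bring the original problem straight back.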

Counting the cost

The external sort produces the same result as Array#sort at every size tested. The difference is memory:

records   in-memory sort   external sort   runs
2M        55 MB            39 MB           4
5M        117 MB           51 MB           10
10M       184 MB           62 MB           20
20M       365 MB           79 MB           40

In-memory climbs at roughly 17 MB per million records. External sort stays nearly flat because its working set is one chunk plus the heap, regardless of total input size.
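Those slopes come straight out of the table. As a quick check, here is the arithmetic using only the first and last rows, with mb_per_million as an illustrative helper name:

```ruby
# Memory in MB from the table above, keyed by millions of records.
in_memory = { 2 => 55, 5 => 117, 10 => 184, 20 => 365 }
external  = { 2 => 39, 5 => 51, 10 => 62, 20 => 79 }

# Average growth in MB per million records between the smallest and largest input.
def mb_per_million(measurements)
  (first_size, first_mb), (last_size, last_mb) = measurements.minmax_by { |size, _mb| size }
  (last_mb - first_mb).fdiv(last_size - first_size)
end

in_memory_slope = mb_per_million(in_memory)
external_slope  = mb_per_million(external)
```

In-memory comes out at about 17.2 MB per million records, and external sort at about 2.2 MB.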

Great, so we use external sorting instead, especially if it’s that much more efficient! Except it does incur a cost: time. On twenty million numbers the in-memory sort took about fifteen seconds, the external version closer to three minutes.

Most of that gap is the pure-Ruby heap comparing through a lambda hundreds of millions of times, plus handling every integer twice (written out as text to disk, then parsed back in). The algorithm is the same one sort(1) uses (an external R-way merge), but sort(1) runs it in C. You trade wall-clock time for bounded memory. When the input is larger than memory you don’t have a choice.

At that scale Ruby stops being the right tool and you reach for Spark, Flink, or Kafka Streams. But the underlying ideas, chunking, streaming, merge via heap, don’t change when you switch languages. They show up in Scala’s sortMerge joins, in Python’s heapq.merge, in every database’s query planner.

When even streaming with bounded memory isn’t enough, when the data is so large that exact answers become impractical, you start making different trades entirely. Approximate counts instead of exact ones. Probabilistic membership checks instead of full sets. Fixed-size sketches that merge across machines. That’s the next post.
