Enumerable is great for processing data right up until you find out what an OOM is and watch your swap space start to eat itself alive. The algorithm might be perfectly correct, but once a file is large enough you have to fundamentally change the way you approach it.
Back at PlayStation we were processing near-petabyte-scale logs across PSN with Spark, Kafka, and OpenTSDB. Ruby isn’t the production tool for that scale, and it doesn’t need to be. But most of the material on streaming and external sorting lives in Scala or Python, and learning these ideas in a language you already think in builds intuition that transfers when you step into those frameworks. So let’s see what it looks like in Ruby.
A file bigger than the laptop
You’re looking through access logs. On your local machine it’s what, a few thousand lines? Counting 500s is straightforward enough to express:
    File.readlines(path).count { |line| line.include?(" 500 ") }

Then you try to ship that same code and find out the hard way that the real file is forty gigabytes, if not worse. It’s not going to fit into RAM, and File.readlines will try to load the entire thing at once until the OS kills the process. So how do we handle this?
The answer is to stop holding the whole file.
Streaming what you can
When dealing with giant files we often only need a single line at a time. File.foreach does exactly this by reading in one line at a time, allowing the previous one to get collected:
    File.foreach(path).count { |line| line.include?(" 500 ") }

With that one change memory usage goes from 40 GB to 23 MB (Ruby’s baseline), and that holds whether the file is three million or three billion lines long. It’ll still take a while though.
The same works for anything that condenses the stream to a single result. Sums, minimums, maximums, “does any line match”:
    File.foreach(path).sum { |line| line.split(",").fetch(column).to_f }

Do be careful though: map and select only look like they should stream. In reality each one runs through the whole collection and builds a complete array before the next step sees anything, which brings us right back to the initial issue. That’s what lazy is for. It passes elements through the chain one at a time, and does nothing at all until a terminal method asks for results:
    File.foreach(path).lazy.map { |line| line.upcase }.first(5)

On a million-line file the eager version (the same chain without lazy) touches all million lines. The lazy version touches five.
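To see that difference rather than take it on faith, here is a small sketch that counts how many lines each version actually pulls. The counting_lines helper is a hypothetical stand-in for a file, not part of any library, and 1,000 lines stands in for the million.

```ruby
# Hypothetical stand-in for a file: yields `total` lines and records how many
# were actually read in counter[:read].
def counting_lines(total, counter)
  Enumerator.new do |yielder|
    total.times do |index|
      counter[:read] += 1
      yielder << "line #{index}\n"
    end
  end
end

# Eager: map builds the full array before first(5) ever runs.
eager_counter = { read: 0 }
eager = counting_lines(1_000, eager_counter).map(&:upcase).first(5)

# Lazy: first(5) pulls elements through map one at a time, then stops.
lazy_counter = { read: 0 }
lazy = counting_lines(1_000, lazy_counter).lazy.map(&:upcase).first(5)

puts "eager read #{eager_counter[:read]} lines, lazy read #{lazy_counter[:read]}"
```

Both versions return the same five strings. Only the amount of work behind them differs.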
Note:
lazy isn’t free. Each element pays a small per-step cost, so for data that fits in memory the eager versions are faster. The win is bounded memory and short-circuiting, not speed. Reach for it when the collection is large or infinite, not as a default.
Where streaming hits the wall
That works great, up until you need more than the next element to make a decision. sort and uniq, for example, will break our current techniques because they require the entire context. How does one sort something you can’t hold?
Cut it into pieces that fit
If you can’t hold all of it, hold what you can instead. By breaking this problem into smaller workable chunks we’re able to take on much larger files:
    require "tempfile"

    # Sort the input one chunk at a time, writing each sorted chunk to its own temp file.
    def write_sorted_runs(input, chunk_size)
      runs = []
      input.each_slice(chunk_size) do |chunk|
        run = Tempfile.new("run")
        chunk.sort.each { |number| run.puts(number) }
        run.flush # push buffered writes out so the run can be read back by path
        runs << run
      end
      runs
    end

Note:
each_slice takes a block here on purpose. If you chain it lazily with .map { ... }, the runs are never written; you get back a lazy enumerator instead of your array of files. The block form forces the iteration while still holding only one chunk at a time.
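A small sketch of that pitfall, assuming the input is already a lazy enumerator (here a lazy range standing in for a stream of numbers). The variable names are illustrative only.

```ruby
input = (1..6).lazy

# Chained form: each_slice without a block on a lazy input stays lazy, and so
# does the map after it. Nothing has run yet.
written = []
chained = input.each_slice(2).map { |chunk| written << chunk.sort }
written_after_chain = written.dup

# Block form: every chunk is handled immediately, one chunk at a time.
processed = []
input.each_slice(2) { |chunk| processed << chunk.sort }

puts chained.class               # a lazy enumerator, not an array of results
puts written_after_chain.inspect # still empty
puts processed.inspect
```

The chained version only does its work once something like to_a or each forces it, which is easy to forget when the block's whole purpose is a side effect such as writing a file.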
Great, but now we have a bunch of 1 GB files to deal with. How do we merge them back together without holding anything close to the full file in memory again?
Merging sorted things
Remember earlier, when we got away with reading only one line at a time? If the files are already sorted, and let’s say there are two of them, then we only ever need to compare two numbers at a time:
    # Merge two sorted arrays into one sorted array.
    def merge_sorted_pair(left, right)
      merged = []
      left_index = 0
      right_index = 0
      while left_index < left.size && right_index < right.size
        # Take whichever front element is smaller, then advance that side.
        if left[left_index] <= right[right_index]
          merged << left[left_index]
          left_index += 1
        else
          merged << right[right_index]
          right_index += 1
        end
      end
      # One side is exhausted; whatever remains on the other is already sorted.
      merged.concat(left[left_index..])
      merged.concat(right[right_index..])
    end

We only need to look at the current front of each sorted file, which lets us leave everything else on disk. Now we feel clever, riiiight up until we remember there were actually forty of those files, not two.
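The pairwise merge works on arrays, but the same comparison can run over enumerators, which is closer to what reading two run files looks like. A minimal sketch, assuming both inputs are already sorted and respond to each; merge_sorted_streams and exhausted? are hypothetical names introduced here for illustration.

```ruby
# True once the cursor has no more elements. peek looks at the next element
# without consuming it and raises StopIteration at the end.
def exhausted?(cursor)
  cursor.peek
  false
rescue StopIteration
  true
end

# Merge two sorted streams, holding only the front element of each.
def merge_sorted_streams(left, right)
  Enumerator.new do |yielder|
    left_cursor = left.each
    right_cursor = right.each

    until exhausted?(left_cursor) || exhausted?(right_cursor)
      if left_cursor.peek <= right_cursor.peek
        yielder << left_cursor.next
      else
        yielder << right_cursor.next
      end
    end

    # At most one side still has elements; drain whichever it is.
    [left_cursor, right_cursor].each do |cursor|
      yielder << cursor.next until exhausted?(cursor)
    end
  end
end

merged = merge_sorted_streams([1, 4, 9], [2, 3, 10, 11])
puts merged.first(3).inspect
puts merged.to_a.inspect
```

Because the result is itself an enumerator, nothing is merged until a caller asks for elements, and first(3) stops after three.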
Saying it out loud though, and thinking back to earlier articles: we want the next smallest number, and after each one we add the next number from whichever file we just took from. Sounds a lot like a heap, right?
    # A binary min-heap ordered by the key that `by` extracts from each item.
    class Heap
      def initialize(by: ->(item) { item })
        @items = []
        @key = by
      end

      def size = @items.size
      def empty? = @items.empty?
      def peek = @items.first

      def push(item)
        @items << item
        sift_up(@items.size - 1)
        self
      end

      def pop
        return nil if @items.empty?

        root = @items.first
        last = @items.pop
        unless @items.empty?
          @items[0] = last
          sift_down(0)
        end
        root
      end

      private

      def compare(a, b) = @key.call(a) <=> @key.call(b)

      # Swap the item upward until its parent is no larger.
      def sift_up(pos)
        while pos.positive?
          parent = (pos - 1) / 2
          break if compare(@items[pos], @items[parent]) >= 0

          @items[pos], @items[parent] = @items[parent], @items[pos]
          pos = parent
        end
      end

      # Swap the item downward with its smaller child until neither child is smaller.
      def sift_down(pos)
        count = @items.size
        loop do
          left = (2 * pos) + 1
          right = (2 * pos) + 2
          smallest = pos
          smallest = left if left < count && compare(@items[left], @items[smallest]).negative?
          smallest = right if right < count && compare(@items[right], @items[smallest]).negative?
          break if smallest == pos

          @items[pos], @items[smallest] = @items[smallest], @items[pos]
          pos = smallest
        end
      end
    end

We hold exactly one element per source. From there we pop the smallest, emit it, then grab the next element from the source it came from. When a source runs out it simply stops contributing, and we can wrap the whole thing in an Enumerator to give it a Ruby-like interface:
    class KWayMerge
      # One heap entry: a value plus the index of the source it came from.
      Front = Struct.new(:value, :source)

      def self.merge(sorted_sources)
        Enumerator.new do |yielder|
          cursors = sorted_sources.map(&:each)
          frontier = Heap.new(by: ->(front) { front.value })

          # Seed the heap with the first element of every source.
          cursors.each_with_index do |cursor, index|
            pull_next(cursor) { |value| frontier.push(Front.new(value, index)) }
          end

          until frontier.empty?
            smallest = frontier.pop
            yielder << smallest.value
            # Refill from the source that just gave up its front element.
            pull_next(cursors.fetch(smallest.source)) do |value|
              frontier.push(Front.new(value, smallest.source))
            end
          end
        end.lazy
      end

      def self.pull_next(cursor)
        yield cursor.next
      rescue StopIteration
        # source exhausted
      end
    end

Finding the minimum now costs O(log k) per element instead of a scan across all forty streams. With k runs and N total records, the merge takes O(N log k) time while holding only k elements at once. The data pours through a fixed-size heap regardless of how large it is.
Heaps become exceedingly useful for tasks like this, so if you’ve not read the previous article on heaps and priority queues definitely give it a read.
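To put rough numbers on that, here is a back-of-the-envelope sketch using twenty million records and forty runs. It counts steps rather than real comparisons and ignores constant factors, so treat it as an order-of-magnitude estimate only.

```ruby
records = 20_000_000
runs = 40

# Scanning every run's front element to find each minimum: about N * k steps.
linear_scan_steps = records * runs

# Walking a heap of k entries once per record: about N * log2(k) steps.
heap_steps = (records * Math.log2(runs)).round

puts "linear scan: #{linear_scan_steps} steps"
puts "heap:        #{heap_steps} steps"
puts "ratio:       #{(linear_scan_steps.to_f / heap_steps).round(1)}x"
```

That works out to 800 million steps for the scan against roughly 106 million for the heap, and the gap widens as the number of runs grows.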
Spill to disk, then merge it back
Now let’s put them back together:
    require "tempfile"

    class ExternalSort
      # Returns the lazy sorted stream plus the run files backing it.
      def self.sort(input, chunk_size:)
        run_files = write_sorted_runs(input, chunk_size)
        sources = run_files.map { |file| read_integers(file.path) }
        [KWayMerge.merge(sources), run_files]
      end

      def self.write_sorted_runs(input, chunk_size)
        runs = []
        input.each_slice(chunk_size) do |chunk|
          run = Tempfile.new("run")
          chunk.sort.each { |number| run.puts(number) }
          run.flush
          runs << run
        end
        runs
      end

      # Stream a run file back in as integers, one line at a time.
      def self.read_integers(path)
        File.foreach(path).lazy.map { |line| Integer(line) }
      end
    end

That’s an external merge sort. Unix sort does the same thing when a file exceeds RAM, and so do databases when sorting more rows than fit in their buffers.
One caveat: the caller has to keep the run file array alive until the stream is drained. If those Tempfile objects get reaped the files vanish out from under the merge. Easiest fix is to return both the stream and the array, and let the caller close the files explicitly when it’s done consuming.
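One way the caller might handle that cleanup is sketched below. drain_and_cleanup is a hypothetical helper, not part of the code above, and the single hand-written run file just stands in for whatever stream and file array the sort returns. Tempfile#close! closes the file and deletes it.

```ruby
require "tempfile"

# Hypothetical helper: yield every value from the stream, then close and
# delete the run files, even if the consumer raises partway through.
def drain_and_cleanup(stream, run_files)
  stream.each { |value| yield value }
ensure
  run_files.each(&:close!)
end

# Stand-in for one sorted run on disk.
run = Tempfile.new("run")
[1, 2, 3].each { |number| run.puts(number) }
run.flush
run_path = run.path

stream = File.foreach(run_path).lazy.map { |line| Integer(line) }

collected = []
drain_and_cleanup(stream, [run]) { |value| collected << value }

puts collected.inspect
puts File.exist?(run_path)
```

Putting the cleanup in an ensure clause ties the lifetime of the files to the consumption of the stream, so the array stays referenced for exactly as long as the merge needs it.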
Counting the cost
The external sort produces the same result as Array#sort at every size tested. The difference is memory:
| records | in-memory sort | external sort | runs |
|---|---|---|---|
| 2M | 55 MB | 39 MB | 4 |
| 5M | 117 MB | 51 MB | 10 |
| 10M | 184 MB | 62 MB | 20 |
| 20M | 365 MB | 79 MB | 40 |
In-memory climbs at roughly 17 MB per million records. External sort stays nearly flat because its working set is one chunk plus the heap, regardless of total input size.
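The growth rates can be checked from the table’s first and last rows. The sketch below hard-codes the figures from the table and takes a simple slope between the endpoints. mb_per_million is a hypothetical helper name.

```ruby
# Memory in MB from the table, keyed by millions of records.
in_memory = { 2 => 55, 5 => 117, 10 => 184, 20 => 365 }
external  = { 2 => 39, 5 => 51, 10 => 62, 20 => 79 }

# Slope between the smallest and largest sample, in MB per million records.
def mb_per_million(samples)
  (first_n, first_mb), (last_n, last_mb) = samples.minmax_by { |records, _mb| records }
  ((last_mb - first_mb).to_f / (last_n - first_n)).round(1)
end

puts mb_per_million(in_memory) # 17.2
puts mb_per_million(external)  # 2.2
```

So the external sort still grows, at about 2 MB per million records against about 17 MB for the in-memory sort.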
Great, so we use external sorting instead, especially if it’s that much more efficient! Except it does incur a cost: time. On twenty million numbers the in-memory sort took about fifteen seconds, the external version closer to three minutes.
Most of that gap is the pure-Ruby heap comparing through a lambda hundreds of millions of times, plus converting every integer twice (to text on the way to disk, back to an integer on the way in). The algorithm is the same one sort(1) uses (an external R-way merge), but sort(1) runs it in C. You trade wall-clock time for bounded memory. When the input is larger than memory you don’t have a choice.
At that scale Ruby stops being the right tool and you reach for Spark, Flink, or Kafka Streams. But the underlying ideas (chunking, streaming, merging via a heap) don’t change when you switch languages. They show up in Spark’s sort-merge joins, in Python’s heapq.merge, in every database’s query planner.
When even streaming with bounded memory isn’t enough, when the data is so large that exact answers become impractical, you start making different trades entirely. Approximate counts instead of exact ones. Probabilistic membership checks instead of full sets. Fixed-size sketches that merge across machines. That’s the next post.