
The question of how fast a programming language can go through and aggregate 1 billion rows of data has been gaining traction lately. Python, not being the most performant language out there, naturally doesn’t stand a chance – especially since the currently top-performing Java implementation takes only 1.535 seconds!
The fundamental rule of the challenge is that no external libraries are allowed. My goal for today is to start by obeying the rules, and then see what happens if you use external libraries and better-suited file formats.
I’ve run all the scripts 5 times and averaged the results.
As for the hardware, I’m using a 16" M3 Pro MacBook Pro with 12 CPU cores and 36 GB of RAM. Your results may vary if you decide to run the code, but you should hopefully see similar percentage differences between implementations.
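For reference, here’s a minimal sketch of how you could time the runs yourself. It isn’t the exact harness behind my numbers, and the script name is just a placeholder, so swap in whichever implementation you want to measure:

# Minimal timing sketch (not the exact harness used for the numbers in this article).
# Runs a script five times and reports the average wall-clock time.
import subprocess
import time

RUNS = 5
script = "01_single_core.py"  # placeholder, substitute any of the implementations below

timings = []
for _ in range(RUNS):
    start = time.perf_counter()
    subprocess.run(["python", script], check=True, stdout=subprocess.DEVNULL)
    timings.append(time.perf_counter() - start)

print(f"{script}: {sum(timings) / RUNS:.3f} s on average over {RUNS} runs")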
What is the 1 Billion Row Challenge?
The idea behind the 1 Billion Row Challenge (1BRC) is simple – go through a .txt file that contains arbitrary temperature measurements and calculate summary statistics for each station (min, mean, and max). The only issues are that you’re working with 1 billion rows and that the data is stored in an uncompressed .txt format (13.8 GB).
The dataset is generated by the data/createMeasurements.py script on my GitHub repo. I’ve copied the script from the source, just to have everything in the same place.
Once you generate the dataset, you’ll end up with a 13.8 GB semicolon-separated text file with two columns – station name and measurement.
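If you want a quick sanity check before running any of the implementations, you can peek at the first few rows. The sample lines in the comment below are only illustrative of the station_name;measurement layout, not values from my generated file:

# Print the first three rows of the generated dataset.
# Each line follows the "<station_name>;<measurement>" layout, e.g.:
#   Hamburg;12.0
#   Bulawayo;8.9
#   Palembang;38.8
with open("data/measurements.txt", "rb") as f:
    for _ in range(3):
        print(f.readline().decode("utf-8").rstrip())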
And at the end of the aggregation process, you should end up with a single line of the form {StationA=min/mean/max, StationB=min/mean/max, ...}, with the stations sorted alphabetically:

The exact output format differs somewhat from one implementation to another, but this is the one I found in the official Python repo.
You now know what the challenge is, so let’s dive into the implementation next!
1 Billion Row Challenge – Pure Python Implementation
This is the only section in which I plan to obey the challenge rules. The reason is simple – Python doesn’t stand a chance with its standard library, and everyone in the industry relies heavily on third-party packages.
Single-Core Implementation
By far the easiest one to implement. You go through the text file line by line and keep track of the measurements in a dictionary. Min and max are trivial to maintain, but the mean requires keeping track of the running sum and count and dividing them at the end.
# https://github.com/ifnesi/1brc#submitting
# Modified the multiprocessing version
def process_file(file_name: str):
    result = dict()
    with open(file_name, "rb") as f:
        for line in f:
            location, measurement = line.split(b";")
            measurement = float(measurement)
            if location not in result:
                result[location] = [
                    measurement,
                    measurement,
                    measurement,
                    1,
                ]  # min, max, sum, count
            else:
                _result = result[location]
                if measurement < _result[0]:
                    _result[0] = measurement
                if measurement > _result[1]:
                    _result[1] = measurement
                _result[2] += measurement
                _result[3] += 1

    print("{", end="")
    for location, measurements in sorted(result.items()):
        print(
            f"{location.decode('utf8')}={measurements[0]:.1f}/{(measurements[2] / measurements[3]) if measurements[3] != 0 else 0:.1f}/{measurements[1]:.1f}",
            end=", ",
        )
    print("\b\b} ")  # backspace over the trailing ", " before closing the brace


if __name__ == "__main__":
    process_file("data/measurements.txt")
Multi-Core Implementation
The same idea as before, but now you split the text file into roughly equally sized chunks (aligned to line boundaries) and process them in parallel. Each process computes the statistics for its chunk, and the results are then combined.
# Code credits: https://github.com/ifnesi/1brc#submitting
import os
import multiprocessing as mp


def get_file_chunks(
    file_name: str,
    max_cpu: int = 8,
) -> list:
    """Split file into chunks"""
    cpu_count = min(max_cpu, mp.cpu_count())

    file_size = os.path.getsize(file_name)
    chunk_size = file_size // cpu_count

    start_end = list()
    with open(file_name, "r+b") as f:

        def is_new_line(position):
            if position == 0:
                return True
            else:
                f.seek(position - 1)
                return f.read(1) == b"\n"

        def next_line(position):
            f.seek(position)
            f.readline()
            return f.tell()

        chunk_start = 0
        while chunk_start < file_size:
            chunk_end = min(file_size, chunk_start + chunk_size)

            # Move the chunk boundary back to the nearest line break
            while not is_new_line(chunk_end):
                chunk_end -= 1

            if chunk_start == chunk_end:
                chunk_end = next_line(chunk_end)

            start_end.append(
                (
                    file_name,
                    chunk_start,
                    chunk_end,
                )
            )

            chunk_start = chunk_end

    return (
        cpu_count,
        start_end,
    )


def _process_file_chunk(
    file_name: str,
    chunk_start: int,
    chunk_end: int,
) -> dict:
    """Process each file chunk in a different process"""
    result = dict()
    with open(file_name, "rb") as f:
        f.seek(chunk_start)
        for line in f:
            chunk_start += len(line)

            if chunk_start > chunk_end:
                break

            location, measurement = line.split(b";")
            measurement = float(measurement)
            if location not in result:
                result[location] = [
                    measurement,
                    measurement,
                    measurement,
                    1,
                ]  # min, max, sum, count
            else:
                _result = result[location]
                if measurement < _result[0]:
                    _result[0] = measurement
                if measurement > _result[1]:
                    _result[1] = measurement
                _result[2] += measurement
                _result[3] += 1
    return result


def process_file(
    cpu_count: int,
    start_end: list,
) -> dict:
    """Process data file"""
    with mp.Pool(cpu_count) as p:
        # Run chunks in parallel
        chunk_results = p.starmap(
            _process_file_chunk,
            start_end,
        )

    # Combine all results from all chunks
    result = dict()
    for chunk_result in chunk_results:
        for location, measurements in chunk_result.items():
            if location not in result:
                result[location] = measurements
            else:
                _result = result[location]
                if measurements[0] < _result[0]:
                    _result[0] = measurements[0]
                if measurements[1] > _result[1]:
                    _result[1] = measurements[1]
                _result[2] += measurements[2]
                _result[3] += measurements[3]

    # Print final results
    print("{", end="")
    for location, measurements in sorted(result.items()):
        print(
            f"{location.decode('utf8')}={measurements[0]:.1f}/{(measurements[2] / measurements[3]) if measurements[3] != 0 else 0:.1f}/{measurements[1]:.1f}",
            end=", ",
        )
    print("\b\b} ")


if __name__ == "__main__":
    cpu_count, *start_end = get_file_chunks("data/measurements.txt", max_cpu=12)
    process_file(cpu_count=cpu_count, start_end=start_end[0])
PyPy Implementation
It leverages the multiprocessing implementation to a large extent, but runs under PyPy instead of CPython, which gives you a just-in-time compiler for your Python code. It's great in all cases that don’t depend on CPython extensions. The chunk-processing function also reads the file in large binary blocks instead of line by line.
# Code credits: https://github.com/ifnesi/1brc#submitting
import os
import multiprocessing as mp


def get_file_chunks(
    file_name: str,
    max_cpu: int = 8,
) -> list:
    """Split file into chunks"""
    cpu_count = min(max_cpu, mp.cpu_count())

    file_size = os.path.getsize(file_name)
    chunk_size = file_size // cpu_count

    start_end = list()
    with open(file_name, "r+b") as f:

        def is_new_line(position):
            if position == 0:
                return True
            else:
                f.seek(position - 1)
                return f.read(1) == b"\n"

        def next_line(position):
            f.seek(position)
            f.readline()
            return f.tell()

        chunk_start = 0
        while chunk_start < file_size:
            chunk_end = min(file_size, chunk_start + chunk_size)

            # Move the chunk boundary back to the nearest line break
            while not is_new_line(chunk_end):
                chunk_end -= 1

            if chunk_start == chunk_end:
                chunk_end = next_line(chunk_end)

            start_end.append(
                (
                    file_name,
                    chunk_start,
                    chunk_end,
                )
            )

            chunk_start = chunk_end

    return (
        cpu_count,
        start_end,
    )


def _process_file_chunk(
    file_name: str,
    chunk_start: int,
    chunk_end: int,
    blocksize: int = 1024 * 1024,
) -> dict:
    """Process each file chunk in a different process"""
    result = dict()

    with open(file_name, "r+b") as fh:
        fh.seek(chunk_start)

        tail = b""
        location = None
        byte_count = chunk_end - chunk_start

        # Read the chunk in large binary blocks instead of line by line
        while byte_count > 0:
            if blocksize > byte_count:
                blocksize = byte_count
            byte_count -= blocksize

            index = 0
            data = tail + fh.read(blocksize)
            while data:
                if location is None:
                    try:
                        semicolon = data.index(b";", index)
                    except ValueError:
                        tail = data[index:]
                        break

                    location = data[index:semicolon]
                    index = semicolon + 1

                try:
                    newline = data.index(b"\n", index)
                except ValueError:
                    tail = data[index:]
                    break

                value = float(data[index:newline])
                index = newline + 1

                if location not in result:
                    result[location] = [
                        value,
                        value,
                        value,
                        1,
                    ]  # min, max, sum, count
                else:
                    _result = result[location]
                    if value < _result[0]:
                        _result[0] = value
                    if value > _result[1]:
                        _result[1] = value
                    _result[2] += value
                    _result[3] += 1

                location = None

    return result


def process_file(
    cpu_count: int,
    start_end: list,
) -> dict:
    """Process data file"""
    with mp.Pool(cpu_count) as p:
        # Run chunks in parallel
        chunk_results = p.starmap(
            _process_file_chunk,
            start_end,
        )

    # Combine all results from all chunks
    result = dict()
    for chunk_result in chunk_results:
        for location, measurements in chunk_result.items():
            if location not in result:
                result[location] = measurements
            else:
                _result = result[location]
                if measurements[0] < _result[0]:
                    _result[0] = measurements[0]
                if measurements[1] > _result[1]:
                    _result[1] = measurements[1]
                _result[2] += measurements[2]
                _result[3] += measurements[3]

    # Print final results
    print("{", end="")
    for location, measurements in sorted(result.items()):
        print(
            f"{location.decode('utf-8')}={measurements[0]:.1f}/{(measurements[2] / measurements[3]) if measurements[3] != 0 else 0:.1f}/{measurements[1]:.1f}",
            end=", ",
        )
    print("\b\b} ")


if __name__ == "__main__":
    cpu_count, *start_end = get_file_chunks("data/measurements.txt", max_cpu=12)
    process_file(cpu_count, start_end[0])
1BRC Pure Python Results
As for the results, well, take a look for yourself:

That’s pretty much all you can squeeze out of Python’s standard library. Even the PyPy implementation is over 11 times slower than the fastest Java implementation. So no, Python won’t win any speed contest any time soon.
Speeding Things Up – Using Third-Party Python Libraries
But what if you rely on third-party libraries? I’ve said it before and I’ll say it again – it’s against the rules of the competition – but I’m beyond that at this point. I just want to make it run faster. No one’s going to restrict me to Python’s standard library at my day job anyway.
Pandas
A must-know data analysis library for any Python data professional. It’s not nearly the fastest one, but it has a far superior ecosystem. I’ve used Pandas 2.2.2 with the PyArrow engine for reading the text file.
import pandas as pd

df = (
    pd.read_csv("data/measurements.txt", sep=";", header=None, names=["station_name", "measurement"], engine="pyarrow")
    .groupby("station_name")
    .agg(["min", "mean", "max"])
)

df.columns = df.columns.get_level_values(level=1)
df = df.reset_index()
df.columns = ["station_name", "min_measurement", "mean_measurement", "max_measurement"]
df = df.sort_values("station_name")

print("{", end="")
for row in df.itertuples(index=False):
    print(
        f"{row.station_name}={row.min_measurement:.1f}/{row.mean_measurement:.1f}/{row.max_measurement:.1f}",
        end=", ",
    )
print("\b\b} ")
Dask
It has an almost identical API to Pandas, but it’s lazily evaluated. You can use Dask to scale Pandas code across CPU cores locally or across machines on a cluster.
import dask.dataframe as dd

df = (
    dd.read_csv("data/measurements.txt", sep=";", header=None, names=["station_name", "measurement"], engine="pyarrow")
    .groupby("station_name")
    .agg(["min", "mean", "max"])
    .compute()
)

df.columns = df.columns.get_level_values(level=1)
df = df.reset_index()
df.columns = ["station_name", "min_measurement", "mean_measurement", "max_measurement"]
df = df.sort_values("station_name")

print("{", end="")
for row in df.itertuples(index=False):
    print(
        f"{row.station_name}={row.min_measurement:.1f}/{row.mean_measurement:.1f}/{row.max_measurement:.1f}",
        end=", ",
    )
print("\b\b} ")
Polars
Similar to Pandas, but it has a multi-threaded query engine written in Rust and optimizes the order of operations in your queries. I’ve written about it previously.
# Code credits: https://github.com/ifnesi/1brc#submitting
import polars as pl

df = (
    pl.scan_csv("data/measurements.txt", separator=";", has_header=False, with_column_names=lambda cols: ["station_name", "measurement"])
    .group_by("station_name")
    .agg(
        pl.min("measurement").alias("min_measurement"),
        pl.mean("measurement").alias("mean_measurement"),
        pl.max("measurement").alias("max_measurement"),
    )
    .sort("station_name")
    .collect(streaming=True)
)

print("{", end="")
for row in df.iter_rows():
    print(
        f"{row[0]}={row[1]:.1f}/{row[2]:.1f}/{row[3]:.1f}",
        end=", ",
    )
print("\b\b} ")
DuckDB
An open-source, embedded, in-process, relational OLAP DBMS that is typically orders of magnitude faster than Pandas. You can use it from the shell or from over 10 different programming languages. In most of them, you can choose between a traditional analytical interface and a SQL interface. I’ve written about it previously.
import duckdb

with duckdb.connect() as conn:
    data = conn.sql("""
        select
            station_name,
            min(measurement) as min_measurement,
            cast(avg(measurement) as decimal(8, 1)) as mean_measurement,
            max(measurement) as max_measurement
        from read_csv(
            'data/measurements.txt',
            header=false,
            columns={'station_name': 'varchar', 'measurement': 'decimal(8, 1)'},
            delim=';',
            parallel=true
        )
        group by station_name
        order by station_name
    """)

    print("{", end="")
    for row in sorted(data.fetchall()):
        print(
            f"{row[0]}={row[1]}/{row[2]}/{row[3]}",
            end=", ",
        )
    print("\b\b} ")
1BRC Third-Party Library Results
The results are interesting, to say the least:

Pandas is slow – no surprises here. Dask offers pretty much the same performance as the multi-core Python implementation, but with around 100 fewer lines of code. Polars and DuckDB reduce the runtime to below 10 seconds, which is impressive!
Going One Step Further – Ditching .txt for .parquet
There’s still one bit of performance gain we can squeeze out, and that’s changing the data file format. The data/convertToParquet.py file in the repo will do just that.
The idea is to go from uncompressed and unoptimized 13.8 GB of text data to a compressed and columnar-oriented 2.51 GB Parquet file.
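The conversion script itself isn’t reproduced here, but as a rough sketch (not the repo’s actual convertToParquet.py), a lazy Polars scan that sinks straight to a Parquet file would get the job done. The column names match the ones used throughout this article, and the compression codec is my choice:

# Rough sketch of a .txt to .parquet conversion, not the repo's convertToParquet.py.
import polars as pl

(
    pl.scan_csv(
        "data/measurements.txt",
        separator=";",
        has_header=False,
        with_column_names=lambda cols: ["station_name", "measurement"],
    )
    # Stream the data straight into a compressed, columnar Parquet file
    .sink_parquet("data/measurements.parquet", compression="zstd")
)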
The libraries remain the same, so it doesn’t make sense to explain them again. I’ll just provide the source code:
Pandas
import pandas as pd

df = (
    pd.read_parquet("data/measurements.parquet", engine="pyarrow")
    .groupby("station_name")
    .agg(["min", "mean", "max"])
)

df.columns = df.columns.get_level_values(level=1)
df = df.reset_index()
df.columns = ["station_name", "min_measurement", "mean_measurement", "max_measurement"]
df = df.sort_values("station_name")

print("{", end="")
for row in df.itertuples(index=False):
    print(
        f"{row.station_name}={row.min_measurement:.1f}/{row.mean_measurement:.1f}/{row.max_measurement:.1f}",
        end=", ",
    )
print("\b\b} ")
Dask
import dask.dataframe as dd

df = (
    dd.read_parquet("data/measurements.parquet")
    .groupby("station_name")
    .agg(["min", "mean", "max"])
    .compute()
)

df.columns = df.columns.get_level_values(level=1)
df = df.reset_index()
df.columns = ["station_name", "min_measurement", "mean_measurement", "max_measurement"]
df = df.sort_values("station_name")

print("{", end="")
for row in df.itertuples(index=False):
    print(
        f"{row.station_name}={row.min_measurement:.1f}/{row.mean_measurement:.1f}/{row.max_measurement:.1f}",
        end=", ",
    )
print("\b\b} ")
Polars
import polars as pl

df = (
    pl.scan_parquet("data/measurements.parquet")
    .group_by("station_name")
    .agg(
        pl.min("measurement").alias("min_measurement"),
        pl.mean("measurement").alias("mean_measurement"),
        pl.max("measurement").alias("max_measurement"),
    )
    .sort("station_name")
    .collect(streaming=True)
)

print("{", end="")
for row in df.iter_rows():
    print(
        f"{row[0]}={row[1]:.1f}/{row[2]:.1f}/{row[3]:.1f}",
        end=", ",
    )
print("\b\b} ")
DuckDB
import duckdb

with duckdb.connect() as conn:
    data = conn.sql("""
        select
            station_name,
            min(measurement) as min_measurement,
            cast(avg(measurement) as decimal(8, 1)) as mean_measurement,
            max(measurement) as max_measurement
        from parquet_scan('data/measurements.parquet')
        group by station_name
        order by station_name
    """)

    print("{", end="")
    for row in sorted(data.fetchall()):
        print(
            f"{row[0]}={row[1]}/{row[2]}/{row[3]}",
            end=", ",
        )
    print("\b\b} ")
1BRC with Parquet Results
It looks like we have a clear winner:

The DuckDB implementation on the Parquet file format reduced the runtime to below 4 seconds! It’s still about 2.5 times slower than the fastest Java implementation (which works on the raw .txt file), but it’s something to be happy with.
Conclusion
If there’s one visualization to remember from this article, it has to be the following one:

Sure, only the first three columns obey the official competition rules, but I don’t care. Speed is speed. All is fair in love and Python performance optimization.
Python will never be as fast as Java or any other compiled language – that’s a fact. The question you have to answer is how fast is fast enough. For me, less than 4 seconds for 1 billion rows of data is more than fast enough.
What are your thoughts on the 1 Billion Row Challenge? Did you manage to implement a faster solution? Let me know in the comment section below.
Read next:
DuckDB and AWS – How to Aggregate 100 Million Rows in 1 Minute