I Scream, You Scream, We All Scream for an Arrow Schema

R
Apache Arrow
python
Author

Sam Albers

Published

October 2, 2024

Always take the time to write a schema. Sometimes you have to: it isn't optional. But in less strongly typed languages like R and Python, you can sometimes get away with not specifying a schema because there are such good tools for correctly inferring it. This post is an example of an instance where you absolutely do need to provide a schema.

One data structure that is popular is a hive-partitioned dataset. This is a cheap data structure that is easy to create (just structured directories!), is compatible with S3, and has many tools that can take advantage of it. I say “cheap” because it does not come with the overhead of a full-fledged database but can, on some level, act like one. For example, you can efficiently query partitioned data with SQL just like a typical database.
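As a quick sketch of that last point (my own illustration; the rest of the post does not rely on it), here is one way to run SQL against the quakes_partitioned/ directory we are about to build, using the DuckDB R package:

library(DBI)
library(duckdb)

con <- dbConnect(duckdb())
dbGetQuery(con, "
  SELECT stations, round(avg(mag), 2) AS mean_mag
  FROM read_parquet(
    'quakes_partitioned/*/*.parquet',
    hive_partitioning = true,
    union_by_name = true  -- one partition has an extra column, so unify by name
  )
  GROUP BY stations
  ORDER BY stations
")
dbDisconnect(con, shutdown = TRUE)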

To illustrate why schemas are important for a partitioned dataset we first need to create one. We are going to use R’s internal quakes data. The code below creates a reprex but the main thing to notice is that one partition, stations=50, contains an additional column called wave_height. This scenario mimics a situation where for that station, perhaps you gained a new sensor and thus collected more data (always good!).

library(arrow, warn.conflicts = FALSE)
# write one hive-style partition per station
for (i in unique(quakes$stations)) {
  quakes_per_station <- quakes[quakes$stations == i, ]
  # mimic gaining a new sensor at station 50: that partition gets an extra column
  if (i == 50) {
    quakes_per_station$wave_height <- runif(nrow(quakes_per_station), 0, 100)
  }
  write_dataset(quakes_per_station, "quakes_partitioned/", partitioning = "stations")
}

The directory quakes_partitioned will have a bunch of sub-directories of the form {variable}={value}.

quakes_partitioned/
├── stations=10

If we examine just one of those files, we can see what it contains:

read_parquet("quakes_partitioned/stations=10/part-0.parquet")
# A tibble: 20 × 4
     lat  long depth   mag
   <dbl> <dbl> <int> <dbl>
 1 -21    182.   600   4.4
 2 -23.6  181.   349   4  
 3 -16.3  186     48   4.5
 4 -20.1  184.   186   4.2
 5 -15.0  182.   399   4.1
 6 -19.1  169.   158   4.4
 7 -17.7  185    383   4  
 8 -21.0  181.   483   4.2
 9 -27.2  182.    55   4.6
10 -18.4  183.   343   4.1
11 -20.3  182.   476   4.5
12 -14.8  185.   294   4.1
13 -17.6  182.   548   4.1
14 -20.6  182.   518   4.2
15 -25    180    488   4.5
16 -17.8  185.   223   4.1
17 -20.7  186.    80   4  
18 -21.8  181    618   4.1
19 -21.0  181.   616   4.3
20 -17.7  188.    45   4.2

Note that the stations column has been dropped from the parquet file because it is contained within the partitioned structure (i.e. the directory name).

Apache Arrow has considerable functionality for dealing with partitioned data via its dataset layer. The easiest way for us to take advantage of this in R is to use open_dataset, point it at the partitioned directory, and tell it the name of the partitioning variable:

open_dataset("quakes_partitioned/", partitioning = "stations")
FileSystemDataset with 102 Parquet files
5 columns
lat: double
long: double
depth: int32
mag: double
stations: int32

See $metadata for additional Schema metadata

This is a very fast operation that quickly collects some basic metadata from our quakes dataset. However, we can see clearly that we are missing the added wave_height column from station 50. If we investigate that single parquet file we can see the column there:

read_parquet("quakes_partitioned/stations=50/part-0.parquet")
# A tibble: 10 × 5
     lat  long depth   mag wave_height
   <dbl> <dbl> <int> <dbl>       <dbl>
 1 -22.6  181.   544   5          66.2
 2 -20.6  182.   529   5          41.6
 3 -22.9  173.    56   5.1        75.3
 4 -23.3  184    164   4.8        79.6
 5 -20.5  182.   559   4.9        65.7
 6 -26.5  178.   609   5          28.9
 7 -25.0  180.   505   4.9        47.1
 8 -23.4  180.   541   4.6        83.2
 9 -23.9  180.   524   4.6        43.7
10 -20.9  185.    82   4.9        68.7

So what’s happened here? An arrow dataset just uses the first file to construct the metadata. Because wave_height is not present there, it is not picked up by an open_dataset call.
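We can check this for ourselves. A small sketch (the object name quakes_noschema is just for illustration): $files lists the file fragments behind a FileSystemDataset, and by default only the first fragment is inspected for its schema.

quakes_noschema <- open_dataset("quakes_partitioned/", partitioning = "stations")
# by default, only the schema of the first fragment listed here is inspected
head(quakes_noschema$files, 3)

# reading the stations=50 directory on its own shows the extra column
open_dataset("quakes_partitioned/stations=50/")$schema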

How can we fix this? A schema! If we take the time to create a schema, we can tell open_dataset: “hey there is another column in there that we are also interested in”. The arrow R package provides a schema function to construct this:

quake_schema <- schema(
  lat = float64(),
  long = float64(),
  depth = int32(),
  mag = float64(),
  wave_height = float64(), # the column that only exists in the stations=50 partition
  stations = string()      # the partition column needs a field in the schema too
)
quake_schema
quake_schema
Schema
lat: double
long: double
depth: int32
mag: double
wave_height: double
stations: string
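If typing the schema out by hand feels tedious, another sketch of an approach is to start from the schema of the one file that already contains every column and then append the partition field with unify_schemas() (object names here are just for illustration; the result should match the hand-written quake_schema above):

# schema of the one parquet file that already has wave_height
full_file_schema <- read_parquet(
  "quakes_partitioned/stations=50/part-0.parquet",
  as_data_frame = FALSE
)$schema

# add the partition column, which only exists in the directory names
quake_schema_alt <- unify_schemas(full_file_schema, schema(stations = string()))
quake_schema_alt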

Now we can provide that schema to the open_dataset function and see what happens:

quakes_dataset <- open_dataset(
  "quakes_partitioned/",
  partitioning = "stations",
  schema = quake_schema
)
quakes_dataset
FileSystemDataset with 102 Parquet files
6 columns
lat: double
long: double
depth: int32
mag: double
wave_height: double
stations: string

open_dataset has recognized our wave_height column! We explicitly told arrow about it through a schema and so now you can work with it as you would any other column.
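For example, here is a quick sketch of pulling wave_height back into R with dplyr verbs, which arrow translates into a query against the dataset (the filter value is a string because that is how we declared stations in the schema):

library(dplyr, warn.conflicts = FALSE)

quakes_dataset |>
  filter(stations == "50") |>
  select(lat, long, mag, wave_height) |>
  collect()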

One other option that is worth discussing is setting the unify_schemas argument to TRUE. From the arrow documentation:

should all data fragments (files, Datasets) be scanned in order to create a unified schema from them? If FALSE, only the first fragment will be inspected for its schema. Use this fast path when you know and trust that all fragments have an identical schema. The default is FALSE when creating a dataset from a directory path/URI or vector of file paths/URIs (because there may be many files and scanning may be slow) but TRUE when sources is a list of Datasets (because there should be few Datasets in the list and their Schemas are already in memory).

This is an excellent heuristic for when you should use unify_schemas. I’d also suggest that you gain some benefit from explicitly writing out your schema types as well as the fields being used. Specifying the schema in the way discussed above also happens to be a tiny bit faster on these data:

library(bench)
library(dplyr, warn.conflicts = FALSE)

bench::mark(
  set_schema = open_dataset(
    "quakes_partitioned/",
    partitioning = "stations",
    schema = quake_schema
  ) |>
    collect(),
  unify_schema = open_dataset(
    "quakes_partitioned/",
    partitioning = "stations",
    unify_schemas = TRUE
  ) |>
    collect(),
  check = FALSE
)
# A tibble: 2 × 6
  expression        min  median `itr/sec` mem_alloc `gc/sec`
  <bch:expr>   <bch:tm> <bch:t>     <dbl> <bch:byt>    <dbl>
1 set_schema     32.9ms  34.9ms      28.6    1.08MB     4.76
2 unify_schema   36.6ms  38.6ms      25.8   16.38KB     5.17

It is also worth mentioning that this is not an R-specific thing. The same pattern exists in pyarrow:

import pyarrow as pa
import pyarrow.dataset as ds

quakes_partitioned = ds.dataset("quakes_partitioned/", partitioning = "hive")
quakes_partitioned.schema
lat: double
long: double
depth: int32
mag: double
stations: int32
-- schema metadata --
r: 'A
3
263168
197888
5
UTF-8
531
1
531
5
254
254
254
254
254
1026
1
2621' + 128

I am not certain quite what the schema metadata is telling us here, and that will have to be an investigation for another day. But the thing to notice is that, again, in the absence of a schema, we don’t detect the wave_height column. If we instead provide one, we are able to detect it:

quake_schema = pa.schema(
    [
        ("lat", pa.float64()),
        ("long", pa.float64()),
        ("depth", pa.int32()),
        ("mag", pa.float64()),
        ("wave_height", pa.float64()),
        ("stations", pa.string()),
    ]
)

quakes_partitioned = ds.dataset("quakes_partitioned/", partitioning = "hive", schema=quake_schema)
quakes_partitioned.schema
lat: double
long: double
depth: int32
mag: double
wave_height: double
stations: string

Explicitly setting your schema, illustrated here with Apache Arrow datasets, can be really important. You always have a schema: the only question is whether you let another program infer it or you tell your computer exactly what you want.