Building a DataFusion CSV reader with arrow-extendr
extending R with Arrow and Rust
Want to skip to the end with the source code? Click here.
Goal
For this tutorial we’re going to create a very simple Rust-based R package using extendr
and arrow-extendr
. The package will use the new and very powerful DataFusion
crate to create a csv reader.
“DataFusion is a very fast, extensible query engine for building high-quality data-centric systems in Rust, using the Apache Arrow in-memory format.”
We’ll learn a little bit about how extendr
, Rust, and arrow-extendr
works along the way.
Create a new R package
We will use {usethis}
to create a new R package called dfusionrdr
(prnounced d-fusion reader).
The following section is the standard process for creating a new Rust based R package. It’s a pretty simple process once you get used to it!
::create_package("dfusionrdr") usethis
This will open a new R project with the scaffolding of an R package. From here, we need to make the R package into an extendr R package. To do so we userextendr::use_extendr()
.
use_extendr()
creates the directory src/
a rust crate in src/rust/
as wll as a few Makevars
files in src/
that are used to define how to compile the Rust library. Rust is a compiled language unlike R and Python which are interpreted. Meaning that instead of being able to run code line by line, we have to run it all at once.
Compiled code can be turned into something called a static library. R can call functions and objects from these libraries using the .Call()
function. You do not need to worry about this function. It’s just for context. :)
::use_extendr() rextendr
Before running this, make sure you have a compatible Rust installation by running rextendr::rust_sitrep()
. If you do not, it will tell you need to do. If you’re on windows, you’re likely missing a target.
Building your package
Once you’ve initialized extendr in your package, we can check to see if everything worked by running the hello_world()
function that is included. To do so, we can build our package, then document it.
I use the RStudio shortcut to build my package which is cmd + shift + b
or if on Windows it’s (probably) ctrl + shift + b
. If neither of those work for you, run devtools::build()
.
To make R functions from Rust available into R, we run rextendr::document()
.
rextendr::document()
will also compile your R package for you if need be. Personally, I prefer to build it then document it. For some reason—and it may just be me—I find that compilation from the console can freeze? The cargo file lock is wonky and I probably mess it up a bunch.
Run devtools::load_all()
to bring the documented functions into scope and run the function!
::load_all()
devtoolshello_world()
#> [1] "Hello world!"
We’ve now ran Rust code directly from R. Pretty simple Rust, but Rust nonetheless.
Adding Rust dependencies
Much like how we like to use R packages to make our lives easier, we can use Rust crates (libraries) to make do crafty things. To do so, we will open up our Rust crate in our preferred editor. I prefer VS Code.
If you haven’t configured VS Code to use with Rust, there are like a million different ways to configure it. But at minimum, install the rust-analyzer
, BetterTOML
, and CodeLLDB
extensions (I think CodeLLDB comes with the rust-analyzer though?)
Open src/rust/
in VS Code. Then we will add 3 additional dependencies. These are
datafusion
- a powerful Arrow-based
DataFrame
library (like Polars but different)
- a powerful Arrow-based
tokio
- which will give us the ability to run code lazily and asynchronously which is required by
datafusion
- which will give us the ability to run code lazily and asynchronously which is required by
arrow_extendr
- this is a crate I built that lets us send Arrow data from Rust to R and back
In the terminal run the following
cargo add datafusion
cargo add tokio
cargo add arrow_extendr --git https://github.com/JosiahParry/arrow-extendr
arrow-extendr
is not published on crates.io yet so we need to pass the git flag to tell Rust where to find the library.
This is my preferred way of adding dependencies. If you open up Cargo.toml
you’ll now see these libraries added under the [Dependencies]
heading.
Making R work with DataFusion
DataFusion
requires one additional C library that we need to use we need to add it to our Makevars
. This is not something you typically have to do, but DataFusion requires it from us.
Open Makevars
and Makevars.win
. One the line that starts with PKG_LIBS
add -llzma
to the end.
Again, this is not a common thing you have to do. This is specifically for our use case.
Building our CSV Reader
Open src/lib.rs
. This is where your R package is defined. For larger packages you may want to break it up into multiple smaller files. But our use case is relatively small (and frankly, not that simple, lol!).
Let’s first start by removing our hello_world example from our code. Delete the hello world function (lines 3-8) and remove it from the module declaration under mod dfusionrdr
.
In order to make our Rust functions available to R, we need to include them in our extendr_module!
macro call. Under mod dfusionrdr
we can add additional functions there. Those incldued in there will be made available to R. If the have /// @export
roxygen tag, then they will be exported in the R package as well.
extendr_module! {
mod dfusionrdr;
}
Let’s create the scaffolding for our first function read_csv_dfusion()
#[extendr]
/// @export
fn read_csv(csv_path: &str) {
rprintln!("We will read the csv file at: `{csv_path}`");
}
- The
#[extendr]
macro indicates that this function will be made available to R. - We add
/// @export
to indicate that our function will be exported to R. We can add roxygen2 documentation to our functions by prefixing with///
which a documentation comment wheras//
is a normal comment.
This function prints a message indicating we will read a CSV at the path provided. It takes one argument csv_path
which is an &str
. A &str
in Rust is a like a scalar character in R e.g. "my-file.csv"
Next we need to make sure the function is available to R in the module.
extendr_module! {
mod dfusionrdr;
fn read_csv_dfusion;
}
From RStudio, let’s build, document, and load again.
::build() # 1.
devtools::document() # 2.
rextendr::load_all() # 3. devtools
- Only run if you haven’t built with
cmd + shift + b
- This brings functions into the
NAMESPACE
and updates arguments and outputs - Loads everything from your package into memory
Import dependencies
In order to use DataFusion to read dependencies we need to import it. A lot of Rust libraries have something called a prelude
. The prelude
is a special module that contains common structs, traits, enums, etc that are very useful for the crate. Notice that the top of your lib.rs
includes use extendr_api::prelude::*;
this brings all of the Rust based R objects into scope such as Robj
, Doubles
, Integers
etc.
DataFusion also has a useful prelude
that we want to bring into scope. We will add use datafusion::prelude::*;
to the top of our file (much like adding library()
). This brings important objects into scope for us. We will also need tokio::runtime::Runtime
as well.
The first 3 lines of your lib.rs
should look like this:
use extendr_api::prelude::*;
use datafusion::prelude::*;
use tokio::runtime::Runtime;
Context and Runtime
DataFusion
requires something called a SessionContext
. The session context
“maintains the state of the connection between a user and an instance of the DataFusion engine.”
We need to instantiate this struct inside of our function.
fn read_csv_dfusion(csv_path: &str) {
let ctx = SessionContext::new();
}
We now have a ctx
object which we can use to read our csv. It has a method called read_csv()
. It requires the path of a csv to read as well as a struct called CsvReadOptions
which determines how it will be read into memory. We will pass csv_path
to the first argument and create a default options struct with the new()
method.
fn read_csv_dfusion(csv_path: &str) {
let ctx = SessionContext::new();
let csv = ctx.read_csv(
,
csv_pathCsvReadOptions::new()
;
)}
This will compile with a bunch of warnings about unused variables. But, more importantly, the csv
variable we created is special. If you have your Rust analyzer configured you should see that it is of type impl Future<Output = Result <..., ...>>
. That right there is problematic!
When you see impl Future<...>
that tells us it is an asynchronous result that needs to be polled and executed. async
functions are lazy. They don’t do anything until you ask it to. The way to do this is by calling the .await
attribute. We can then unwrap()
the results and store it into another variable.
It’s typically a pretty bad idea to use .unwrap()
since the program will “panic!” if it does not get a result that it expected. But it’s a pretty handy way to get working code without error handling. I typically handle errors after I’ve gotten the bulk of what I want working.
fn read_csv_dfusion(csv_path: &str) {
let ctx = SessionContext::new();
let csv = ctx.read_csv(
"sdf",
CsvReadOptions::new()
;
)
let csv_result = csv.await.unwrap();
}
If we run cargo check
in our terminal we will get the message:
error[E0728]: `await` is only allowed inside `async` functions and blocks
One way to get this to work would be to add async fn
instead of fn
but that isn’t supported by extendr
since R is single threaded and doesn’t support async
. So how do we get around this?
async
with extendr
and tokio
In order to run async functions we need to execute it in a runtime. tokio
provides this for us with the Runtime
struct. It lets us run impl Future<...>
in a non async function!
We’ll modify our function definition to
fn read_csv_dfusion(csv_path: &str) {
let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
let csv = ctx.read_csv(
,
csv_pathCsvReadOptions::new()
;
)}
With the Runtime
object rt
we can call the block_on()
method which takes a Future
and runs it until it has completed. This means that we don’t get to use async functionality—e.g. executing 2 or more things at the same time—but we still get to take the result!
Let’s read the csv into an object called df
using the block_on()
method.
fn read_csv_dfusion(csv_path: &str) {
let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
let csv = ctx.read_csv(
,
csv_pathCsvReadOptions::new()
;
)
let df = rt.block_on(csv).unwrap();
}
The analyzer shows that this is a DataFrame
. Awesome! Now, how can we get this into memory?
Sending DataFrame
s to R with arrow-extendr
This is where arrow-extendr comes into play. arrow-extendr provides a couple of traits which allow us to convert a number of arrow-rs types into an Robj
.
See my post on Rust traits for R users
An Robj
is extendr
’s catch all for any type of object that can be returned to R
The IntoArrowRobj
trait can convert a Vec<RecordBatch>
into an Robj
. The R documentation for a RecordBatch
says
“A record batch is a collection of equal-length arrays matching a particular Schema. It is a table-like data structure that is semantically a sequence of fields, each a contiguous Arrow Array.”
Based on that, a Vec<RecordBatch>
is a collection of chunks of a table-like data structures.
DataFrame
s have a method .collect()
which creates a Vec<RecordBatch>
.
Let’s modify our function to turn the DataFrame into a Vec<RecordBatch>
.
All things with DataFusion are done async so we need to wrap them in rt.block_on()
.
fn read_csv_dfusion(csv_path: &str) {
let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
let csv = ctx.read_csv(
,
csv_pathCsvReadOptions::new()
;
)
// create a dataframe from the csv
let df = rt.block_on(csv).unwrap();
// collect the results into record batches
let res = rt.block_on(df.collect()).unwrap();
}
With this, we can send the results to R with the into_arrow_robj()
method! First we need to add use arrow_extendr::to::IntoArrowRobj;
to the top of our script to bring the trait into scope.
Then in our function we need to specify the return type as Robj
(see the first line of the definition -> Robj
) and then turn res
into an Robj
fn read_csv_dfusion(csv_path: &str) -> Robj {
let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
let csv = ctx.read_csv(
,
csv_pathCsvReadOptions::new()
;
)
// create a dataframe from the csv
let df = rt.block_on(csv).unwrap();
// collect the results into record batches
let res = rt.block_on(df.collect()).unwrap();
.into_arrow_robj().unwrap()
res}
Handling arrow-rs from R
Let’s rebuild and document our function again.
I’ve added a csv of {palmerpenguins}
to the inst/
folder of our package for testing. Let’ try reading this in.
<- read_csv_dfusion("inst/penguins.csv")
res
res#> <nanoarrow_array_stream struct<rowid: int64, species: string, island: string, bill_length_mm: string, bill_depth_mm: string, flipper_length_mm: string, body_mass_g: string, sex: string, year: int64>>
#> $ get_schema:function ()
#> $ get_next :function (schema = x$get_schema(), validate = TRUE)
#> $ release :function ()
Now, this doesn’t look very familiar to most R users. This is an object from the {nanoarrow}
R package called "nanoarrow_array_stream"
. This is how data is received from Rust in R. We can process batches from this “stream” using the method get_next()
. But there’s a handy as.data.frame()
method for it.
This is a good time to note that you should add nanoarrow
as a dependency of your package explicitly with usethis::use_package("nanoarrow")
.
<- read_csv_dfusion("inst/penguins.csv")
res as.data.frame(res) |>
head()
#> rowid species island bill_length_mm bill_depth_mm flipper_length_mm
#> 1 1 Adelie Torgersen 39.1 18.7 181
#> 2 2 Adelie Torgersen 39.5 17.4 186
#> 3 3 Adelie Torgersen 40.3 18 195
#> 4 4 Adelie Torgersen NA NA NA
#> 5 5 Adelie Torgersen 36.7 19.3 193
#> 6 6 Adelie Torgersen 39.3 20.6 190
#> body_mass_g sex year
#> 1 3750 male 2007
#> 2 3800 female 2007
#> 3 3250 female 2007
#> 4 NA NA 2007
#> 5 3450 female 2007
#> 6 3650 male 2007
Boom! We’ve written ourselves a reader! Let’s do a simple bench mark comparing it to readr
.
library(dfusionrdr)
::mark(
benchdatafusion = as.data.frame(read_csv_dfusion("inst/penguins.csv")),
readr = readr::read_csv("inst/penguins.csv"),
check = FALSE
)#> # A tibble: 2 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 datafusion 922.3µs 1.04ms 915. 1.07MB 0
#> 2 readr 14.6ms 15.31ms 65.6 13.2MB 80.8
Insanely fast!
Addendum
The source code for the entire package is below. It also includes a function read_sql_csv_dfusion()
which takes a SQL statement and reads it into memory if you want to explore that. For example:
<- 'SELECT count(*) as "n", "species" FROM csv GROUP BY "species"'
query
as.data.frame(
read_sql_csv_dfusion(query, "inst/penguins.csv")
)#> n species
#> 1 68 Chinstrap
#> 2 152 Adelie
#> 3 124 Gentoo
Source code:
lib.rs
use arrow_extendr::to::IntoArrowRobj;
use extendr_api::prelude::*;
use datafusion::prelude::*;
use tokio::runtime::Runtime;
use std::result::Result;
#[extendr]
/// @export
fn read_csv_dfusion(csv_path: &str) -> Robj {
let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
let csv = ctx.read_csv(
,
csv_pathCsvReadOptions::new()
;
)
// create a dataframe from the csv
let df = rt.block_on(csv).unwrap();
// collect the results into record batches
let res = rt.block_on(df.collect()).unwrap();
.into_arrow_robj().unwrap()
res}
#[extendr]
fn read_sql_csv_dfusion(sql_query: &str, csv_path: &str) -> Result<Robj, Error> {
let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
let csv_tbl_fut = ctx.register_csv(
"csv",
,
csv_pathCsvReadOptions::new(),
;
)
// // we dont assign it to anything because we're just ensuring that it gets ran
let _ = rt.block_on(csv_tbl_fut).unwrap();
// // create a plan
let df = rt.block_on(ctx.sql(sql_query)).unwrap();
// // collect into a vector of record batches
let res = rt.block_on(df.collect()).unwrap();
// return the result to R
.into_arrow_robj()
res}
// Macro to generate exports.
// This ensures exported functions are registered with R.
// See corresponding C code in `entrypoint.c`.
extendr_module! {
mod dfusionrdr;
fn read_sql_csv_dfusion;
fn read_csv_dfusion; }
Cargo.toml
[package]
name = 'dfusionrdr'
publish = false
version = '0.1.0'
edition = '2021'
[lib]
crate-type = [ 'staticlib' ]
name = 'dfusionrdr'
[dependencies]
arrow_extendr = { git = "https://github.com/JosiahParry/arrow-extendr" }
datafusion = "33.0.0"
extendr-api = '*'
tokio = "1.34.0"