Building a DataFusion CSV reader with arrow-extendr

extending R with Arrow and Rust

rust
pkg-dev
extendr
arrow
Author

Josiah Parry

Published

November 25, 2023

Want to skip to the end? The full source code is in the Addendum.

Goal

For this tutorial we’re going to create a very simple Rust-based R package using extendr and arrow-extendr. The package will use the new and very powerful DataFusion crate to create a csv reader.

“DataFusion is a very fast, extensible query engine for building high-quality data-centric systems in Rust, using the Apache Arrow in-memory format.”

We’ll learn a little bit about how extendr, Rust, and arrow-extendr work along the way.

Create a new R package

We will use {usethis} to create a new R package called dfusionrdr (pronounced d-fusion reader).

The following section is the standard process for creating a new Rust based R package. It’s a pretty simple process once you get used to it!

usethis::create_package("dfusionrdr")

This will open a new R project with the scaffolding of an R package. From here, we need to make the R package into an extendr R package. To do so we use rextendr::use_extendr().

use_extendr() creates the directory src/, a Rust crate in src/rust/, and a few Makevars files in src/ that define how to compile the Rust library. Rust is a compiled language, unlike R and Python, which are interpreted. That means that instead of running code line by line, we have to compile it all at once before we can run it.

Compiled code can be turned into something called a static library. R can call functions and objects from these libraries using the .Call() function. You do not need to worry about this function. It’s just for context. :)

rextendr::use_extendr()

Before running this, make sure you have a compatible Rust installation by running rextendr::rust_sitrep(). If you do not, it will tell you what you need to do. If you’re on Windows, you’re likely missing a target.

Building your package

Once you’ve initialized extendr in your package, we can check to see if everything worked by running the hello_world() function that is included. To do so, we can build our package, then document it.

Tip

I use the RStudio shortcut to build my package which is cmd + shift + b or if on Windows it’s (probably) ctrl + shift + b. If neither of those work for you, run devtools::build().

To make R functions from Rust available in R, we run rextendr::document().

rextendr::document() will also compile your R package for you if need be. Personally, I prefer to build it then document it. For some reason—and it may just be me—I find that compilation from the console can freeze? The cargo file lock is wonky and I probably mess it up a bunch.

Run devtools::load_all() to bring the documented functions into scope and run the function!

devtools::load_all()
hello_world()
#> [1] "Hello world!"

We’ve now run Rust code directly from R. Pretty simple Rust, but Rust nonetheless.

Adding Rust dependencies

Much like how we use R packages to make our lives easier, we can use Rust crates (libraries) to do crafty things. To do so, we will open up our Rust crate in our preferred editor. I prefer VS Code.

If you haven’t configured VS Code to use with Rust, there are like a million different ways to configure it. But at minimum, install the rust-analyzer, BetterTOML, and CodeLLDB extensions (I think CodeLLDB comes with the rust-analyzer though?)

Open src/rust/ in VS Code. Then we will add 3 additional dependencies. These are

  • datafusion
    • a powerful Arrow-based DataFrame library (like Polars but different)
  • tokio
    • which will give us the ability to run code lazily and asynchronously which is required by datafusion
  • arrow_extendr
    • this is a crate I built that lets us send Arrow data from Rust to R and back

In the terminal run the following

cargo add datafusion
cargo add tokio
cargo add arrow_extendr --git https://github.com/JosiahParry/arrow-extendr

arrow-extendr is not published on crates.io yet so we need to pass the git flag to tell Rust where to find the library.

Note

This is my preferred way of adding dependencies. If you open up Cargo.toml you’ll now see these libraries added under the [dependencies] heading.

Making R work with DataFusion

DataFusion requires one additional C library, so we need to add it to our Makevars. This is not something you typically have to do, but DataFusion requires it from us.

Open Makevars and Makevars.win. On the line that starts with PKG_LIBS, add -llzma to the end.

Again, this is not a common thing you have to do. This is specifically for our use case.

Building our CSV Reader

Open src/lib.rs. This is where your R package is defined. For larger packages you may want to break it up into multiple smaller files. But our use case is relatively small (and frankly, not that simple, lol!).

Let’s first start by removing our hello_world example from our code. Delete the hello world function (lines 3-8) and remove it from the module declaration under mod dfusionrdr.

Tip

In order to make our Rust functions available to R, we need to include them in our extendr_module! macro call. Under mod dfusionrdr we can add additional functions. Those included there will be made available to R. If they have the /// @export roxygen tag, then they will be exported in the R package as well.

extendr_module! {
    mod dfusionrdr;
}

Let’s create the scaffolding for our first function read_csv_dfusion()

#[extendr]
/// @export 
fn read_csv_dfusion(csv_path: &str) {
  rprintln!("We will read the csv file at: `{csv_path}`");
}
  1. The #[extendr] macro indicates that this function will be made available to R.
  2. We add /// @export to indicate that our function will be exported to R. We can add roxygen2 documentation to our functions by prefixing lines with ///, which marks a documentation comment, whereas // is a normal comment.

This function prints a message indicating we will read a CSV at the path provided. It takes one argument, csv_path, which is a &str. A &str in Rust is like a scalar character in R, e.g. "my-file.csv".
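To make the &str idea a bit more concrete, here is a small standalone sketch in plain Rust (no extendr involved). The helper name describe_path is hypothetical and only for illustration; it builds the same message as the function above and returns it as a String instead of printing it.

```rust
// Hypothetical helper (not part of the package): formats the message
// instead of printing it. `{csv_path}` captures the variable by name
// directly inside the format string.
fn describe_path(csv_path: &str) -> String {
    format!("We will read the csv file at: `{csv_path}`")
}

// A string literal is already a `&str`, so it can be passed as-is.
fn describe_literal() -> String {
    describe_path("my-file.csv")
}

// An owned `String` can be lent out as a `&str` by borrowing it with `&`.
fn describe_owned() -> String {
    let owned: String = String::from("inst/penguins.csv");
    describe_path(&owned)
}
```

Because the function only borrows the text, the caller keeps ownership of it, which is roughly why &str is a comfortable type for a path argument.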

Next we need to make sure the function is available to R in the module.

extendr_module! {
    mod dfusionrdr;
    fn read_csv_dfusion;
}

From RStudio, let’s build, document, and load again.

devtools::build()    # 1. 
rextendr::document() # 2. 
devtools::load_all() # 3.
  1. Only run if you haven’t built with cmd + shift + b
  2. This brings functions into the NAMESPACE and updates arguments and outputs
  3. Loads everything from your package into memory

Import dependencies

In order to use DataFusion to read our CSV we need to import it. A lot of Rust libraries have something called a prelude. The prelude is a special module that contains common structs, traits, enums, etc. that are very useful for the crate. Notice that the top of your lib.rs includes use extendr_api::prelude::*;. This brings all of the Rust-based R objects into scope, such as Robj, Doubles, Integers, etc.

DataFusion also has a useful prelude that we want to bring into scope. We will add use datafusion::prelude::*; to the top of our file (much like adding library()). This brings important objects into scope for us. We will also need tokio::runtime::Runtime as well.

The first 3 lines of your lib.rs should look like this:

use extendr_api::prelude::*;
use datafusion::prelude::*;
use tokio::runtime::Runtime;
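If the prelude pattern is new to you, the following toy sketch shows roughly how a crate can offer one. Everything in it (toy_engine, Session, new_session) is made up for illustration and is not DataFusion's actual layout.

```rust
// `toy_engine` is a made-up module standing in for a library crate.
mod toy_engine {
    pub struct Session {
        pub name: String,
    }

    pub fn new_session(name: &str) -> Session {
        Session { name: name.to_string() }
    }

    // The prelude simply re-exports the items most users will need.
    pub mod prelude {
        pub use super::{new_session, Session};
    }
}

// A single glob import brings everything in the prelude into scope,
// much like `use datafusion::prelude::*;` does for DataFusion.
use toy_engine::prelude::*;

fn session_name() -> String {
    // `Session` and `new_session` are usable without their full paths.
    let session: Session = new_session("demo");
    session.name
}
```

The glob import is a convenience only; every item could also be imported by its full path.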

Context and Runtime

DataFusion requires something called a SessionContext. The session context

“maintains the state of the connection between a user and an instance of the DataFusion engine.”

We need to instantiate this struct inside of our function.

fn read_csv_dfusion(csv_path: &str) {
    let ctx = SessionContext::new();
}

We now have a ctx object which we can use to read our csv. It has a method called read_csv(). It requires the path of a csv to read as well as a struct called CsvReadOptions which determines how it will be read into memory. We will pass csv_path to the first argument and create a default options struct with the new() method.

fn read_csv_dfusion(csv_path: &str) {
    let ctx = SessionContext::new();
    let csv = ctx.read_csv(
        csv_path,
        CsvReadOptions::new()
    );
}

This will compile with a bunch of warnings about unused variables. But, more importantly, the csv variable we created is special. If you have rust-analyzer configured you should see that it is of type impl Future<Output = Result<..., ...>>. That right there is problematic!

When you see impl Future<...>, that tells us it is an asynchronous result that needs to be polled and executed. async functions are lazy. They don’t do anything until you ask them to. The way to do this is with the .await keyword. We can then unwrap() the result and store it in another variable.
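The laziness is easy to see in a standalone sketch that uses only the standard library. Nothing here comes from DataFusion or tokio: NoopWaker and laziness_demo are hypothetical names, and the future is polled by hand purely to show that its body does not run when it is created.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; it is only needed so we can poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Returns (did the body run before polling?, after polling?, the value).
fn laziness_demo() -> (bool, bool, i32) {
    let ran = Cell::new(false);

    // Creating the future does not execute its body.
    let fut = async {
        ran.set(true);
        42
    };
    let ran_before = ran.get();

    // Polling is what actually drives the body forward.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let value = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => unreachable!("this future never waits on anything"),
    };

    (ran_before, ran.get(), value)
}
```

In real code you would not poll by hand; .await (inside async code) or a runtime does it for you.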

It’s typically a pretty bad idea to use .unwrap() since the program will “panic!” if it does not get a result that it expected. But it’s a pretty handy way to get working code without error handling. I typically handle errors after I’ve gotten the bulk of what I want working.

fn read_csv_dfusion(csv_path: &str) {
    let ctx = SessionContext::new();
    let csv = ctx.read_csv(
        csv_path,
        CsvReadOptions::new()
    );

    let csv_result = csv.await.unwrap();
}

If we run cargo check in our terminal we will get the message:

error[E0728]: `await` is only allowed inside `async` functions and blocks

One way to get this to work would be to add async fn instead of fn but that isn’t supported by extendr since R is single threaded and doesn’t support async. So how do we get around this?

async with extendr and tokio

In order to run async functions we need to execute them in a runtime. tokio provides this for us with the Runtime struct. It lets us run an impl Future<...> in a non-async function!

We’ll modify our function definition to

fn read_csv_dfusion(csv_path: &str) {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
    let csv = ctx.read_csv(
        csv_path,
        CsvReadOptions::new()
    );
}

With the Runtime object rt we can call the block_on() method which takes a Future and runs it until it has completed. This means that we don’t get to use async functionality—e.g. executing 2 or more things at the same time—but we still get to take the result!
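To give a feel for what "runs it until it has completed" means, here is a deliberately simplified stand-in written with only the standard library. toy_block_on, YieldOnce, and count_rows are all hypothetical names; tokio's real Runtime::block_on() is far more capable (it parks the thread and drives I/O), so treat this as a sketch of the idea rather than how tokio is implemented.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Simplified stand-in for a runtime's block_on: poll in a loop until ready.
fn toy_block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}

// A future that reports "not ready yet" the first time it is polled.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical async function standing in for an async reader.
async fn count_rows(csv_text: &str) -> usize {
    YieldOnce { yielded: false }.await; // pretend to wait on I/O
    csv_text.lines().skip(1).count() // skip the header line
}

// A plain (non-async) function can still hand back the async result.
fn count_rows_blocking(csv_text: &str) -> usize {
    toy_block_on(count_rows(csv_text))
}
```

The shape is the same as in our package: the async work happens inside, and the outer function stays an ordinary fn that extendr can expose to R.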

Let’s read the csv into an object called df using the block_on() method.

fn read_csv_dfusion(csv_path: &str) {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
    let csv = ctx.read_csv(
        csv_path,
        CsvReadOptions::new()
    );

    let df = rt.block_on(csv).unwrap();
}

The analyzer shows that this is a DataFrame. Awesome! Now, how can we get this into memory?

Sending DataFrames to R with arrow-extendr

This is where arrow-extendr comes into play. arrow-extendr provides a couple of traits which allow us to convert a number of arrow-rs types into an Robj.

See my post on Rust traits for R users

Tip

An Robj is extendr’s catch all for any type of object that can be returned to R

The IntoArrowRobj trait can convert a Vec<RecordBatch> into an Robj. The R documentation for a RecordBatch says

“A record batch is a collection of equal-length arrays matching a particular Schema. It is a table-like data structure that is semantically a sequence of fields, each a contiguous Arrow Array.”

Based on that, a Vec<RecordBatch> is a collection of chunks of a table-like data structure.

DataFrames have a method .collect() which creates a Vec<RecordBatch>.
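As a mental model only, the toy sketch below treats a batch as a handful of equal-length columns and a table as a vector of such batches. ToyBatch is a made-up struct, not the arrow-rs RecordBatch type (which carries a schema and typed Arrow arrays), and the values are invented for the example.

```rust
// Toy stand-in for a record batch: a few equal-length columns.
struct ToyBatch {
    species: Vec<String>,
    year: Vec<i64>,
}

impl ToyBatch {
    fn num_rows(&self) -> usize {
        // Every column in a batch must have the same length.
        debug_assert_eq!(self.species.len(), self.year.len());
        self.species.len()
    }
}

// A table split into chunks: the row count is the sum over all batches.
fn total_rows(batches: &[ToyBatch]) -> usize {
    batches.iter().map(ToyBatch::num_rows).sum()
}

// Two chunks that together hold three rows (made-up values).
fn example_batches() -> Vec<ToyBatch> {
    vec![
        ToyBatch {
            species: vec!["Adelie".to_string(), "Gentoo".to_string()],
            year: vec![2007, 2008],
        },
        ToyBatch {
            species: vec!["Chinstrap".to_string()],
            year: vec![2009],
        },
    ]
}
```

Splitting a table into chunks like this is what lets an engine process or stream data piece by piece instead of all at once.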

Let’s modify our function to turn the DataFrame into a Vec<RecordBatch>.

Note

All things with DataFusion are done async so we need to wrap them in rt.block_on().

fn read_csv_dfusion(csv_path: &str) {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
    let csv = ctx.read_csv(
        csv_path,
        CsvReadOptions::new()
    );

    // create a dataframe from the csv
    let df = rt.block_on(csv).unwrap();

    // collect the results into record batches
    let res = rt.block_on(df.collect()).unwrap();
}

With this, we can send the results to R with the into_arrow_robj() method! First we need to add use arrow_extendr::to::IntoArrowRobj; to the top of our script to bring the trait into scope.
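Why does a use line make a new method appear? The toy sketch below shows the general mechanism. The trait IntoSummary and the module convert are hypothetical and only loosely modelled on the idea of a conversion trait; they are not how arrow-extendr is implemented.

```rust
mod convert {
    // Hypothetical conversion trait defined in some other module or crate.
    pub trait IntoSummary {
        fn into_summary(self) -> String;
    }

    // Implementing the trait for an existing type adds a method to it...
    impl IntoSummary for Vec<i64> {
        fn into_summary(self) -> String {
            format!("{} values, sum = {}", self.len(), self.iter().sum::<i64>())
        }
    }
}

// ...but the method is only callable where the trait is in scope.
// Removing this line makes `.into_summary()` a compile error.
use convert::IntoSummary;

fn summarise_years() -> String {
    let years: Vec<i64> = vec![2007, 2008, 2009];
    years.into_summary()
}
```

This is the same reason the into_arrow_robj() method is only available once its trait has been imported.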

Then in our function we need to specify the return type as Robj (see the first line of the definition -> Robj) and then turn res into an Robj

fn read_csv_dfusion(csv_path: &str) -> Robj {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
    let csv = ctx.read_csv(
        csv_path,
        CsvReadOptions::new()
    );

    // create a dataframe from the csv
    let df = rt.block_on(csv).unwrap();

    // collect the results into record batches
    let res = rt.block_on(df.collect()).unwrap();

    res.into_arrow_robj().unwrap()
}
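Once things work, the .unwrap() calls are the natural place to come back to for error handling. The sketch below contrasts the two styles in plain Rust; first_year and first_year_or_panic are hypothetical helpers and String is used as a stand-in error type. The read_sql_csv_dfusion() function in the Addendum shows that an extendr function can return a Result, which is what makes the second style usable in a package.

```rust
// Panics if there is no data row or the value is not an integer.
fn first_year_or_panic(csv_text: &str) -> i64 {
    csv_text.lines().nth(1).unwrap().trim().parse().unwrap()
}

// Returns an error value instead, so the caller decides what to do.
fn first_year(csv_text: &str) -> Result<i64, String> {
    // `?` returns early with the error if there is no second line.
    let line = csv_text
        .lines()
        .nth(1)
        .ok_or_else(|| "no data rows".to_string())?;

    // `map_err` converts the parse error into our error type.
    line.trim()
        .parse::<i64>()
        .map_err(|e| format!("could not parse `{line}`: {e}"))
}
```

Both functions agree on valid input; they only differ in what happens when something goes wrong.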

Handling arrow-rs from R

Let’s rebuild and document our function again.

I’ve added a csv of {palmerpenguins} to the inst/ folder of our package for testing. Let’s try reading this in.

res <- read_csv_dfusion("inst/penguins.csv")
res
#> <nanoarrow_array_stream struct<rowid: int64, species: string, island: string, bill_length_mm: string, bill_depth_mm: string, flipper_length_mm: string, body_mass_g: string, sex: string, year: int64>>
#>  $ get_schema:function ()  
#>  $ get_next  :function (schema = x$get_schema(), validate = TRUE)  
#>  $ release   :function ()  

Now, this doesn’t look very familiar to most R users. This is an object from the {nanoarrow} R package called "nanoarrow_array_stream". This is how data is received from Rust in R. We can process batches from this “stream” using the method get_next(). But there’s a handy as.data.frame() method for it.

Tip

This is a good time to note that you should add nanoarrow as a dependency of your package explicitly with usethis::use_package("nanoarrow").

res <- read_csv_dfusion("inst/penguins.csv")
as.data.frame(res) |> 
  head()
#>   rowid species    island bill_length_mm bill_depth_mm flipper_length_mm
#> 1     1  Adelie Torgersen           39.1          18.7               181
#> 2     2  Adelie Torgersen           39.5          17.4               186
#> 3     3  Adelie Torgersen           40.3            18               195
#> 4     4  Adelie Torgersen             NA            NA                NA
#> 5     5  Adelie Torgersen           36.7          19.3               193
#> 6     6  Adelie Torgersen           39.3          20.6               190
#>   body_mass_g    sex year
#> 1        3750   male 2007
#> 2        3800 female 2007
#> 3        3250 female 2007
#> 4          NA     NA 2007
#> 5        3450 female 2007
#> 6        3650   male 2007

Boom! We’ve written ourselves a reader! Let’s do a simple benchmark comparing it to readr.

library(dfusionrdr)
bench::mark(
  datafusion = as.data.frame(read_csv_dfusion("inst/penguins.csv")),
  readr = readr::read_csv("inst/penguins.csv"),
  check = FALSE
)
#> # A tibble: 2 × 6
#>   expression      min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr> <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 datafusion  922.3µs   1.04ms     915.     1.07MB      0  
#> 2 readr        14.6ms  15.31ms      65.6    13.2MB     80.8

Insanely fast!


Addendum

The source code for the entire package is below. It also includes a function read_sql_csv_dfusion() which takes a SQL statement and reads it into memory if you want to explore that. For example:

query <- 'SELECT count(*) as "n", "species" FROM csv GROUP BY "species"'

as.data.frame(
  read_sql_csv_dfusion(query, "inst/penguins.csv") 
)
#>     n   species
#> 1  68 Chinstrap
#> 2 152    Adelie
#> 3 124    Gentoo

Source code:

lib.rs
use arrow_extendr::to::IntoArrowRobj;
use extendr_api::prelude::*;
use datafusion::prelude::*;
use tokio::runtime::Runtime;
use std::result::Result;

#[extendr]
/// @export
fn read_csv_dfusion(csv_path: &str) -> Robj {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();
    let csv = ctx.read_csv(
        csv_path,
        CsvReadOptions::new()
    );

    // create a dataframe from the csv
    let df = rt.block_on(csv).unwrap();

    // collect the results into record batches
    let res = rt.block_on(df.collect()).unwrap();

    res.into_arrow_robj().unwrap()
}

#[extendr]
fn read_sql_csv_dfusion(sql_query: &str, csv_path: &str) -> Result<Robj, Error> {
    let rt = Runtime::new().unwrap();
    let ctx = SessionContext::new();

    let csv_tbl_fut = ctx.register_csv(
        "csv",
        csv_path,
        CsvReadOptions::new(),
    );

    // we don't assign it to anything because we're just ensuring that it gets run
    let _ = rt.block_on(csv_tbl_fut).unwrap();

    // create a plan
    let df = rt.block_on(ctx.sql(sql_query)).unwrap();

    // collect into a vector of record batches
    let res = rt.block_on(df.collect()).unwrap();

    // return the result to R
    res.into_arrow_robj()
}

// Macro to generate exports.

// This ensures exported functions are registered with R.
// See corresponding C code in `entrypoint.c`.
extendr_module! {
    mod dfusionrdr;
    fn read_sql_csv_dfusion;
    fn read_csv_dfusion;
}
Cargo.toml
[package]
name = 'dfusionrdr'
publish = false
version = '0.1.0'
edition = '2021'

[lib]
crate-type = [ 'staticlib' ]
name = 'dfusionrdr'

[dependencies]
arrow_extendr = { git = "https://github.com/JosiahParry/arrow-extendr" }
datafusion = "33.0.0"
extendr-api = '*'
tokio = "1.34.0"