そろそろRユーザーもApache ArrowでParquetを使ってみませんか?

先日、Apache Arrow東京ミートアップ2019で「RとApache Arrow」というタイトルで発表してきました。あと、Japan.RでもApache ArrowについてLTしました。

話したこととしては、

  1. arrowパッケージを使うとParquetファイル(後述)の読み書きができる
  2. sparklyrパッケージが内部でApache Arrowを使うようになって、R↔Spark間のデータのやり取りが高速になった
  3. Arrow Flightがもっと一般的になれば、JDBCODBCを使わなくてもデータベースからデータを取ってこれるようになる

という感じで、個人的にいま強調したいのは1.です。とりあえずParquetファイルの読み書きというのがRユーザーにとって一番わかりやすいメリットなので、そこをきっかけにみんなApache Arrowにズブズブになって、もっと世の中のシステムがApache Arrowに対応しだしたときにスムーズにデータを交換したり共有したりできればよさそう、と思っています。

図にして表すとこんな感じです:

Apache Arrow依存の好循環の図

おわかりいただけたでしょうか。

では、Apache ArrowのインストールからParquetファイルの読み書きまでを簡単に説明します。

Apache Arrowコミッタの方がDatasetsの説明をされていて、Rの実装もすでにあったのでそちらも面白そうですが、まだ良く知らないのでこっちはまた別の機会にということで...)

Apache Arrowとは

Apache Arrowは「各種言語で使えるインメモリーデータ処理プラットフォーム」です。詳細は立ち入りませんが興味ある方はApache Arrowコミッタの須藤さんのこちらのスライドなんかをご参照ください。

Parquetとは

Apache Parquetは、データを保存する形式のひとつです。よくわからん、という人はとりあえず「すごいCSVファイル」みたいなものだと思ってもらえれば大丈夫です。

規格が定まっている(重要)

「すごいCSVファイル」みたいなもの、と無節操に書いてしまいましたが、CSVファイルはつらいです。例えば、

  • 規格が決まっていないのでソフト(R、PythonExcel等)による読み書きの微妙な違いに悩まされる
  • factorや時刻などのデータを正しく保存できない

といったつらみがあります。こうした現実を鑑み、上の図をつくった勢いでここで伝えたいことはひとつ、

CSVやめますか? それとも人間やめますか?

ということです。Parquetは規格が決まっているし、データの表現力も高いし、Apache Arrowでさまざまな言語に実装が提供されているわけなので、使わない理由がありません*1

列指向

また、Parquetは「列指向(columnar)」と呼ばれる種類のフォーマットです。 列指向のデータは、列方向にまとまっているので、

  • 必要な列だけを読んで、不要な列は読み込みをスキップする(pushdown)
  • 列方向には似たデータがあることが多いので圧縮がかけやすい

といった利点があります。圧縮に関しては実際のデータによるのでなんとも言えませんが、前者に関していえば、必要な列だけを選択して読めるので、CSVでは

read_csv("super_big_data.csv") %>%
  select(col1, col2)

とかやるとselect()にたどりつく前にメモリ不足でエラーになってしまうような巨大なデータでも、Parquetなら必要な列(この例で言えばcol1col2だけ)だけを読むことができます。やり方は後述。

速い

速度も実際のデータや環境によるのでなんとも言えませんが、arrow::read_parquet()data.table::fread()より速いというデータもあり、まあまあ安心して使える速さだと思います。

ここまでのまとめ

  • CSVをやめて人間を続けよう

インストール

Apache ArrowのRバインディングであるarrowパッケージはCRANでリリースされたので、

install.packages("arrow")

だけでインストールできます。Linuxの場合は、以下の指示に従ってあらかじめaptやyumでarrowライブラリをインストールしておく必要があります。

なお、Arch Linuxの場合は正解がよくわからなかったので誰か教えてください*2

ファイルの読み書き

では、さっそくParquetファイルを読み書きしてみましょう。

書き込み

書き込みはwrite_parquet()という関数です。

library(arrow)

tmp <- tempfile(fileext = ".parquet")

write_parquet(ggplot2::mpg, tmp)

ドキュメントを見るとオプションはいろいろあるんですけど(チャンクサイズとか圧縮レベルとか)、とりあえずそのへんは困るまでは気にしなくていいと思います。 ちなみに、書き込み先はいろいろあると思います。HDFSへの書き込み機能のチケットとかもあって夢が広がりますが、今のところ特に動きはなさそうなので一旦忘れましょう。

読み込み

読み込みはread_parquet()という関数です。

read_parquet(tmp)
#> # A tibble: 234 x 11
#>    manufacturer model    displ  year   cyl trans   drv     cty   hwy fl    class
#>    <chr>        <chr>    <dbl> <int> <int> <chr>   <chr> <int> <int> <chr> <chr>
#>  1 audi         a4         1.8  1999     4 auto(l… f        18    29 p     comp…
#>  2 audi         a4         1.8  1999     4 manual… f        21    29 p     comp…
#>  3 audi         a4         2    2008     4 manual… f        20    31 p     comp…
#>  4 audi         a4         2    2008     4 auto(a… f        21    30 p     comp…
#>  5 audi         a4         2.8  1999     6 auto(l… f        16    26 p     comp…
#>  6 audi         a4         2.8  1999     6 manual… f        18    26 p     comp…
#>  7 audi         a4         3.1  2008     6 auto(a… f        18    27 p     comp…
#>  8 audi         a4 quat…   1.8  1999     4 manual… 4        18    26 p     comp…
#>  9 audi         a4 quat…   1.8  1999     4 auto(l… 4        16    25 p     comp…
#> 10 audi         a4 quat…   2    2008     4 manual… 4        20    28 p     comp…
#> # … with 224 more rows

これもオプションはいろいろありますが、困るまでは忘れていいでしょう。と言いたいところなんですが、今のところ困るのであとでちょっと説明します...

必要な列だけ読む

さて、上で列指向の利点として

  • 必要な列だけを読んで、不要な列は読み込みをスキップする(pushdown)

というのを挙げましたが、具体的にどうやるかというと、こうです。

read_parquet(tmp, starts_with("c"))
#> # A tibble: 234 x 3
#>      cyl   cty class  
#>    <int> <int> <chr>  
#>  1     4    18 compact
#>  2     4    21 compact
#>  3     4    20 compact
#>  4     4    21 compact
#>  5     6    16 compact
#>  6     6    18 compact
#>  7     6    18 compact
#>  8     4    18 compact
#>  9     4    16 compact
#> 10     4    20 compact
#> # … with 224 more rows

第2引数にdplyr::select()と同じ記法で列の指定ができます。このコードは、

read_parquet(tmp) %>%
  select(starts_with("c"))

と等価ですが、重要なのはstarts_with("c")の列以外は実際には読み込まれない、ということです。 この例だとわかりにくいですが、もうちょっと重いデータを用意して確認してみましょう。

d <- lapply(1:100, function(...) 1:1e6)
names(d) <- paste0("col", 1:100)
write_parquet(as.data.frame(d), "tmp.parquet")

Rのセッションを再起動し、これを読み込む前後で使われたメモリを比較してみましょう。 全部読む場合は404MB程度が使われます。

before <- lobstr::mem_used()
d <- read_parquet("tmp.parquet")
lobstr::mem_used() - before
#> 404,669,976 B

一方、col1col10だけを指定して読めば使われるのは13MB程度だけです。

before <- lobstr::mem_used()
d <- read_parquet("tmp.parquet", c(col1, col10))
lobstr::mem_used() - before
#> 13,614,528 B

仮に、100倍のデータがあると想像してみましょう。前者は一時的にでもすべてのデータを読み込まないといけないので、 40GBのメモリがないとエラーになってしまいます。一方、後者は1.3GBで済みます。 このように、列がたくさんあるデータのうち一部だけを使う、みたいな場合は特にParquetを使う恩恵があるでしょう。

注意点

こんな偉そうにブログを書いておきながらまだ私もParquet初心者なんですが、軽く使ってみた中でいくつか注意点を見つけたので共有しておきます。 (何か私の勘違いとかがあれば優しく諭していただけると助かります)

POSIXct

タイムスタンプ型(POSIXct)はサポートされているものの、Parquetファイルに書き出すとタイムゾーンは保持されません。 具体的には以下のような挙動になります(デフォルトのタイムゾーンJSTの環境での動作)。

jst <- lubridate::now()
utc <- lubridate::with_tz(jst, "UTC")
d_orig <- tibble::tibble(jst = jst, utc = utc)

d_orig
#> # A tibble: 1 x 2
#>   jst                 utc                
#>   <dttm>              <dttm>             
#> 1 2019-12-09 19:52:23 2019-12-09 10:52:23

tmp <- tempfile()

write_parquet(d_orig, tmp)

d <- read_parquet(tmp)

d$jst
#> [1] "2019-12-09 19:52:23 JST"
d$utc
#> [1] "2019-12-09 19:52:23 JST"

タイムゾーンUTCだったutc列もJSTに変換されてしまっている、ように見えます。 ただし、これは「タイムゾーンJSTに変換された」のではなく、タイムゾーンが失われた結果、Rのprint()がデフォルトのタイムゾーンで時刻を表示してくれているだけです。 dput()を使うとtzoneが消えているのが分かると思います。

# 元のデータ
dput(d_orig$utc)
#> structure(1575888743.42941, class = c("POSIXct", "POSIXt"), tzone = "UTC")

# 変換後のデータ
dput(d$utc)
#> structure(1575888743.42941, class = c("POSIXct", "POSIXt"))

とはいえ、この結果でも分かるように、内部的な値(エポック秒)は変わらずに保存されています。 分析のコードがタイムゾーンを意識したものになっていれば問題ないでしょう。

(ちなみにこれは、ArrowのフォーマットにはタイムゾーンがあるけどParquetのフォーマットにはない、ということだと思うんですがぱっと読んだだけでは理解できなかった...)

factor / ordered

追記(2020/1/8):これはバグだったみたいです。直りました(参考:https://github.com/apache/arrow/pull/6135#issuecomment-571899715


factorはサポートされていますが、読むのにちょっと工夫が必要です。ふつうに何のオプションもなく実行すると文字列になってしまってギョッとします。

x <- factor("a", levels = c("b", "a", "C", "A"))
y <- factor("a", levels = c("b", "a", "C", "A"), ordered = TRUE)

d <- tibble::tibble(x = x, y = y)
d
#> # A tibble: 1 x 2
#>   x     y    
#>   <fct> <ord>
#> 1 a     a

tmp <- tempfile()

write_parquet(d, tmp)
read_parquet(tmp)
#> # A tibble: 1 x 2
#>   x     y    
#>   <chr> <chr>
#> 1 a     a

しかし、Parquetファイルの中にはちゃんとfactorの水準まで保存されています。 ドキュメントを見つけられなかったので正しい方法はわからないんですが、ParquetReaderPropertiesというオブジェクトでParquetを読むときの挙動をいろいろ指定できるみたいで、 set_read_dictionary(列番号, TRUE)とやるとfactor型として読んでくれました。ただ、これはちょっと生のAPIすぎるので(列番号が0から始まるところとか)、もうちょっと簡単なオプションができてほしい気もします。

prop <- ParquetReaderProperties$create()
prop$set_read_dictionary(0, TRUE)
prop$set_read_dictionary(1, TRUE)

read_parquet(tmp, props = prop)
#> # A tibble: 1 x 2
#>   x     y    
#>   <fct> <fct>
#> 1 a     a

ちなみにorderedは復元できませんでした。 これはas_data_frame = FALSEにするとorderedというそれっぽい型が見えるので、Parquetのリーダーの問題ではなくArrow → R間の変換の問題なのかもしれません。

read_parquet(tmp, as_data_frame = FALSE, props = prop)
#> Table
#> 1 rows x 2 columns
#> $x <dictionary<values=string, indices=int32>>
#> $y <dictionary<values=string, indices=int32, ordered=0>>
list

list型の列があるdata.frameはまだ読み書きできません。これはそのうちできるようになるのを期待...

arrow::write_parquet(dplyr::starwars, "tmp.parquet")
#> Error in Table__from_dots(dots, schema): cannot infer type from data

まとめ

  • CSVをやめて人間を続けよう

*1:ただしExcel...

*2:AURにそれっぽいのがあるんですが、snappy-staticとかがコンフリクトしてVSCodeが動かなくなりました。。