株式会社ホクソエムのブログ

R, Python, データ分析, 機械学習

Python + AsyncSSH によるお手軽非同期SSH接続

1. はじめに

ホクソエムサポーターの藤岡です。普段はデータ分析会社で大規模データを抽出・加工する仕事をしています。

Python 3.4以降、asyncioが導入されたことで非同期処理の実装が簡単にできるようになりました。非同期処理を活用すると、大量のテキストを読み込んだり、通信のレスポンスを待つ時間に他の処理を行うことができるようになります。

ここでは、asyncioをベースに作成されたPythonライブラリ AsyncSSH を使って、簡単に非同期のSSH通信を実現する方法を紹介したいと思います。

AsyncSSHはSSH通信に必要な多くの機能を備えており、リファレンスを読むと色々と目移りしてしまうのですが、実際はある程度の処理ならお手軽に実装できるように作られています。なのでconnectコルーチンとSSHClientConnectionクラスのいくつかのメソッドだけを使って実現できる内容をここで紹介しようと思い立ち、本記事を執筆しました。

なお、ここに書いてある内容はあくまで自分の理解に基づいたものであり、誤った内容が含まれている可能性があります。もしお気付きの際には、ご指摘いただければ幸いです。

2. 導入

Python 3.4以上の入った環境で以下のコマンドを実行します。

$ pip install asyncssh

なお、別のパッケージをインストールすることでいくつかの拡張機能を使うことができるようになりますが、今回紹介する内容に限ればいずれの機能も不要なので省略します。詳しくはこちらをご覧ください。

3. 解説に入る前に

本記事は、以下の環境で検証しながら執筆しました。

  • Python 3.6.8
  • asyncssh 1.18.0

本記事中のサンプルコードは公式のサンプルコードを参考に作成しているため、それらと同様に以下の構文で成り立っています。

import asyncio, asyncssh, sys, getpass

async def run_client():
    << 接続処理(4章の内容) >>
    << ホスト上での処理(5章の内容) >>

try:
    asyncio.get_event_loop().run_until_complete(run_client())
except (OSError, asyncssh.Error) as exc:
    sys.exit('SSH connection failed: ' + str(exc))

各節では、冗長な内容を省いて最小限のコードのみを掲載しておりますが、 それらのコードをこのテンプレートに当てはめると実行できるようになっております。例えば、4.1節5.1節の内容を組み合わせて、パスワード認証 + lsコマンド実行のプログラムを作ると、以下のようになります。

import asyncio, asyncssh, sys, getpass

async def run_client():
    # 接続処理(4節の内容)
    pw = getpass.getpass()
    async with asyncssh.connect("localhost", password=pw) as conn: 
        # ホスト上で実行したい処理(5節の内容)
        conn.run("ls /path/to/the/folder")             

try:
    asyncio.get_event_loop().run_until_complete(run_client())
except (OSError, asyncssh.Error) as exc:
    sys.exit('SSH connection failed: ' + str(exc))

また、テスト用の環境として、Dockerを使ってSSHサーバコンテナを立てると安全に試すことができます。sshdサーバのイメージはdocker公式からこちらで提供されています。ただし、このイメージだとbcコマンドを使うサンプルと多段接続のサンプルが試せないのでご注意ください。

全てのサンプルを動かしたい場合は、こちらに筆者が使用したテスト環境を掲載しておりますので適宜参考にしてください。

4. 接続処理: connect()

シェル上でsshコマンドを叩くだけでホストへのSSH接続ができるのと同様に、asyncsshでもconnect()コルーチンを呼び出すだけでSSH接続が実現できます。async with句を使って通信の切断を内部的に処理させるために、基本的には以下の構文で呼び出します。

async with asyncssh.connect(*args, **kwargs) as conn:
    ...

例えば、

  • user: root
  • host: localhost
  • port: 2222

という設定で接続する場合、

async with asyncssh.connect("localhost", 2222, username="root") as conn:
    ...

とすればOKです。

なお、connect()の引数はhost, port以外全てkeyword-only引数なので注意してください。

このように、接続方法や認証方法などはすべてconnect()の引数で設定します。その種類は多岐にわたるのですが、本章ではその代表的なものを紹介していきます。

4.1 パスワード認証

パスワード認証は、パスワードの文字列をpassword引数に与えるだけで実現できます。

例えば、標準出力からパスワードを受け取って接続する場合は、以下のコードで実現できます。

pw = getpass.getpass()
async with asyncssh.connect("localhost", password=pw) as conn:
    ...

4.2 多段接続

connect()を使って接続する際に、tunnel引数に別の接続オブジェクトを渡すことでその接続を踏み台にすることができます。

例えば、

  1. ローカルからlocalhostに接続
  2. localhostから172.22.0.3に接続

ということを実現する場合、以下のようなコードになります。

async with asyncssh.connect('localhost') as tunnel_conn: #手順1
    async with asyncssh.connect('172.22.0.3', tunnel=tunnel_conn) as conn: #手順2
        ...

4.3 SSH Agent

asyncsshでは、デフォルトでSSH Agentから秘密鍵を取得する設定になっています。読み込むAgentは環境変数SSH_AUTH_SOCKに設定されているパスによって決定されます。この環境変数を使わずにAgentを指定する場合には、agent_path引数でagentのパスを指定します。Agentを使わない場合にはagent_path=Noneとしましょう。

また、agent_forwarding引数をTrueに設定することで、Agent Forwarding (SSH Agentの情報をホストへと転送すること) も可能です。

4.4 known_hosts

SSHで新しい接続先に接続する際には確認が行われますが、テスト時などにはこの機能が邪魔になることがあります。そんなときには、引数known_hostsNoneに指定することで、このステップをスキップすることができます。

ただし、この状態は中間者攻撃の被害に気づきにくくなるリスクがあるため、特に理由がなければNoneに指定しないようにしましょう。また、この引数を使って読み込むknown_hostsファイルのパスを指定することもできます。デフォルトでは~/.ssh/known_hostsに設定されています。

4.5 その他

ここまで紹介した以外にも、connect()の引数で設定できる項目は多く存在します。それらを調べる際に注意していただきたいのが、これらの引数はconnectのページだけでなくSSHClientConnectionOptionsのページにも記載されていることです。

なお、上記のように実装上は分けられているのですが、引数として渡す際にはどちらもconnect()のキーワード引数として与えることができます。

5. ホスト上での処理: SSHClientConnection

SSHClientConnectionとは、connect()で返される接続オブジェクトconnのクラスです。接続オブジェクトから種々のメソッドを呼び出すことで、接続先でコマンドを実行したりシグナルを送ったりできます。本章では、このクラスのメソッドを通じて、ホスト上で種々の処理を実現する方法を紹介します。

ここで紹介する3つのメソッドのうちconn.runconn.create_processはどちらもsubprocessモジュールと近いインターフェースで実装されているので、その辺りに慣れていれば使いやすいと思います。なお、conn.runsubprocess.run に似ていて、conn.create_processsubprocess.Popenに似ています。

5.1 コマンドの実行

コマンドを引数にとってconn.run()コルーチンを呼び出すだけです。例えば、lsコマンドを呼び出す場合には以下のように呼び出します。

await conn.run("ls /path/to/the/folder")

引数に渡すコマンドはトークンのリストでは無く単一の文字列なので注意が必要です。subprocess.run()同様にリストを渡したい場合には、subprocess.list2cmdline()を使って結合してから渡すのが良いでしょう。

一つの接続に対して複数回呼び出すこともできます。その場合、それぞれの呼び出しに対してホスト側でプロセスが発行されます。

subprocess.run()はプロセス実行結果オブジェクトSSHCompletedProcess を返します。このオブジェクトは実行結果の種々のパラメータをプロパティとして保持しています。各プロパティはこちらにリストアップされていますが、よく使うものを以下を列挙しておきます。

  • stdout: 標準出力
  • stderr: 標準エラー出力
  • exit_status: 終了ステータス (exit signalを受け取った場合は-1が入る)
  • returncode: 終了ステータス (exit signalを受け取った場合はその番号(負の数)が入る)

5.2. プロセス生成: SSHClientConnection.create_process()

インタラクティブなコマンドを実行する場合には、一つのプロセスとやり取りをする必要があるので、conn.run()ではなく、conn.create_process()を使います。例えば、以下はサンプルコードに載っている、対話的な処理の一部です。

async with conn.create_process('bc') as process:
    for op in ['2+2', '1*2*3*4', '2^32']:
        process.stdin.write(op + '\n')
        result = await process.stdout.readline()
        print(op, '=', result, end='')

1行目でbcコマンドを対話モードで呼び出し、そのプロセスオブジェクトprocessを受け取っています。次に、3行目でstdin.writeメソッドを使って数式を書き込み、4行目でreadlineコルーチンを使って計算結果を受け取っています。これはsubprocess.Popenstdin, stdout引数にsubprocess.PIPEを設定した場合と同様です。

command引数を省略すると、コマンドの代わりにホスト側のシェルが呼び出されます。例えば、上の例をシェルから実行すると、以下の通りです。

async with conn.create_process() as process:
    for op in ['2+2', '1*2*3*4', '2^32']:
        cmd = "echo {} | bc\n".format(op)
        process.stdin.write(cmd)
        result = await process.stdout.readline()
        print(op, '=', result, end='')

シェルを呼ぶ場合もコマンドを呼ぶ場合も、実行したい区切りで改行記号を入れることを忘れないようにしましょう。

5.3 I/Oのリダイレクト

ホスト側のプロセスのI/Oをローカルのプロセスや別のSSHClientProcessにリダイレクトすることができます。指定方法はsubprocess.Popen()とほぼ同じです。つまり、stdinstdoutstderr引数にそれぞれ特定の定数(e.g. asyncssh.PIPE)、もしくはファイルオブジェクトを与えます。

まず、これらの引数のデフォルト値は全てasyncssh.PIPEに設定されています。この設定では、ホスト側のプロセスの当該標準ストリームを対応するプロパティを通じて読み書きできます。前節でプロセスの標準ストリームがprocess.stdin, process.stdoutを通じて読み書き可能だったのは、デフォルト値がこのように設定されていたためです。

他には、asyncssh.STDOUTという定数があります。これをstderr引数に渡すことで、stderrに渡される出力をstdoutに流すこと(つまり、シェルでの2>&1)が可能です。asyncssh.STDERR定数をstdout引数に渡して、逆のこともできます。

それ以外のファイルオブジェクトに読み書きをさせたい場合には、そのファイルオブジェクトを直接渡します。例えば、以下のコードは、bcの対話プロセスをホストで開いて、それをローカルから操作する例です。

async with conn.create_process("bc", stdin=sys.stdin, stdout=sys.stdout) as process:
    await process.wait()

以下、上記のコードをテンプレートに当てはめたプログラム(remote_bc.py)の実行例です。

$ python remote_bc.py 
1+1
2
2+3
5
quit

1行目でローカルのstdin/stdoutをホストのstdin/stdoutへとリダイレクトするように設定し、2行目でホストのプロセスが終了するまで待機します。

ただし、stdin, stdout, stderr引数に渡したファイルオブジェクトはホスト側のプロセス終了時に強制的にcloseされます。例えば、上記コードだとstdoutが閉じられるのでprint関数が呼び出せなくなります。なので、リダイレクトするファイルオブジェクトは、閉じても構わないものにして、それ以外の場合はasyncssh.PIPEを指定しておくのがいいかと思います。

5.4 Send Signal

ホスト側に中断信号(SIG_INT)のようなシグナルを送る場合には conn.send_signalメソッドを使います。

async with conn.create_process() as process:
    process.send_signal("INT")
    await process.wait()

process.send_signal()は信号を送るだけなので、実際に信号が受け取られて処理されるまでprocess.wait()で待機します。SIG_TERMとSIG_KILLについてはそれぞれprocess.terminate(), process.kill()と関数が用意されているので、そちらを使うのが楽です。他の信号は全てsignalライブラリにあるものに対応していて、指定方法はSIG_以下の文字列(SIG_INTなら"INT")です。

ただし、send_signal()はホスト側の環境によっては動かないので注意が必要です。自分の知る限りでは、OpenSSHについてはv7.9p1以降でないと動きません。詳しくはこちらのissueを参照してください。どうしてもシグナルを送りたい場合の実現方法も載っています。

6. MISC

ここでは、本稿で扱いたい内容とは少しずれるものの重要な内容をさっくりと紹介します。いずれも、もし機会と需要があればしっかりと勉強して記事を書きたい内容です。

6.1 SSHサーバ

ここで扱った内容は全てSSHクライアントの話ですが、AsyncSSHではSSHサーバを実装することもできます。クライアントだけで実現できることには限界があるので、サーバも自前で実装すればできることの可能性が大きく広がります。

6.2 ssh_config

接続先の設定は基本的にssh_configファイル(e.g. ~/.ssh/config)に書きますが、 AsyncSSHにはこのファイルをパースする機能はありません。 ssh_configファイルから接続先の情報を取得したい場合は、Paramikoのパーサ を利用するのがいいかと思います。

6.3 コールバック

connect()を呼び出す際に、SSHClientもしくはその派生クラスをclient_factory引数に渡すことで、接続オブジェクトの挙動を変更できます。あるいは、conn.create_connection()の代わりにconn.create_sessionを使い、SSHClientProcessの派生クラスをそのclient_factory引数に渡すことで、プロセスの挙動を変更できます。

特に、接続完了時、通信遮断時など種々のフェーズで呼び出されるコールバックの挙動を弄れば、例えば通信を簡単にロギングやデバッグできます。

7. おわりに

PythonでSSHといえばParamiko一強なイメージもありますが、こちらのライブラリも非同期処理ができるという独自の強みを持っています。また、とても簡単に扱えるのでasyncioの事始めとしてもいい教材だと思います。なので、この機会にぜひ一度触ってみてはいかがでしょうか。

8. おまけ: テスト環境の構築例

上記サンプルコードを試すために筆者が使用した環境を簡単に紹介します。ユーザ、パスワードはそれぞれrootです。entranceからinternalへのipアドレスの確認方法はこちらの記事が参考になると思います。

8.1 環境

  • Docker 18.09.2
  • docker-compose 1.24.1

    8.2 ディレクトリ構成

test_server/
 ├── docker-compose.yml
 └── Dockerfile

8.2.1 docker-compose.yml

version: '3'
services:
  entrance:
    build: .
    ports:
      - "2222:22"
    networks:
      - sshtest
  internal:
    build: .
    networks:
      - sshtest
networks:
  sshtest:

8.2.2 Dockerfile

FROM rastasheep/ubuntu-sshd

RUN apt-get update && apt-get install -y \
    bc \
 && apt-get clean \
 && rm -rf /var/lib/apt/lists/*

8.3 起動方法

$ docker-compose up -d

8.4 終了方法

$ docker-compose down

9. References

Effective Python ―Pythonプログラムを改良する59項目

Effective Python ―Pythonプログラムを改良する59項目

スパコン上でRを並列実行する方法

アカデミアの大規模超並列クラスタ型スーパーコンピュータ上で、Rを並列実行する際の備忘録です。あまりスパコンに詳しくないので、用語を間違っていたら教えてください。僕が使っているスパコンは富士通のサーバでXeon Scalableプロセッサを積んでて、Intelのコンパイラがインストールされてて、pjsubコマンドでjobをsubmitします。

並列実行するために、試した方法は次の3種類です。順に説明します。

  1. バルクジョブを使う方法
  2. Rのパッケージを利用してMPIで実行する方法
  3. インテルのMPIライブラリのコマンドラインを使う方法

バルクジョブを使う方法

まず実行したいRファイル(calc.R)は以下のようなものとします。

.libPaths(c('/path/to/my_R_lib', .libPaths()))
library(rstan)

args <- commandArgs(trailingOnly=TRUE)
paraID <- as.integer(args[1])
f_in <- sprintf('input/param.%d.csv', paraID)
f_out <- sprintf('output/result.%d.RData', paraID)
# ... resultの計算 ...
save(result, file=f_out)
  • 1行目: Rのパッケージをユーザ権限で/home/以下などにインストールした場合には、このようにそのRパッケージへのパスを.libPaths()関数で加えておく必要があります。
  • 4行目: コマンドラインの引数をcommandArgs関数で受け取ります。
  • 5~7, 9行目: 引数の使用例です。数値の文字列(10とか)を渡して、その数値に対応するファイルを読み込み、計算結果を格納しています。

次にRコードをキックするためのバッチファイルを作ります。

#!/bin/sh

#PJM -g my_proj_name
#PJM -L rscgrp=my_resource
#PJM -L node=1
#PJM -L elapse=01:00:00
#PJM -j
#PJM -X

module load R/3.6.0
Rscript --vanilla /my_dir/calc.R $PJM_BULKNUM
  • 3~8, 10行目: この部分はsubmitするときに指定するオプションとRのPATHの読み込みで、必要かどうかはスパコンの環境に依存します。説明しません。
  • 11行目: ここで/my_dir/calc.Rを実行します。$PJM_BULKNUMpjsubで実行するときに渡されます。この値が先ほどのRコードのargs[1]になります。

最後にjobをsubmitするときのコマンドです。バルクジョブで実行します。

pjsub --bulk --sparam 1-50 run.sh

実行すると150の数値が一つずつ$PJM_BULKNUMに渡されて、サブジョブとしてRが実行されます。

しかしこの方法では、サブジョブ1つ1つが1ノードに割り振られるという扱いになるため、スパコンの使用チケットがノード×(サブ)ジョブ×計算時間の合計で定められている場合には、非常に速いスピードでチケットを消費してしまいます。この状況を打破するためには、MPIを使って1ジョブでマルチプロセスを利用できると良いです。

Rのパッケージを利用してMPIで実行する方法

手っ取り早いのはMPIを利用するRパッケージを利用することです。{foreach}, {doParallel}, {snow}, {Rmpi}パッケージをインストールする必要があります。その後、以下のようなRコードを準備します。

.libPaths(c('/path/to/my_R_lib', .libPaths()))
library(foreach)
library(doParallel)
library(snow)
library(Rmpi)

args <- commandArgs(trailingOnly=TRUE)
paraID_vec <- seq(from=as.integer(args[1]), to=as.integer(args[2]))

cl <- makeCluster(50, type='MPI')
registerDoParallel(cl)

memo <- foreach(paraID=paraID_vec, .packages='rstan', .export=ls(envir = globalenv())) %dopar% {
  f_in <- sprintf('input/param.%d.csv', paraID)
  f_out <- sprintf('output_mpi/result.%d.RData', paraID)
  # ... resultの計算 ...
  save(result, file=f_out)
}

stopImplicitCluster()
  • 7~8行目:ここでは並列実行したい数値のはじまりと終わりをargs変数に渡して、vectorを作っています。
  • 10~11行目:ここでMPIで実行するための準備をしています。50プロセスを使う指定をしています。{Rmpi}パッケージがないとmakeCluster関数でtype='MPI'引数が使えません。
  • 13行目:あとはいつものforeach関数と%dopar%演算子で並列実行します。

次にRコードをキックするためのバッチファイルを作ります。

#!/bin/sh

#PJM -g my_proj_name
#PJM -L rscgrp=my_resource
#PJM -L node=1
#PJM -L elapse=01:00:00
#PJM --mpi proc=50
#PJM -j
#PJM -X

module load R/3.6.0
Rscript --vanilla /my_dir/calc-with-Rmpi.R 1 50
  • 7行目: ここでmpi50プロセス使う指定をしています。Rコード内のプロセス数と合わせる必要があります。
  • 12行目: 上のRコードを実行します。

しかし、この方法はRパッケージ、特に{Rmpi}のインストールが環境によっては難しいという欠点があります。僕も失敗しました。install.packages関数のconfigure.argsオプションを使って、用意されているMPIのincludeディレクトリやlibpathを指定したのですが、エラーを完全に取り除くことができませんでした。だから上のコードも未テストでうまくいくか分かりません。ごめんなさい。

MPIライブラリのコマンドラインを使う方法

triadsouさんに教えてもらいました(URL)。ありがとうございました!

僕のスパコンの環境ではmpirunに相当するコマンドラインがIntelのmpiexec.hydraでした。そのため、以下のようなバッチファイルになります。

#!/bin/sh

#PJM -g my_proj_name
#PJM -L rscgrp=my_resource
#PJM -L node=1
#PJM -L elapse=01:00:00
#PJM --mpi proc=50
#PJM -j
#PJM -X

module load R/3.6.0
mpiexec.hydra \
-n 1 /path/to/bin/Rscript --vanilla /my_dir/calc.R 1 : \
-n 1 /path/to/bin/Rscript --vanilla /my_dir/calc.R 2 : \
-n 1 /path/to/bin/Rscript --vanilla /my_dir/calc.R 3 : \
# ... 50まで ... コマンド書くのが面倒なので別のプログラムで自動生成するとよい
-n 1 /path/to/bin/Rscript --vanilla /my_dir/calc.R 49 : \
-n 1 /path/to/bin/Rscript --vanilla /my_dir/calc.R 50 :
  • 12行目:mpiexec.hydraコマンドを使います。
  • 13~18行目: -n 1は1プロセス分使うという意味です。これを50プロセス分だけコロンでつなげて書きます。なお僕の環境ではRscriptを絶対パスにしないと動きませんでした。

この方法で、無事MPIで50プロセスを使って並列実行することができました。

Enjoy!

Rでのナウなデータ分割のやり方: rsampleパッケージによる交差検証

前処理大全の「分割」の章では、予測モデルの評価のためのデータセット分割方法が解説されています。基礎から時系列データへ適用する際の注意まで説明されているだけでなく、awesomeなコードの例がRおよびPythonで書かれており、実践的な側面もあります(お手元にぜひ!)。

しかし今回は、Awesome例とは異なる、より新しいやり方で・簡単にRでのデータ分割を行う方法を紹介したいと思います。前処理大全でも取り上げられているcaretパッケージですが、その開発者のMax Kuhnが開発するパッケージの中に rsample を使う方法です。ここでは前処理大全で書かれている一般的なデータと時系列データの交差検証による分割をrsampleの使い方を紹介しながらやっていきます。加えて、rsampleの層化サンプリングについても最後に触れます。

  • 1. レコードデータにおけるモデル検証用のデータ分割
    • zeallotによる代入
  • 2. 時系列データにおけるモデル検証用のデータ分割
  • おまけ: 層化抽出法
続きを読む

ggplot2 で facet ごとのヒストグラムに平均値の線を引く

ggplot2 で facet ごとのヒストグラムに平均値の線を引きたい。

例えば次のような感じ。

f:id:hoxo_m:20181108192139p:plain

Rコミュニティの Slack である r-wakalang で聞いたところ、即回答がもらえただけでなく、いろいろなやり方を教わったのでメモしておく。 みなさんありがとう。

基本作図

まずはデータを準備する。 ここでは有名な iris データから数値カラム x と facet を作るためのカテゴリカル変数 group を持つデータフレームを作成する。

data(iris) # iris データの読み込み(通常は必要ない)

library(dplyr)

# データの作成
data <- iris %>% select(x = Sepal.Length, group = Species)
# データ内容の確認
head(data)
    x  group
1 5.1 setosa
2 4.9 setosa
3 4.7 setosa
4 4.6 setosa
5 5.0 setosa
6 5.4 setosa

このデータに対して、グループごとにヒストグラムを作成する。

library(ggplot2)
ggplot(data, aes(x)) + 
    geom_histogram() +
    facet_wrap(~group, ncol=1)

f:id:hoxo_m:20181108192206p:plain

この3つのヒストグラムに対して、それぞれのグループの平均値を表す垂直線を追加したいというのが今回の目的。

facet ごとの集計値の扱い

ggplot2 で facet ごとに集計値を扱う場合は、別データとして集計済みのデータを用意する。

例えば今回は平均値の垂直線を引きたいので、次のように平均値を計算したデータフレームを用意する。

data_summary <- data %>%
    group_by(group) %>%
    summarise(mean_x = mean(x))
data_summary
  group      mean_x
1 setosa       5.01
2 versicolor   5.94
3 virginica    6.59

この集計データを用いて平均値をグラフに書き込むことができる(ura_hoxom さん)。

ggplot(data, aes(x)) + 
    geom_histogram() +
    facet_wrap(~group, ncol=1) +
    geom_vline(data = data_summary, aes(xintercept = mean_x), color = "red")

f:id:hoxo_m:20181108192139p:plain

目的は達成された。

他の方法

他の方法も教えてもらった。 ggplot2 では集計データを用意しなくても、集計用の関数を渡すと自動的にデータ集計してくれるらしい。

例えば、次のようにグループごとに平均値を集計する関数を用意する。

mean_by_group = function(d) {
    d %>% group_by(group) %>% summarize(mean_x = mean(x))
}

集計済みデータの代わりに、これを ggplot2 に渡すことができる(heavywatal さん)。

ggplot(data, aes(x)) + 
    geom_histogram() +
    facet_wrap(~group, ncol=1) +
    geom_vline(data = mean_by_group, aes(xintercept = mean_x), color = "red")

f:id:hoxo_m:20181108192139p:plain

また、集計する関数については次のように簡潔に作成できる(yutannihilation さん)*1

mean_by_group = . %>% group_by(group) %>% summarize(mean_x = mean(x))

まとめ

というわけで、みなさんいろいろ教えてくださってありがとうございました。

R-wakalang への参加の仕方については次の資料を参考にしてください。

Enjoy!

*1:これは magrittr の機能らしい。https://github.com/tidyverse/magrittr#building-unary-functions 参照。

ggplot2 で時系列プロットの端点にラベルを表示する

こういう感じで時系列プロットの端点にラベルを表示したい。

f:id:hoxo_m:20180915223055p:plain

この方が時系列とラベルの対応がわかりやすくて良い。

ggrepel パッケージを使うと簡単にできるが、いくつか注意点があるのでここで紹介したい。

まずはデータを用意する。

library(tidyverse)

set.seed(314)
d <- map_dfr(
  c("setosa", "versicolor", "virginica"),
  ~ tibble(
    idx = 1:400,
    value = cumsum(runif(400, -1, 1)),
    type = .
  )
)

ggplot(d, aes(idx, value, colour = type)) + 
  geom_line()

f:id:hoxo_m:20180915223619p:plain

普通にプロットするとこんな感じだが、右端にラベルをつけたい。

ggrepel はいい感じにラベルを配置してくれるパッケージである。

まずはラベルとなるカラムを用意する。 右端にだけラベルをつけたいので ids が最大となる点にだけ type をラベルとして代入し、その他は NA を入れる。

library(ggrepel)

d <- d %>% mutate(label = ifelse(idx == max(idx), type, NA))

ggplot(d, aes(idx, value, colour = type)) +
  geom_line() + 
  geom_label_repel(aes(label = label), na.rm = TRUE)

f:id:hoxo_m:20180915232256p:plain

デフォルトのままだと折れ線にラベルが重なってしまう。 ラベルを右側に出すためには x 軸を右側に広げて nudge_x = Inf を指定すると良い。

ggplot(d, aes(idx, value, colour = type)) +
  geom_line() + 
  geom_label_repel(aes(label = label), na.rm = TRUE, nudge_x = Inf) +
  xlim(NA, 500)

f:id:hoxo_m:20180915232323p:plain

この方法だとラベルはプロットの右端に張り付く。 時系列のすぐ右に出したい場合は、xlim 引数に時系列の端点を指定すると良い。

ggplot(d, aes(idx, value, colour = type)) +
  geom_line() + 
  geom_label_repel(aes(label = label), na.rm = TRUE, xlim = c(400, NA)) + 
  xlim(NA, 500)

f:id:hoxo_m:20180915232359p:plain

謝辞

上記は r-wakalang の Slack 上で @u_ribo さんと @yutannihilation さんに教えていただきました。 ありがとうございます。

参考

RユーザのためのRStudio[実践]入門−tidyverseによるモダンな分析フローの世界−

RユーザのためのRStudio[実践]入門−tidyverseによるモダンな分析フローの世界−

データ分析のワークフローをdrakeで管理して効率的に作業を進めよう

要約

  • drakeパッケージは、GNU makeのようにあらかじめ定義されたワークフローを自動的に実施する仕組みを、Rユーザに馴染みやすいデータフレーム形式で提供する
  • ワークフローの構築と管理、実行はRの関数として提供され、依存関係を可視化する関数も用意される
    • drakeパッケージを使うことで、データ分析でありがちな「再実行」の負担(再計算、コードの保守)を軽減することが可能となる
    • 各オブジェクトは自動的にキャッシュされ、コードや依存関係に変更のない場合はキャッシュが利用される
    • ワークフローの各処理の状況、依存関係を可視化する関数も用意され、ワークフロー管理が容易になる

f:id:u_ribo:20180905064443j:plain

  • 要約
  • はじめに
    • シーシュポスの岩
    • 既存の解決策
  • drake: Rユーザのためのワークフロー処理パッケージ
    • ワークフロー管理の基礎
    • ワークフローと依存関係の可視化
    • ワークフローの変更
  • 参考URL

はじめに

データ分析の作業は、試行錯誤を繰り返して進めて行くのが一般的です。

Garrett GrolemundとHadley Wickhamの著書 “R for Data Science” (「Rではじめるデータサイエンス」)では、Rを使ったデータ分析作業の工程を以下の図のように整理しています (FontAwesomeのフォントを使って編集しています)。

f:id:u_ribo:20180831061703p:plain

この図の中で、データ整形と可視化、そしてモデリングの工程がひとまとまりになっており、これらの工程を経て分析結果を伝えるための作業に入ります。

データ整形と可視化、モデリングは互いに影響しあう関係にあり、繰り返し実行されることを想定しています。繰り返しの例としてはモデルに用いる変数のアップデートがあります。その際、モデルを実行するコードを修正するだけでなく、それに関連する前後のプロセス、すなわちモデルデータの生成とモデルの結果を利用するプログラムについても見直す必要があるでしょう。

こうした「繰り返し」や「やり直し」の作業は、みなさんご存知の通り、時間がかかる骨の折れる仕事で、退屈なものです。場合によっては、ほとんど1からコードを書き直すなんて事もあるかもしれません。間に実行時間の長い処理が途中に入る時も、処理が終わるまで待つ必要があります。また、変更箇所に見落としがあるとフローは流れなくなってしまいます。変更が多ければ多いほど、人為的な誤りも犯しやすくなる危険があります。

続きを読む

【2018年版】R でハッシュテーブルの速度比較

以前、こういう記事を書いた。

Rでハッシュテーブルを使う方法はいくつかあるが、

  • サイズが 1000 以下ならば名前付きベクトルが速い
  • それ以上なら環境を使った方法が速い
  • hash パッケージは遅いが記述がわかりやすい

ということがわかった。

今回はさらに高速なハッシュテーブルを実装するパッケージを3つ見つけたのでこれらを加えて比較してみる。

使用するデータは前回と同じく次のコードで生成した。

generate_random_labels <- function(num, length = 10) {
  apply(Vectorize(sample, "size")(letters, rep(num, length), replace = TRUE), 1, function(row) paste(row, collapse = ""))
}
generate_random_values <- function(labels, digits = 1) {
  format <- paste0("%s%0", digits, "d")
  sprintf(format, labels, Vectorize(sample, "size")(seq_len(10^digits-1), rep(1,length(labels))))
}

N <- 1000
labels <- generate_random_labels(N)
values <- generate_random_values(labels)
df <- data.frame(labels, values, stringsAsFactors = FALSE)

target <- sample(labels, 5)
target
#> [1] "eguiqhmukh" "yuvlnhfvba" "cjbcajffur" "nttsfcuszh" "ibvgfrjnro"

1. 名前付きベクトル

名前付きベクトルは最もシンプルなハッシュテーブルの実装である。

named_values <- values
names(named_values) <-  labels
named_values[target]
#>    eguiqhmukh    yuvlnhfvba    cjbcajffur    nttsfcuszh    ibvgfrjnro 
#> "eguiqhmukh2" "yuvlnhfvba2" "cjbcajffur1" "nttsfcuszh8" "ibvgfrjnro6" 

2. 環境

R言語徹底解説』第8章 (p.160) には、R における環境 (environment) の特別な使用方法としてハッシュマップとして使えると書いてある。 前回の記事ではやや複雑に書いたが、list2env(setNames()) イディオムを使うとやや簡潔に書ける。

hash_env <- list2env(setNames(as.list(values), labels), hash = TRUE)
unlist(mget(target, envir = hash_env))
#>    eguiqhmukh    yuvlnhfvba    cjbcajffur    nttsfcuszh    ibvgfrjnro 
#> "eguiqhmukh2" "yuvlnhfvba2" "cjbcajffur1" "nttsfcuszh8" "ibvgfrjnro6" 

3. hash パッケージ

hash パッケージを使うと直感的で簡潔な記法でハッシュテーブルを扱える。 結果の順序が保証されないことには注意が必要である。

library(hash)
h <- hash(labels, values)
values(h[target])
#>    cjbcajffur    eguiqhmukh    ibvgfrjnro    nttsfcuszh    yuvlnhfvba 
#> "cjbcajffur1" "eguiqhmukh2" "ibvgfrjnro6" "nttsfcuszh8" "yuvlnhfvba2" 

4. collections パッケージ

collections は高速なコンテナデータ型を提供するパッケージであり、ハッシュテーブルは辞書型 Dict として提供される。 メソッドはベクトル化されていないことに注意が必要である。

library(collections)
d <- Dict$new()
invisible(Vectorize(d$set)(labels, values))
Vectorize(d$get)(target)
#>    eguiqhmukh    yuvlnhfvba    cjbcajffur    nttsfcuszh    ibvgfrjnro 
#> "eguiqhmukh2" "yuvlnhfvba2" "cjbcajffur1" "nttsfcuszh8" "ibvgfrjnro6"

5. datastructures

datastructures パッケージはいくつかの高度なデータ型を提供する。 ハッシュテーブルは hashmap で提供される。

library(datastructures)
hm <- hashmap("character")
hm[labels] <- values
unlist(hm[target])
#> [1] "eguiqhmukh2" "yuvlnhfvba2" "cjbcajffur1" "nttsfcuszh8" "ibvgfrjnro6"

6. hashmap パッケージ

hashmap パッケージはキーとバリューに特定の型しか使えないが環境より速いとのこと。

library(hashmap)
H <- hashmap(labels, values)
H[[target]]
#> [1] "eguiqhmukh2" "yuvlnhfvba2" "cjbcajffur1" "nttsfcuszh8" "ibvgfrjnro6"

速度比較

上で紹介した 6 つの方法について速度を比較してみる。 テーブルサイズ N を 100~100000 まで変化させて速度を調べている。

library(magicfor)

magic_for()

for(N in 10^(2:5)) {
  # Create Data -------------------------------------------------------------
  labels <- generate_random_labels(N)
  values <- generate_random_values(labels)
  df <- data.frame(labels, values, stringsAsFactors = FALSE)
  
  target <- sample(labels, 100)
  
  # Preparation -------------------------------------------------------------
  named_values <- values
  names(named_values) <-  labels
  
  hash_env <- list2env(setNames(as.list(values), labels), hash = TRUE)
  
  library(hash)
  h <- hash(labels, values)
  
  library(collections)
  d <- Dict$new()
  invisible(Vectorize(d$set)(labels, values))
  
  library(datastructures)
  hm <- datastructures::hashmap("character")
  hm[labels] <- values
  
  library(hashmap)
  HH <- hashmap::hashmap(labels, values)
  
  # Benchmark ---------------------------------------------------------------
  library(microbenchmark)
  mb_result <- microbenchmark(
    named_vector = {
      named_values[target]
    }, 
    environment = {
      unlist(mget(target, envir = hash_env))
    },
    hash = {
      hash::values(h[target])
    },
    collections = {
      Vectorize(d$get)(target)
    },
    datastructure = {
      unlist(hm[target])
    },
    hashmap = {
      HH[[target]]
    }
  )
  
  time_df <- data.frame(N=as.character(N), mb_result)
  time_df
}

result <- Reduce(rbind, magic_result()$time_df)

結果を可視化する。

library(ggplot2)
ggplot(result, aes(x=N, y=time, fill=expr)) + 
  geom_boxplot() + scale_y_log10() + facet_wrap(~expr)

f:id:hoxo_m:20180828234546p:plain

library(dplyr)
result_mean <- result %>% 
  group_by(N, expr) %>% 
  summarise(mean=mean(time))
ggplot(result_mean, aes(x=N, y=mean, group=expr, color=expr)) + 
  geom_line() + scale_y_log10()

f:id:hoxo_m:20180828234555p:plain

テーブルサイズが数百程度であれば名前付きベクトルが速い。 それ以上は環境が一番速いようである。 hashmap は環境と遜色ない速度であり、直感的で簡潔な記述ができる。 hash パッケージを使う理由はもはや無いようである。

R言語徹底解説

R言語徹底解説