NumPy Arrayのプロセス間コピーについて

大量のデータを使った機械学習ではどれだけ早く学習の計算を終えられるかが重要になります。特にPythonの実装では学習サンプルの事前データ処理にかかる時間がボトルネックになる場合が多く、そのためPyTorchTensorflowなどのフレームワークはプロセス並列化によるデータ事前処理のクラスを用意しています。

PythonにはGlobal Interpreter Lock、つまりオブジェクト操作がスレッド間で排他的に行われるという制約があります。そのため1つのPythonプロセスで並行してデータ事前処理を行うことには限界があります。プロセスを複数立ち上げて並列化することで事前処理全体の速度向上を図ることができるというわけです。

いくら高速化するといってもプロセス並列化にもボトルネックとなりうる処理が存在します。それはプロセス間のデータ移行です。いくらサンプルデータ読み込みと事前処理を高速化してもデータ移行で損をするのはもったいない。

Pythonのプロセス並列化の標準ライブラリmultiprocessingにはいくつかのデータ移行方法が実装されています。このエントリの目的はこの移行方法の中でもPipeとSharedMemoryの速度差を調べることです。Pipeはプロセス間で書き込み読み込みの操作を排他的に行うことができるデザインになっておりプロセス間の同期に便利なオブジェクトです。オブジェクトはPipeを通るときにシリアライズが行われます。SharedMemoryは名前の通りプロセス間で共有できるメモリ領域です。この領域にプロセスたちが個々に書き込んだり読んだりすることでデータ共有が行えます。概してPipeの方がSharedMemoryに比べて制約が多い分、使い勝手が良いわけですが実際に速度差はどのくらいになるのでしょうか?

以下のようなコードを書いてみました。:

import multiprocessing as mp
import time
from multiprocessing.sharedctypes import RawArray

import numpy as np


def connection(shape, dtype, N):
    def f(conn):
        while True:
            conn.send(np.ones(shape, dtype))

    parent_conn, child_conn = mp.Pipe()
    process = mp.Process(target=f, args=(child_conn, ), daemon=True)
    process.start()

    start = time.time()
    for _ in range(N):
        array = parent_conn.recv()
    return (time.time() - start) / N


def shared_memory(shape, dtype, N):
    def f(conn, data):
        while True:
            conn.recv()
            data[:] = np.ones(shape, dtype).ravel()
            conn.send(None)

    ctype = np.ctypeslib.as_ctypes_type(dtype)
    len = int(np.array(shape, copy=False, dtype=dtype).prod())
    data = np.ctypeslib.as_array(RawArray(ctype, len))

    parent_conn, child_conn = mp.Pipe()
    process = mp.Process(target=f, args=(child_conn, data), daemon=True)
    process.start()

    start = time.time()
    parent_conn.send(None)
    for _ in range(N):
        parent_conn.recv()
        array = np.copy(data)
        parent_conn.send(None)
    return (time.time() - start) / N


if __name__ == '__main__':
    dtype = np.float32
    N = 100
    num_tests = 10
    dims = np.power(2, np.arange(1, 10 + 1))

    def run(f):
        durations = np.zeros((len(dims), num_tests))
        for i, dim in enumerate(dims):
            shape = (dim, dim)
            for j in range(num_tests):
                durations[i, j] = f(shape, dtype, N)
        return durations.mean(axis=1), durations.std(axis=1)

    c_mean, c_std = run(connection)
    s_mean, s_std = run(shared_memory)

    import matplotlib.pyplot as plt
    import seaborn as sns
    sns.set_style('darkgrid')
    fig = plt.figure(figsize=(5, 5))
    ax = fig.add_subplot(111)
    ax.errorbar(dims, c_mean, yerr=c_std, label='Connection', fmt='o')
    ax.errorbar(dims, s_mean, yerr=s_std, label='Shared Memory', fmt='o')
    ax.set_xlabel('Dim Size')
    ax.set_ylabel('Duration [s]')
    plt.legend(loc='upper left')
    plt.show()

connection()はPipeを通して直接NumPy Arrayを送る方法です。SharedMemoryに関してはこの記事を参考にしました。こちらはPipeを使ってはいますが、これはあくまで共有メモリの競合を防ぐためのフラグを送り合うためです。2つのプロセスはPipeのsend/recvのタイミングを通してお互いに異なるタイミングで共有メモリにアクセスします。NumPy Arrayと共有メモリの変換はnumpy.ctypeslibを使っており、親側では共有メモリからのコピーも行うことでPipeと同じ状況でarrayを使えるようにしています。

Mac Book Proで実行したらこうなりました: f:id:tanuchan_2020:20220210223155p:plain 図のX軸はコピーされるArrayのサイズ(X, X)におけるXの値です。Y軸はN=100回プロセス間コピーをしたときの平均時間を10回ずつ集めたときの平均と分散です。どちらもデータ移行ににかかる時間のみをtimeで計測しています。

Arrayのサイズが小さい内には両者の違いがほとんどありませんが、大きくなるにつれてShared Memoryが早くなっていることがわかります。

プロセス並列でNumPy Arrayを移行する際にはShared Memoryを使った方が速度的には良いという結果が出ました。ただし全てにおいてShared Memoryを使うべき、というわけではありません。そもそも両者の差がどこまで全体の速度向上に影響を与えるのかは状況次第です。1024x1024の行列を送るのにも0.01秒程度しか変わらないわけです。またShared Memoryは実験コードにあるように、共有の開始前にデータ型と形がわかっている必要があり実装がめんどくさいという欠点もあります。特に学習計算の処理に比べてデータサイズが大きい場合にShared Memoryの検討を導入してみてもいいかもしれませんね。