構造化『並列』プログラミング


構造化プログラミング。
といえば、順次分岐ループのパターンからなる、シリアルプログラミングにおける基本スタイルですね。(これらだけしか使わなくてもシリアルプログラムは書けますっていうね)
C++にはあらかじめこれらのパターンを簡単に使えるようにシンタックスが用意されています。
分岐はifやswitch、ループはwhile、for、doといったかんじで。

もちろんこれらのパターンは並列プログラミングでも使えます。
しかし並列プログラミングにおいては、これらのパターンだけで実装を行うことはほぼ不可能です。
そこで、並列プログラミングにおけるパターンとなる構造が考えられてきました。
これらを使うことで、よりパワフルかつ簡潔に並列プログラムのコードを書けるようになります。


1.マップ
もっとも基本的な並列パターンです。
まず、コードを見てください。

template <class T>
void map(T out[], const T in[], size_t size)
{
    for ( size_t i = 0; i < size; i++ )
    {
        out[i] = in[i];
    }
}

inの要素をoutにコピーしています。
これは、入力と出力の要素が1対1であり、各要素が影響し合いません。
よって、0からsize-1まで律儀に順を追う必要がなく、同時に実行しても問題ありません。
イメージとしては、

といったかんじです。
このパターンをマップといいます。

C++には、並列パターンがシンタックスとして用意されていません。
よって、これらの並列パターンを自分であらかじめ作っておくことといいと思います。
一度作ってしまえば、可読性のある並列プログラミングをいつでも書く用意ができたと言っていいでしょう。(ライブラリアンになるのだ)

言語拡張しているコンパイラやライブラリを使うのも手です。
今回はTBBを使ってみます。

template <class T>
void map(T out[], const T in[], size_t size)
{
    tbb::parallel_for(
        tbb::blocked_range<T>(0, size),
        [&](const tbb::blocked_range<T>& range)
        {
            for ( auto i = range.begin(); i != range.end(); ++i )
            {
                out[i] = in[i];
            }
        },  
        tbb::auto_partitioner()
    );
}

上記のようにparallel_forを使えば、マップを実現できます。


2.リダクション
これもまずコードを見てください。

template <class T>
T reductione(const T in[], size_t size)
{
    T sum = 0;
    for ( size_t i = 0; i < size; i++ )
    {
        sum += in[i];
    }
    return sum;
}

inの要素をsumに足しこんでいっています。
sumは加算前の状態を保持している必要があるので、マップでは無理ですね。
しかし、加算はどの順序で足し合わせても結果が同じになるので同時に実行することは可能です。
イメージとしては、

といったかんじです。
このパターンをリダクションといいます。

TBBを使った例を挙げておきます。

template <class T>
T reductione(const T in[], size_t size)
{
    return tbb::parallel_reduce(
        tbb::blocked_range<T>(0, size),
        T(0),
        [=](const tbb::blocked_range<T>& range, T sum) -> T
        {
            for ( auto i = range.begin(); i != range.end(); ++i )
            {
                sum += in[i];
            }
            return sum;
        },
        std::plus<T>(),
        tbb::auto_partitioner()
    );
}

上記のようにparallel_reduceを使えば、リダクションを実現できます。


3.スキャン
これもまずコードを見てください。

template <class T>
T scan(T out[], const T in[], size_t size)
{
    T sum = 0;
    for ( size_t i = 0; i < size; i++ )
    {
        out[i] = sum += in[i];
    }
    return sum;
}

マップとリダクションのフュージョンです。
これも考え方が合わさっただけで理屈は前の2つと同じですので、同時に実行することは可能です。
イメージとしては、

といったかんじです。
このパターンをスキャン(一次元リカレンス)といいます。

TBBを使った例を挙げておきます。

template <class T>
T scan(T out[], const T in[], size_t size)
{
    scan_body<T> body(out, in);

    tbb::parallel_scan(
        tbb::blocked_range<T>(0, size),
        body,
        tbb::auto_partitioner()
    );

    return body.sum;
}

上記のようにparallel_scanを使えば、スキャンを実現できます。
注意する点は、マップにあたる部分の処理とリダクションにあたる部分の処理を分ける必要があるため、関数オペレータ以外にコールバックされる関数を定義します。
よって、今までのようにラムダ式で渡せません。
以下のようなメンバ関数を持つクラスオブジェクトをファンクタとして渡しています。

// body for scan
template <class T>
struct scan_body
{
    T sum;
    T* out;
    const T* in;

    scan_body(T* out, const T* in)
        : sum(0)
        , out(out)
        , in(in)
    {
    }

    scan_body(const scan_body& body, tbb::split)
        : sum(0)
        , out(body.out)
        , in(body.in)
    {
    }

    template <typename Tag>
    void operator()(const tbb::blocked_range<T>& range, Tag)
    {
        for ( auto i = range.begin(); i != range.end(); ++i )
        {
            out[i] = sum += in[i];
        }
    }

    void reverse_join(const scan_body& body)
    {
        sum += body.sum;
    }

    void assign(const scan_body& body)
    {
        sum = body.sum;
    }
};

4.Fork-Join
これもまずコードを見てください。

T in[100];                // 1 input
T out1[100], out2[100];   // 2 output

map(out1, in, sizeof(in) / sizeof(T));
map(out2, in, sizeof(in) / sizeof(T));

関数mapは、「1.マップ」で作った関数だと思ってください。
ここでは関数自体にはあまり意味はありません。ようは、なにかの処理を複数行っているということです。
ここでの2つの処理はお互いに影響を及ぼしません。つまり同時に実行できます。
イメージとしては、

といったかんじです。
このパターンをFork-Joinといいます。
並列処理として一番イメージしやすいかもしれません。

TBBを使った例を挙げておきます。

T in[100];                // 1 input
T out1[100], out2[100];   // 2 output

tbb::parallel_invoke(
    [&](){ map(out1, in, sizeof(in) / sizeof(T)); },
    [&](){ map(out2, in, sizeof(in) / sizeof(T)); },
);

上記のようにparallel_invokeを使えば、Fork-Joinを実現できます。
ただし、並列実行する処理に前後関係がある場合にはこのパターンは使えません。
(たとえば、out1への出力を終えてからout2への出力を行う必要がある場合とか)

また、task_groupを使って以下のようにも書けます。(おまけ)

tbb::task_group group1;
tbb::task_group group2;

group1.run(
    [&]()
    {
        map(out1, in, sizeof(in) / sizeof(T));
    }
);

group2.run(
    [&]()
    {
        map(out2, in, sizeof(in) / sizeof(T));
    }
);

group1.wait();
group2.wait();

5.パイプライン
これもまずコードを見てください。

for ( size_t i = 0; i < size; i++ )
{
    T data = read(i);   // どっかから読み込む.

    treble(data);         // なにか処理をかける.

    write(data, i);       // どっかに書き込む.
}

読み込み/3倍処理/書き込みの3つの処理を繰り返しています。
ここで重要な点は2つです。
1つ目は、ループの中の処理(ステージ)の順番です。
読み込み→3倍処理→書き込みの順番は絶対に守らなければ、正しい結果が得られません。しかし、前後の周回のステージとの順番は気にする必要はありません。(10回目の書き込みの前に、11回目の3倍処理が行われていても問題ないよ)
2つ目は、インクリメンタルなデータがあるということ。(現在の状態を保持して次に渡していくってニュアンスでインクリメンタルってゆってまふ)
ここでのインクリメンタルなデータは i です。インクリメンタルなデータがあることで、周回を追い越すことはできません。(10回目の周回に入っていないのに、11回目の周回の処理は行えないよ)
これらのことに注意して並列化を行う必要があります。
イメージとしては、

といったかんじです。
このパターンをパイプラインといいます。
上の図では、インクリメンタルなデータは1つですが、もちろん1つとは限りません。

TBBを使った例を挙げておきます。

size_t i = 0;   // incremental data

// body for read
auto read_body = [&](tbb::flow_control& fc) -> Data<T> {
    if ( i >= size )
    {
        fc.stop();     // terminate roop
        return Data<T>(-1, T());
    }
    
    Data<T> data = Data<T>(i, read(i));   // read data

    i++;
	
    return data;
};

// body for treble
auto treble_body = [](Data<T> data) -> Data<T> {
    treble(data.value);
    return data;
};

// body for write
auto write_body = [](Data<T> data) {
    write(data.value, data.index);   // write data
};

tbb::parallel_pipeline(
    16, // max_number_of_live_tokens
    
    tbb::make_filter<void, Data<T>>(
        tbb::filter::serial_in_order,
        read_bosy
    ) &
    tbb::make_filter<Data<T>, Data<T>>(
        tbb::filter::parallel,
        treble_body
    ) &
    tbb::make_filter<Data<T>, void>(
        tbb::filter::serial_in_order,
        write_body
    )
);
// class of data
template <class T>
struct Data
{
    size_t index;
    T value;

    Data(size_t index, T value)
        : index(index)
        , value(value)
    {
    }
};

上記のようにparallel_pipelineを使えば、パイプラインを実現できます。
(少しわざとらしいコードにしすぎて、逆にわかりにくくなったかも・・・)
make_filterのテンプレートパラメータに、body(並列化する関数オブジェクト)の引数の型と戻り値の型を指定します。1つ目の引数は、並列化の動作を指定します。上の例では、読み込みと書き込みはserial_in_orderを指定して並列化されないようにしています。


以上、5つの並列パターンを紹介しましたが、並列パターンはまだ存在します。(多いんだよ・・・)
並列パターンは、その用途を具体的にイメージしやすいものから、どういった場面で使うのかよくわからない複雑なものまで多様です。
マップ → 画像処理のトーンカーブ処理。レイトレーシング。など
リダクション → 行列演算。など
スキャン → 乱数生成。など
パイプライン → データ圧縮。など

自分のよく使う処理に適用できる並列パターンは目を通しておいて損はないかと思います。

構造化並列プログラミングについてより詳しく知りたい人は、この本がおすすめです。
Structured Parallel Programming: Patterns for Efficient Computation
Michael McCool (著), James Reinders (著), Arch Robison (著)