PARDS:
A library for PARallel programs with Dataflow Synchronization (version 0.4)

目次

  1. はじめに
    1. 概要
    2. 動作環境
  2. 使い方
    1. 簡単な使い方
    2. SyncList<T>の使い方
    3. SyncQueue<T>の使い方
    4. WorkPool<T1, T2>の使い方
    5. SPMDサポート
    6. 並行処理サポート
  3. プログラム例
    1. Quick Sort
    2. bzip2
    3. GTK+によるGUI
  4. リファレンス
    1. 大域関数
    2. Sync
    3. SyncList
    4. SyncQueue
    5. WorkPool
    6. Merger
    7. 利用上の注意点
  5. おわりに
    1. 設計方針など
    2. 今後の課題

はじめに

これは、並列プログラムを作製するためのライブラリです(UNIX系OS用)。 pthreadなどを直接利用するよりも、簡単に並列プログラムを作製することが できます。実用的な例として、bzip2の並列化も行っています。

概要

近年、周波数向上によるCPUの性能向上が頭打ちになり、CPU台数を増やすこと による性能向上を目指すことが多くなってきています。従来はこのような共有 メモリ型並列計算機(SMP)は、ワークステーションなどに限られるものでした が、近年はPCや組み込み型プロセッサにも見られるようになってきています。

しかし、SMPにおけるプログラミングは簡単なものではありません。通常は、 pthread等のライブラリを用い、ロックやセマフォを駆使したプログラミング が必要です。これは、コーディングが面倒なだけでなく、動作するたびに情况 が変わり、修正することが非常に困難な非決定的なバグを引き起こします。

このライブラリは、このような問題を解決するため、より直観的で簡単な並列 処理記述を可能にします。具体的には、

機能を提供します。

例えばSync<T>の場合、Sync<int> aと宣言すると、aに対しては a.read(), a.write(1)のような操作が可能です。a.write(1)は、aの内容を1に する操作で、a.read()はその内容を取り出す操作です。この機能を用いて、 プロセス間で通信を行ないます。

ここで、a.read()は、a.write(1)が実行されるまでブロック(動作を停止)し ます。これをデータフロー同期と呼びます。また、同一変数に対するwrite操 作は一度しか行なえません(正確には、2度目以降の操作は内容を変更すること が出来ません)。

SyncList<T>, SyncQueue<T>は,Sync<T>をリストにしたも の、またリストの長さを制限したものです。これらは、generator-consumerパ ターンで利用されます。

WorkPool<T1,T2>は、SyncList<T>またはSyncQueue<T>から 複数のプロセスが仕事を取り出すパターンをサポートするためのものです。

また、その他にも、SPMDスタイルでの並列プログラムをサポートする機能、お よび並行処理をサポートする機能として、タイムアウトやインタラプト、リス トのマージ機能を提供します。

動作環境

このライブラリは、Unix系OSおよびプログラミング言語としてC++を前堤とし ます。また、OSの機能として、fork, System V IPC (共有メモリ、セマフォ)を 利用しています。開発とテストはRed Hat Linux 9 上で行なっていますが、 他の環境でも動作すると思います。

使い方

簡単な使い方

samplesディレクトリのsample.ccを例に説明します。以下は、samples.ccの mainにコメントを加えたものです。

int main()
{
  pards_init();      // ...(main:1) ライブラリの初期化
  
  Sync<int> a, b, c; // ...(main:2) 同期用変数の宣言

  SPAWN(add(1,b,c)); // ...(main:3) addをfork, bを待ち3番目に動作
  SPAWN(add(1,a,b)); // ...(main:4) addをfork, aを待ち2番目に動作

  a.write(3);        // ...(main:5) 最初に動作

  int v = c.read();  // ...(main:6) (main:3)のadd(1,b,c)の終了を待つ

  printf("value = %d\n",v);
  pards_finalize();  // ...(main:7) ライブラリの終了
}

ライブラリを利用するには、pards_init();を呼び出す必要があります (main:1)。また、ライブラリの利用を終了する際には、pards_finalize();を 呼び出す必要があります(main:7)。

同期用の変数は、Sync<int> a,b,c;のように宣言します(main:2)。この 場合、aの中にint型の変数を保持できます。

(main:3)で、add(1,b,c)という関数をSPAWNしています。これは、add(1,b,c)と いう関数をプロセスとしてforkすることをあらわします(SPAWNはマクロとし て実現されています)。

ここで、関数addの定義を以下に示します。

void add(int i, Sync<int> a, Sync<int> b)
{
  int val;

  val = i + a.read(); // ...(add:1) a.read()でaの値が書きこまれるのを待つ
  b.write(val);       // ...(add:2) b.writeで値を書きこむ
}

この関数は、第1引数の値と第2引数の値を足して、第3引数に返す、というも のです。第1引数は単純なint型、第2引数と第3引数はSync<int>型になっ ています。

(add:1)のa.read()はaに値が書きこまれるまでブロックします。aに値が書き こまれると実行を再開し、その値を取り出します。値を取りだした後は、 第1引数の値と可算し、valにその値を入れています。

(add:2)では、可算した値valをbに書きこんでいます。これにより、bの値を待っ てブロックしているプロセスが動作を再開します。

mainの説明に戻ります。(main:3)でforkされたadd関数の弟2引数bは、SPAWNし た時点ではどのプロセスもwriteしていません。従って、このadd関数はしばら くブロックすることになります。

同様に、(main:4)でforkされたadd関数の弟2引数aもwriteされていませんので、 このadd関数もブロックします。

ここで、(main:5)でaに3をwriteしています。これにより、(main:4)でforkし たadd関数が実行を再開します。実行が完了すると、bに値4がwriteされます。

bに値がwriteされると、(main:3)でforkされたadd関数も実行を再開します。 実行が完了すると、cに値5がwriteされます。

(main:6)では、cの値をreadしています。この文も、cの値がwriteされるのを 待ってブロックします。従って、(main:3)でforkされたadd関数の実行が完了 するのを待ちます。

このように、Sync<int>型変数を利用することで、SPAWNによりforkした プロセス間の通信と同期が可能になりました。

ここで、同じ変数に複数回writeしても、2回目以降のwriteは無視され、値を 変更することはできません(sample.ccでコメントアウトしてある部分を利用し て、実験することができます)。このような性質の変数を単一代入変数と呼ぶ ことがあります。

この性質を積極的に利用することで、「早い者勝ち」のようなアルゴリズムを 実現することもできます。

実装に関するコメント

先ほども触れましたが、SPAWNは、内部からfork()を呼び出すマクロとして 実現されています。

Sync<T>は、プロセス間通信を実現するために、System V IPCを利用し ています。具体的には、pards_init();で共有メモリを確保し、プロセス間で の通信に利用しています。

また、プロセスのブロックと再開、および共有メモリアクセスの排他制御を実 現するために、System V IPCが提供するセマフォを利用しています。

Sync<T>型変数の中には、共有メモリへのポインタとセマフォIDだけが 保存されているため、関数に値渡しすることが可能です(sample.ccのaddの引 数)。もちろん、ポインタや参照として渡しても構いません。

また、pthreadのようなスレッドではなく、forkによりプロセスを生成してい るため、fork後に(誤って)大域変数を変更してしまっても、別のプロセスには 影響しません(スレッドを利用した場合、うっかり他のプロセスに影響するよ うな変数の更新を行ってしまうことが修整困難なバグの原因になりがちでした が、そのようなことはおこりません)。また、forkはメモリ空間をコピーする ので(正確には、実際にコピーが発生するのは書きこんだ際)、fork前に親プロ セスで書きこんだ変数の値を利用することも出来ます。

資源の確保と解放

sample.ccでは行ないませんでしたが、多くの同期変数を利用する場合や、長 く動作するプログラムの場合、確保した資源(共有メモリ領域とセマフォ)を 解放する必要があります。ここで、共有メモリやセマフォは複数のプロセスで 共有されるため、単純にデストラクタで解放するのは危険です(writeしたプロ セスが不用だからといって解放すると、readが終了する前に解放してしてしま う可能性がある)。このため、資源の解放は明示的に行ないます。

スタック上に確保した変数に対しては、free()を呼び出します。もちろん、解 放は、他に同じ資源を参照しているプロセスがないことが保証される情况で行 う必要があります。典型的には、writeするプロセスが1つ、readするプロセス が1つの情况で、readするプロセスがreadを完了した後に行います。

free()を使った例はsamplesディレクトリのfib.ccにあります。このプログラ ムは、フィボナッチ数列を2つのプロセスで並列に計算しています。

newを利用して同期変数を確保する場合は、受け渡しされる値そのものだけで はなく、Sync<T>型変数の領域そのものも共有メモリ領域に確保されま す。この場合は、deleteを実行することで、共有される資源および同期用変数 の領域が解放されます。したがって、readする側で一度deleteするだけで良い ということになります。

newを利用した際の実装がこうなっている理由は、SyncList<T>型変数に おける実装と整合性を取るためです。SyncList<T>型変数については、 後述します。

SyncList<T>の使い方

SyncList<T>は、generator-consumerパターンで利用されます。このパ ターンでは、あるプロセスが値のリストを生成(generate)し、別のプロセスが リストの値を使います(consume)。リストの生成と消費を別のプロセスで行う ことで、パイプライン的な並列化が可能になります。

samplesディレクトリのlistsample.ccを例に説明します。

int main()
{
  pards_init();

  SyncList<int> *a;      // ...(main:1) リストの最初のセルの宣言
  a = new SyncList<int>  // ...(main:2) リストセルの確保

  SPAWN(generator(a));   // ...(main:3) generatorプロセスのフォーク
  SPAWN(consumer(a));    // ...(main:4) consumerプロセスのフォーク

  pards_finalize();
}

まず、(main:1), (main:2)において、リストの最初のセルとなる変数の宣言と newによる確保を行っています。そして、(main:3), (main:4)において、 generatorプロセスおよびconsumerプロセスのフォークを行っています。 generatorプロセス、およびconsumerプロセスには、先ほど確保したリストの 最初のセルとなる変数を渡しています。

つぎにgeneratorを見てみましょう。

void generator(SyncList<int> *a)
{
  int i;
  SyncList<int> *current, *nxt;  
  current = a;                 // ...(gen:1) 引数をcurrentに入れる

  for(i = 0; i < 10; i++){
    current->write(i);         // ...(gen:2) リストセルに値を入れる
    printf("writer:value = %d\n",i);
    nxt = new SyncList<int>;   // ...(gen:3) あたらしいリストセルを確保
    current->writecdr(nxt);    // ...(gen:4) currentのcdrを確保したセルに
    current = nxt;             // ...(gen:5) currentを確保したセルに
    sleep(1);                  // ...(gen:6) 動作を見るためのwait
  }
  current->write(i);
  printf("writer:value = %d\n",i);
  current->writecdr(0);        // ...(gen:7) リストを0で終端
}

generatorはリストを作製し、中に値を入れていく機能を持ちます。 Sync<T> の場合と同様、write(i)でリストセルに値を入れることができ ます(gen:2)。

リストの次のセルを(gen:3)でnewを用いて作成しています。そして、そのセル を(gen:4)でwritecdr()を用いてもとのセルにつなげています。

ここで、新しいセルはnewを用いて生成する必要があります(スタック上に取っ たセルを結合してはいけません)。これは、セルを共有メモリ上に確保しない と、別プロセスであるconsumerに値を渡すことができないためです。newを用 いると、共有メモリ上にセルが確保するように実装されていますので、 consumerプロセスにセルを渡すことができます。

このように、SyncList<T> ではnewを用いると共有メモリ上にメモリ領域 を確保しますので、これに合わせ、Sync<T> でもnewを用いると共有メモ リ上に領域を確保する、としました。

これをforループで繰り返すことにより、リスト構造を作成しています。動作 を見るため、ループの繰り返しごとに1秒waitを入れています(gen:6)。 リストの最後は0で終端しています(gen:7)。

つぎに、consumerの方を見てみましょう。

void consumer(SyncList<int> *a)
{
  SyncList<int> *current,*prev;
  current = a;

  while(1){
    printf("reader:value = %d\n",
                current->read()); // ...(cons:1) セルの値を取り出し、表示
    prev = current;               // ...(cons:2) 現在のセルを保存
    current = current->readcdr(); // ...(cons:3) cdrの取り出し、現在のセルに
    delete prev;                  // ...(cons:4) 利用済セルの削除
    if(current == 0) break;       // ...(cons:5) 終端チェック
  }
}

(cons:1)でセルの値を取り出して、表示しています。ここでSync<T> の 場合と同様、値がwriteされるまで、ブロックします。

現在処理中のセルを保存しておいて(cons:2)、cdrを取り出して現在のセルに します(cons:3)。ここでも同様に、writecdrによってcdrが書きこまれるまで は、この操作はブロックします。

cdrの読み取りが終了すると、もとのセルは利用済ですから、削除します (cons:4)。ここで、deleteを使っていますが、これにより、共有メモリ領域に 確保されたセルの解放、およびセマフォの解放が行われます。

最後に終端チェックを(cons:5)で行い、終端の場合は終了します。

このプログラムの実行結果は、以下のようになるはずです。

writer:value = 0
reader:value = 0
writer:value = 1
reader:value = 1
writer:value = 2
reader:value = 2
...

consumerはgeneratorが値を書きこむのを待つので、1秒ごとに上記のような出 力が表示されるはずです。

省略記法

上であげたようなリストの生成、消費は典型的なパターンなので、若干記述量 を減らすことができる省略記法を用意しました。

まず、generatorにおいて、「新たなリストセルを生成し、現在処理中のセル のcdrに接続する」という操作は、前述の例では以下のように記述していました。

  nxt = new SyncList<int>; 
  current->writecdr(nxt);
  current = nxt;

これを簡潔に記述するため、「新たに作成したセルを、対象セルのcdrに接続 した上で返す」createというメンバ関数を用意しました。この記法を用いると、 上記の例は以下のように1行で記述できます。

  current = current->create();

さらに、nxtというテンポラリ変数も不要になりました。

また、consumerにおいて、「現在処理中のセルからcdrを取り出し、これを次 の処理対象のセルとし、前のセルを削除する」という操作は、以下のように記 述していました。

  prev = current;
  current = current->readcdr();
  delete prev;

これも、簡潔に記述するため、「cdrを取り出して、もとのセルをdeleteした 後、取り出したcdrを返す」というrelease()というメンバ関数を用意しました。 これを用いると、以下のように記述できます。

  current = current->release();

以上の省略記法を用いることで、プログラムを簡潔に記述することができます。 この記法を用いた例は、listsample2.ccにあります。

SyncQueue<T>の使い方

前述の例では、generator側にwaitを入れていました。このように、generator側 がボトルネックになる場合はよいのですが、consumer側の動作が遅い場合、 generator側の動作が先行してしまい、consumer側での資源の解放が追いつか ず、資源が枯渇してしまう可能性があります。

これを避けるためには、consumer側で資源の解放を行うまで、 generatorの動作をブロックする必要があります。SyncQueue<T>は、こ のような機能を提供します。

SyncQueue<T>は、SyncList<T>とほとんど同じですが、最初のセ ルを確保する際にQueueの長さをコンストラクタの引数として指定します。

  a = new SyncQueue<int>(2);

このように宣言すると、このQueueにリストを繋ぐことのできる回数(すなわち writecdrまたはコンストラクタでcdrに書き込む操作)を2回に制限します。 それ以上、cdrに繋ごうとする操作はブロックします。また、このQueueに繋がっ たセルをdeleteまたはreleaseメンバ関数で削除すると、リストを繋ぐ回数は 増えます。cdrに繋ごうとしてブロックしている操作がある場合は再開されます。 すなわち、常時存在しているリストセルの個数は最大で3個になります。

上記のように確保しなければならないセルは、最初のセルだけです。それ以降 のセルは、SyncList<T>と同じように確保、cdrとして設定します。

また、SyncList<T>型とは異なり、あるセルは複数のセルのcdrとして設 定することは出来ません(システムが検出し、エラーを出力します)。

queuesample.ccでは、listsample2.ccの型をSyncQueue<T>型とし、wait をconsumer側でかけています。実行結果は以下のようになるはずです。

writer:value = 0
writer:value = 1
writer:value = 2
reader:value = 0
writer:value = 3
reader:value = 1
writer:value = 4
...

まず、generator側にwaitは入っていませんから、0, 1, 2まではすぐに表示します。 つぎに、consumerが0を表示し、セルをひとつ削除した時点で、generatorの動 作が再開し、3を表示します。以下同様に、1秒ごとに表示が進んでいきます。

WorkPool<T1, T2>の使い方

SyncList<T>, SyncQueue<T>だけを用いてもプログラムは記述で きますが、これらだけを使ってプログラムを記述すると、SPAWNをたびたび呼 び出す必要があるかも知れません。プロセス起動のオーバヘッドは最近のOSで はそれほど大きなものではありませんが、それでもプロセス起動の回数を減ら したいこともあると思います。

そこで、あらかじめworkerとなるプロセスをいくつか(例えばプロセッサ台数 分)立ち上げておき、それらが仕事の「プール」から適宜仕事を取り出すとい う形のプログラムをサポートするクラスを用意しました。

使い方をsamplesディレクトリのworkpoolsample.ccを使って説明します。

int main()
{
  pards_init();

  SyncQueue<int> *work = new SyncQueue<int>(2);  // ...(main:1) 仕事キュー
  SyncQueue<int> *output = new SyncQueue<int>(2);// ...(main:2) 出力キュー

  WorkPool<int,int> *pool = new WorkPool<int>(work,output);
                                         // ... (main:3) WorkPoolの定義
  SPAWN(generator(work));                // ... (main:4) 仕事の生成
  SPAWN(worker(1, pool));                // ... (main:5) woker1のfork
  SPAWN(worker(2, pool));                // ... (main:6) woker2のfork

  while(1){
    printf("%d...\n",output->read());    // ... (main:7) 出力キューの表示
    if((output = output->release()) == 0) break;
                         // ... (main:8) 次のセルを取り出す。終端チェック
    
  }
  pards_finalize();
}

まず、「仕事」を値として持つSyncQueue workと、「仕事」を処理した結果を値と して持つSyncQueue outputを定義します(main:1), (main:2)。ここでは SyncQueueを用いましたが、SyncListを使っても構いません。

次にWorkPool型の変数poolを定義します(main:3)。WorkPool<T1,T2>の T1, T2には、仕事用SyncQueue<T>に指定した型(int)と、出力用 SyncQueue<T>に指定した型(int)を指定します。また、コンストラクタ の引数には、先ほど作成した仕事用SyncQueue workと出力用SyncQueue output を指定します。

このように、WorkPool型では、仕事用SyncQueue (SyncList)と出力用 SyncQueue (SyncList)をペアで扱います。すなわち、処理する「仕事」を取り 出す際には、対応した出力先となるリストセルとペアで取り出されます。

これにより、WorkPoolから取り出された仕事がそれぞれ別のプロセスにより処 理され、ばらばらの順番で完了したとしても、その出力結果はSyncQueue / SyncListに順番に入れられることになります。

(main:5), (main:6)では、WorkPoolであるpoolを引数にworkerをSPAWNします。 workerを区別するために、idも引数として渡しておきます。

(main:7), (main:8)では、処理した結果を出力しています。

それではworkerの方の定義を見てみましょう。

void worker(int id, WorkPool<int,int> *workpool)
{
  while(1){
    WorkItem<int,int> item = workpool->getwork(); // (worker:1) 仕事の取り出し
    if(item.output == 0) break;                   // (worker:2) 終端チェック
    else{
      item.output->write(item.work*2);            // (worker:3) 値を2倍して出力にwrite
      printf("(%d by [%d])\n",item.work*2,id);    // (worker:4) 処理したworker idをプリント      
    }
  }
}

まず、(worker:1)でWorkPoolから仕事を取り出します。ここで、取り出した仕 事の型はWorkItemとなります。WorkItem<T1,T2>のT1, T2に与える型は、 WorkPool<T1,T2>と同じです。

WorkPoolのメンバ関数getwork()を呼び出すことで、WorkItem型の「仕事」を 取り出すことができます。WorkItem型の中には、対象とする仕事と、出力先の セルが入っています。

ここで、getwork()の内部では、

という処理が行われます。これにより、WorkPoolの利用者は、セルの解放/作 成を意識せず、WorkPoolを利用できます。

とりだしたWorkItem型の変数itemのメンバoutputには出力先のセルが入ってい ます。(worker:2)ではこれが0かどうかで終端チェックを行っています。

WorkItem型変数のメンバworkには、T1型(ここではint型)変数として、仕事内 容が入っています。(worker:3)ではこれを2倍して、outputセルにwriteしてい ます。

(worker:4)では処理したworkerが誰かを見るため、outputセルにwriteした内 容と、workerのidをプリントしています。これを見ることで、複数のworkerで 処理していることが確認できます。

このサンプルの出力結果は、こんな感じになるはずです:

(0 by [1])
(2 by [1])
0...
2...
(4 by [1])
(6 by [2])
4...
6...
(8 by [1])
(10 by [2])
8...
10...

0, 1, 2, ... といった整数列が2倍された結果が出力され、またどのworkerがそ の計算を行ったのかが出力されます。実行するごとに、どのworkerが計算を行っ たかは変わるはずです。

上記の例では行いませんでしたが、WorkPool型変数をdeleteする際に、そのタ イミングが問題になる場合があります。すなわち、出力がそろったからといっ てWorkPool型変数をdeleteすると、まだworkerが残っていて(終端チェッ クまで済んでおらず)、解放された資源を参照してエラーになる可能性がある のです。

これを避けるために、WorkPool型変数のコンストラクタに、workerの数をリファ レンスカウントとして与えることができます(最後の引数として渡します)。そ して、workerでは、終端チェックの結果、終了する際にrelease()というメン バ関数を呼び出すことで、workerが終了したことをシステムに通知します。

全てのworkerがreleaseを呼ぶと、WorkPool型変数はdeleteされます。delete を明示的に呼び出す必要はありません(旧版から変更されました)。

これらの例は、workpoolsample.ccにコメントアウトした形で記述してあります。

コンストラクタ引数にカウンタを与えなかった場合は、release()を呼ばなく てもdeleteすることができます。しかし、これは一般に危険な操作となります。 また、コンストラクタ引数にカウンタを与えなかった場合でも、release()を 呼んで構いません。

newではなく、スタック上にWorkPool型変数を確保した場合は、Syncの場合と 同様、free()で資源を解放します。この場合も同様になります。

SPMDサポート

数値計算系のプログラムを並列化する際には、ループを並列化する、というこ とが良く行われます。SPAWNを使ってもこれは実現できますが、並列に処理を 行う単位を別関数にするなど、若干手間がかかります。

そこで、ライブラリ側で、SPMDと呼ばれるプログラム型式をサポートすること にしました。

SPMDとは、"Single Program, Multiple Data"の略で、実行するプログラムは 一つだけれども、それぞれのプロセッサで異なるデータを処理の対象とするこ とで、並列実行を行おう、というものです。

SPMDに基づく並列ライブラリとしては、"MPI"というものが有名です。 MPIでは、分散メモリ並列計算機で、メッセージパッシングを行いながら並列 処理を行います。全てのプロセッサは同じプログラムを実行しますが、自プロ セッサ番号を取得し、それによって分岐することで、処理内容を変えたり、 処理を行う配列の場所を変えたりします。

PARDSによるSPMDサポートもMPIによるものと類似しています。ただし、MPIで はプロセッサ間の情報の受け渡しにはメッセージパッシングを用いる必 要がありましたが、PARDSでは共有メモリを介して行うことができます。

使い方をsamplesディレクトリのspmdsample.ccを使って説明します。 このプログラムは、配列aのデータを可算するだけのものです。

#define NP 3
...

  int *a = (int*)pards_shmalloc(100*NP*sizeof(int)); // (1) 入力用データ
  int *out = (int*)pards_shmalloc(NP*sizeof(int));   // (2) 出力用データ

  for(int i = 0; i < 100 * NP; i++)       // (3) 中身を初期化
    a[i] = 1;

  PBInfo* pbi = pards_begin_parallel(NP); // (4) 並列ブロックの開始

  int pno = pbi->getpno();                // (5) 自プロセッサ番号の取得
  int *my_part = a + 100 * pno;           // (6) 自プロセッサが処理する場所を計算
  
  int sum = 0;

  for(int i = 0; i < 100; i++)            // (7) 自プロセッサ担当部分を処理
    sum+=my_part[i];

  out[pno] = sum;                         // (8) 結果を共有メモリに置く

  pards_barrier(pbi);                     // (9) 全プロセッサの終了を待つ

  int totalsum = 0;
  if(pno == 0){                           // (10) 0番プロセッサが総和を計算
    for(int i = 0; i < NP; i++)
      totalsum += out[i];
  }

  pards_end_parallel(pbi);                // (11) 並列ブロック終了
  ...

まず、(1), (2)で全プロセッサで共有する入力データと出力データを共有メモ リ上に確保しています。

ここで、明示的な共有メモリの確保には、pards_shmallocという関数を用います。 mallocと同様ですが、共有メモリ上にメモリを確保します。解放には、 pards_shmfreeを用います。

(3)では、入力となる配列aの中身を1に初期化しています。

(4)からSPMDによる処理が開始します。このpards_begin_parallel以降、 (11)のpards_end_parallelまで、pards_begin_parallelの引数に渡した数のプロセス で、並列に同じ処理を行います。

pards_begin_parallelは、PBInfo型の値を返します。ここにはこの並列ブロック の情報がおさめられています。

全てのプロセスで全く同じ処理を行うわけにはいきませんので、(5)で自プロ セス番号を取得します。これには先ほどのPBInfo型の変数を使います。 自プロセス番号は、0から始まり、生成したプロセス数-1までの整数です。

(6)で、自プロセス番号から、自プロセスが処理すべき場所を計算します。

(7)では自プロセスが処理すべき場所について、配列の総和を求めています。 そして、(8)で、その結果を(1)で確保した共有メモリの自プロセス担当場所に 書き込みます。

(9)では「バリア」という処理を行っています。これは、全てのプロセスが この文に到達するまで待つ、という処理です。これを行わないと、他のプロセ スが処理を完了したかどうか分りません。

(9)のバリアによって、全プロセスの処理が完了したことを確認した後、 (10)で、0番プロセスが代表して、全プロセスの処理結果の総和を求めて います。

(11)のpards_end_parallelでSPMD処理は終了します。この文以降、プロセスは 1つに戻ります。

SPMD機構は、大きな配列を処理するループがあり、その部分が全実行時間の主 な部分を占める、という場合に有効な並列化手法です。samplesディレクトリ には、配列乗算の例もありますので、ご参考にしてください。

並列ブロック中で、SPAWNなどを使うことも可能ですが、そのSPAWNはプロセス 番号により分岐しない限り、全プロセスで実行されてしまうということに注意 してください。

また、並列ブロック(pards_begin_parallel)をネストして使用することも可能 ですが、同様に全プロセスで実行されてしまい、制御が難しくなります。特に バリア文を実行する場合は、実行するプロセスを間違えないよう注意が必要で す。

並行処理サポート

これまでは、複数のプロセッサを利用してプログラムの実行速度を向上するた めの「並列処理」をサポートする機能について議論してきました。しかし、プ ログラムを行う際には、これ以外にも、処理の並行性を明示的に記述すると いう「並行処理」機能が必要になることがあります。

「並行処理」が必要になるのは、I/Oが関連する場合などです。例えば、 GUIアプリケーションで、ユーザからの入力を待ちながら、ネットワークから のデータ転送を待つ場合などが考えられます。

このようなプログラムを記述する場合、「ユーザの入力をブロックして待つ」 処理と「ネットワークからの入力をブロックして待つ」処理を別のプロセス (やスレッド)として記述したい、というわけです。

(もちろん、"select"のように、複数の入力を待ち合わせる機能を使っても、記 述することはできますが、別のプロセスやスレッドとして記述した方が、分り やすい場合もあります。)

PARDSでは、このようなプログラムを組む場合に必要になる機能も用意してい ます。具体的には、

となります。以下にそれぞれ説明します。

マージ機能

GUIアプリケーションで、ユーザからの入力を待ちながら、ネットワークから のデータ転送を待つ場合などを考えます。ユーザからの入力待ちとネットワー クからの入力待ちをそれぞれ別のプロセスで行い、どちらかから入力があれば、 別のプロセスがその入力に応じた処理を行う、というプログラムを書きたい場 合があります。

このような機能を実現するため、リストあるいはキューのマージを行う仕組み を導入しました。この機構は、ちょうど、"select"のように複数の入力を待ち 合わせるのに利用することができます。

この機能は、Mergerというクラスを利用することで利用できます。これを以下 の例を用いて説明します。この例は、samplesディレクトリのmergersample.cc にあります。

  SyncQueue<Item> *first = new SyncQueue<Item>(2); // (main:1) 最初のセルを作成
  
  Merger<Item> *merger = new Merger<Item>(first,2);// (main:2) 最初のセルを渡して、
                                                   //          mergerを作成
  
  SPAWN(generator(1,merger));  // (main:3) mergerを渡してgenerator1を呼び出し
  SPAWN(generator(2,merger));  // (main:4) mergerを渡してgenerator2を呼び出し
  SPAWN(consumer(first));      // (main:5) 最初のセルを渡してconsumerを呼び出し

まず、(main:1)で、最初のセルを作成します。ここではSyncQueueを使ってい ますが、SyncListでも構いません。Itemは、

class Item{
public:
  int val;
  int id;
};

のように定義しています。valに値を、idに後述するgeneratorのidを入れます。

次に、(main:2)で、作成した最初のセルをコンストラクタの引数として、 Mergerクラスのインスタンス mergerを作成します。コンストラクタの第2引数は、このmergerを共有するプ ロセスの数です。これは、mergerが利用するリソースを解放する際に使います。 (WorkPoolと同様です。)

(main:3), (main:4)では、作成したmergerを引数にgeneratorを呼び出してい ます。generatorは、mergerを利用してfirstから始まるSyncQueueにリストセ ルを追加していきます。この追加は、generator1, generator2が任意の順番で 行います。

(main:5)では、最初のセルを引数にconsumerを呼び出しています。consumerで は、generator1, generator2が作成したリストをたどります。

次に、generatorの定義を見てみます。

void generator(int id, Merger<Item> *m)
{
  ...
  for(int i = 0; i < 3; i++){
    Item it;       // (generator:1) Itemの作成
    it.val = i;    // (generator:2) Itemの値をセット
    it.id = id;    // (generator:3) Itemのidをセット
    m->put(it);    // (generator:4) Itemをmergerにput
  }
  m->release();    // (generator:5) mergerの解放
}

(generator:1), (generator:2), (generator:3)で、リストに入れるItemを作 成、値の設定を行っています。

そして、(generator:4)で、mergerのputを呼び出すことで、作成したItemをリ ストに入れています。リストセルの生成やcdrへの設定等は、putの中で行われ ます。

他のプロセス(この場合generator)が同じmergerに対してputを呼び出すことも あります。その場合、呼び出した順番で、リストに追加されていきます。

リストを作成し終ったら、(generator:5)でreleaseを呼び出し、解放します。 merger生成時に指定した数のプロセスがreleaseを呼び出した時点で、merger の資源が実際に解放されます。また、リストが終端されます。

実行中に、mergerを共有するプロセスの数を増やしたくなった場合は、 increase_referer()を呼び出すことで、増やせます。

最後にconsumerを見てみます。

void consumer(SyncQueue<Item> *l)
{
  SyncQueue<Item> *current = l;
  while(1){
    Item it = current->read();
    printf("val = %d, id = %d\n",it.val, it.id);
    current = current->release();
    if(current == 0) break;
  }
}

こちらは、mergerを使ったことによる特別な処理はありません。リストを終端 までたどり、その値を表示しています。結果は以下のような感じになるはずで す。

val = 0, id = 2
val = 0, id = 1
val = 1, id = 2
val = 1, id = 1
val = 2, id = 1
val = 2, id = 2

リストに値を入れたgeneratorのidが交互にでていることが見て取れます。

この例では、generatorの中で直接Mergerを扱いましたが、リストを生成する 複数のプロセスがすでに存在する場合、それらのリストをマージしたい場合も あります。その場合は、マージするプロセスをそれぞれのリストについて作成 することで実現することができます。この例はmergersample2.ccにあります。

値が決定しているかチェックする機能

この機能はシンプルで、Sync<T>型変数に対してはis_written(), SyncList<T>, SyncQueue<T>型変数に対しては、それに加えて is_cdr_written()という関数を用意することで、値、あるいはcdrが決定して いるかどうか調べることができます。決定している場合は1を、していない場 合は0を返します。

この関数を用いることで、read()やreadcdr()などを実行することでブロック するかどうかを事前に確認することができます。

タイムアウト機能

ブロックする可能性のある操作、例えばSync<T>型変数に対するread()のよう な操作を行う際、ある一定時間以上はブロックせず、操作を終了するようにし たい場合があります。例えば、socketからの入力を待つような際、相手側ホス トやネットワークが原因で応答が返ってこないような場合、一定時間経過後 にその操作を終了して、ユーザにエラーを返したい、ということが考えられま す。

このような場合に対応するため、以下のタイムアウト操作を用意しました。

それぞれの操作は、read, readcdr, release, writecdr, create, putのタイムアウ ト付きの版です。struct timeval* にはタイムアウトするまでの時間を与えま す(ただし、0が与えられた場合はタイムアウトを行いません)。

pards_status* はenum型へのポインタで、SUCCESS, TIMEOUT, INTERRUPTのい ずれかの値を持ちます。プログラマはメモリを確保した上で、そのポインタを 上記のtimed...操作に渡すことで、その内容がSUCCESS, TIMEOUT, INTERRUPT のいずれかとなります。SUCCESSの場合はタイムアウトせずに成功、TIMEOUTの 場合はタイムアウトしたことを表します。INTERRUPTはインタラプトがあった ことを表し、操作は成功しなかったことを表します。INTERRUPTについては後 述します。

ここで、writecdr、およびcreateのタイムアウト版はSyncListには存在しませ ん。これは、SyncListではこれらの操作はブロックしないためです。逆に、 SyncQueueでは、現在のキューのサイズによってはこれらの操作はブロックす る可能性があるため、タイムアウト版が存在します。

timedputは、MergerのコンストラクタにSyncQueueが与えられた場合のみ利用 可能です(SyncListの場合、putはブロックしません)。

サンプルは、samplesディレクトリのtimedreadsample.cc, timedlistsample.cc, timedqueuesample.cc, timedputsample.ccにあります。

インタラプト機能

タイムアウトだけではなく、ユーザが操作をキャンセルしたい場合もあります。 例えば、socketからの入力を待っていて、タイムアウト時間内ではあるものの、 ユーザが待ちきれずキャンセルしたい場合などです。

このような場合に対応するため、「インタラプト」という機能も提供していま す。

インタラプトは、SPAWNしたプロセスに対してシグナルを送る仕組みです。現 在の実装では単純にSIGUSR1を利用しています。

また、これまで利用してきたSPAWNでは、プロセスのIDを知る方法がありませ んので、あらたにISPAWN(pid, func)というマクロを用意しました。第1引数に PIDを保存する変数を指定します(マクロですので、ポインタにする必要は無い ため、直接変数を指定する形にしています)。PIDの型は、pards_pidです。現 在はpid_tのtypedefですが、将来の拡張のため、型名を変えています。

ISPAWNでSPAWNしたプロセスに対しては、pards_send_interrupt(pards_pid)で インタラプトを送ることができます。

インタラプトを受け取ったプロセスはpards_is_interrupted()でインタラプト があったかどうか知ることができます。インタラプトがあった場合は1を、無 かった場合は0を返します。インタラプトがあったという状態をクリアしたい 場合は、pards_clear_interruption()でクリアできます。

ブロックする操作の途中でインタラプトがあった場合、操作の種類によって動 作が異ります。

まず、read, readcdr, release, writecdr, createのような通常の操作の場合 は、インタラプトがあってもブロックから抜けません。操作が終了した場合、 操作が成功したことが保証されます。

タイムアウトのために導入した操作(timedread, timedreadcdr, timedrelease, timedwritecdr, timedcreate)の場合は、インタラプトがある と、操作を中断します。そして、pards_statusとして、INTERRUPTを返します。

さらに、タイムアウトは持たないが、インタラプトに対しては中断する操作と して、

を用意しました。これらはタイムアウトはしませんが、インタラプトに対して は中断します。中断があったか、操作が終了したかどうかはpards_statusを見 ることで判定できます。SUCCESSの場合は成功、INTERRUPTの場合はインタラプ トによって中断したことを表します。

ブロックするシステムコールを実行中にインタラプトがあった場合、そのシス テムコールはブロックから抜けます。この場合、通常、システムコールが成功 したかどうかを表す返り値が-1となり、errnoがEINTRを返します(システムコー ルの種類に依存します)。

サンプルは、samplesディレクトリのintrreadsample.cc, intrlistsample.cc, intrqueuesample.cc, intrputsample.cc にあります。

プログラム例

ここでは、プログラム例として、Quick Sortと実用的な例としてbzip2の並列 化を紹介します。 また、並行処理機能を用いたGUIプログラムの例として、GTK+プログラムも紹介します。

Quick Sort

ここではSyncをデータ入出力に使うのではなく、単純に同期用に使う方法を紹 介します。

まず、Quick Sortのアルゴリズムについて説明します。Quick Sortでは、

というアルゴリズムでソートを行います。

直観的に分るように、分割した配列のそれぞれに対するソートは並列に行うこ とができます。

ここで、分割した配列を引数に与えてSPAWNし、ソートした配列の中身を Sync<T>に入れ、readで値を取り出す、という方法も考えられますが、 配列の値のコピーを何度も行うことになり、効率的ではありません。そこで、 Sync<T>はソートが終了したということを待つ同期のためだけに用い、 配列の値は共有メモリに入れ、各プロセスからアクセスすることにします。

明示的な共有メモリの確保には、pards_shmallocという関数を用います。 mallocと同様ですが、共有メモリ上にメモリを確保します。解放には、 pards_shmfreeを用います。以下では、ソート対象の配列は共有メモリにある ものとして、説明します。

まず、逐次版のQuick Sortのプログラムは、単純には以下のようになります。

void qsort (int v[], int left, int right)
{
  ...

  if(left >= right)
    return;

  last = partition(v,left,right);  
  qsort(v,left,last-1);
  qsort(v,last+1,right);
}

ここで、partitionは、先ほど述べたとおり、配列をある値以下、以上に分割 します。返り値が上下を分割する配列のインデックスです。分割後、 対象とする範囲をqsort関数の引数に与え、再帰呼び出しを行います。

これを並列化すると、以下のようになります。

void qsort_sync (int v[], int left, int right, Sync<int> s)
{
  ...

  if(left >= right){
    s.write(1);
    return;
  }

  if(pards_avail_processes() == 0 ||      // (1) プロセス数に余裕があるか?
     right - left + 1 < min_size){        // (2) 充分な処理量か?
    qsort(v,left,right);                  // (3) そうでなければ逐次で実行
    s.write(1);                           // (4) 終了通知
    return;
  } else {
    Sync<int> t, u;                       // (5) 同期用変数の確保
    last = partition(v,left,right);       // (6) まず分割
    SPAWN_F(qsort_sync(v,left,last-1,t)); // (7) 新プロセスで左側再帰呼出
    qsort_sync(v,last+1,right,u);         // (8) 右側再帰呼出
    t.read();                             // (9) 左側の終了を待つ
    t.free();                             // (10) 同期変数の解放
    u.read();                             // (11) 右側の終了を待つ
    u.free();                             // (12) 同期変数の解放
    s.write(1);                           // (13) 呼出元に終了通知
    return;
  }
}

qsortを並列化したqsort_syncでは、第4引数が同期用の変数として逐次 版に対して追加されています。それ以外の引数はqsortと同じです。

まず、プロセスを生成して、並列に行うべきかどうかを判断します。(1)の pards_avail_processesは、現在システムで利用可能なプロセス数を返します。 このプロセス数は、pards_init()呼出時に引数として指定することができます。 また、処理量が充分で無い場合は、プロセスを生成して並列に行うよりも、逐 次に行った方が高速です。これを(2)で判断しています。逐次で実行(3)した後 は、(4)で終了通知を行います。ここで、第4引数に値をwriteしています。こ れにより、範囲のソートが終了したことを呼出側に伝えることができます。

次に並列に行う場合ですが、まず同期用の変数を確保します(5)。そして、担 当範囲の分割をまず行います(6)。

分割した範囲に対して、いよいよ並列に処理を行います。まず分割した左側に ついて、新しいプロセスを生成し、再帰呼出を行います(7)。ここで、SPAWNで はなく、SPAWN_Fというマクロが用いられていることに注意してください。 SPAWN_Fマクロは、SPAWNマクロと異なり、利用可能できるプロセス数を超過し ている場合、関数として呼出を行います。((1)で利用可能なプロセス数を調べ ていますが、調べた後に0になっている可能性があります。)

(プログラムの意味として、SPAWNとSPAWN_Fは異ります。SPAWNは利用可能なプ ロセスが無い場合、例外を投げますが、SPAWN_Fは関数として呼び出されるた め、プロセス数によっては大域的な変数を変更/参照してしまう可能性があり ます。)

(8)で右半分について、再帰呼び出しを行います。ここではプロセスのフォー クを行いませんが、再帰呼び出しを行ったqsort_syncの中で、プロセスのフォー クが行われる可能性があるため、同期を取る必要があります。

左右のソートについて、(9), (11)で終了を待ち合わせ、終了を確認した後、 (13)で呼び出し元に終了を通知します。

上記では、簡単にした例でプログラムを説明しましたが、samplesディレクト リにあるqsort.ccでは、もう少し複雑で、例外のキャッチを行っています。こ れは、同期変数を大量に確保した結果、OSが提供するセマフォの利用可能数を 越えてしまう可能性があるためです。特にLinux系では、デフォルトで利用可 能なセマフォの数が128と少ないため(変更は可能)、簡単にこの制限を越えて しまいます。

性能評価

Quick Sortは高速なアルゴリズムなため、充分に大きなサイズでないと、 測定可能なほどの時間がかかりません。以下に、ソート対象のデータサイズを 10,000,000, min_sizeを1,000,000とした際の逐次版と並列版の実行時間を示 します。(このサイズで実験するためには、Linuxマシンでは、OSが許す最大共 有メモリサイズをデフォルトから変更する必要があると思います。これについ ては後述します。)

マシン 逐次版(sec) 並列版(sec) 並列 度/理想値
IA64/HP-UX 11.23 (2CPU) 5.99 3.23 1.85/2
DualCoreXeon/FedoraCore6 (2*2CPU) 1.40 0.5 2.8/4
Xeon/FedoraCore5 (2CPU) 3.6 1.73 2.08/2(HT)

本アルゴリズムでは、1段分のpartition処理はどうしても逐次に行なわれるこ とになりますが、単純な並列化にもかかわらずおおむね良好な並列度が得られ ています。

2CPUのXeonで2倍以上の速度向上が得られているのは、Hyper Threadingがサポー トされており、論理的には4CPUに見えていたからだと思われます。

bzip2

実用的な例として、本ライブラリを用いてbzip2の並列化を行いました。bzip2 はファイルを圧縮するプログラムですが、同時にライブラリとしてのAPIも提 供しています。今回は、このAPIを利用して、対象とするファイルを分割して 圧縮する方法で並列化を行いました。(この手法では、残念ながら逐次版と全 く同じバイナリが生成されるわけではありませんが、bunzip2で正常に展開す ることができます。また、今回は圧縮だけを並列化しています。)

bzip2を並列化する試みは他にもなされていますが(参考)、多くがpthreadを用 いるなど、大幅なプログラムの変更を行っています。 (並列化の手法は同様なものと、ライブラリ内部まで修整して逐 次版と同じバイナリを生成するようにしたものがあります。) ここでは、本ライブラリを用いることで、簡単に並列化を行うことができること を示したいと思います。

まず、bzip2でファイルを圧縮する部分を見てみます。

void compressStream ( FILE *stream, FILE *zStream )
{
   ...

   bzf = BZ2_bzWriteOpen ( &bzerr, zStream, 
                           blockSize100k, verbosity, workFactor );   
   ...
   while (True) {
      if (myfeof(stream)) break;
      nIbuf = fread ( ibuf, sizeof(UChar), 5000, stream );
      ... 
      if (nIbuf > 0) BZ2_bzWrite ( &bzerr, bzf, (void*)ibuf, nIbuf );
      ... 
   }
   BZ2_bzWriteClose64 ( &bzerr, bzf, 0, 
                        &nbytes_in_lo32, &nbytes_in_hi32,
                        &nbytes_out_lo32, &nbytes_out_hi32 );
   ...
}

この関数は、streamで読みこんだファイルストリームを圧縮し、zStreamに出 力します。圧縮するためには、BZ2_bzWriteOpenで初期化を行い、あとはfread で読みこんだ内容を、BZ2_bzWriteに入れるだけです。最後に BZ2_bzWriteClose64でストリームを閉じています。

これらのAPIは逐次で圧縮する場合は単純で良いのですが、このままでは並列 化が困難です。そこで、BZ2_bzBuffToBuffCompressというメモリの内容を圧縮 するAPIを利用することにします。

基本的な並列化戦略は、以下のようにします:

まず、上記のcompressStreamを以下のように変更します:

void compressStream ( FILE *stream, FILE *zStream )
{
   ...
   pards_init();                        // (1) 初期化

   ...
   SyncQueue<Block> *rs;                // (2) 読出データ用キュー
   SyncQueue<Block> *cs;                // (3) 圧縮データ用キュー

   rs = new SyncQueue<Block>(pards_get_nprocs()); // (4) 最初のセルの確保:
   cs = new SyncQueue<Block>(pards_get_nprocs()); // (5) キューサイズはCPU数に

   SPAWN(readStream(stream, rs));       // (6) データ読み出しプロセスをSPAWN
   SPAWN(myCompressStream(rs,cs));      // (7) 圧縮プロセスをSPAWN
   writeStream(zStream, cs);            // (7) 書きこみ

   pards_finalize();                    // (9) 終了処理

   return;
}

まず、(1)で初期化を行います。そして、(2), (3)でデータ読み出したデータ を保存するためのキューと、圧縮データを保存するためのキューの最初のセル へのポインタを作成します。

ここで、キューの要素の型は"Block"としていますが、これは以下のように定 義されています。

struct Block
{
  UChar* buf;
  int size;
};

最初の要素は、対象とするデータへのポインタで、2番目の要素はデータサイ ズになります。ここで、対象とするデータは複数のプロセスから参照されるた め、ポインタの指す先は共有メモリ領域になります。

つぎに、(4), (5)でキューの最初の要素を確保します。ここで、コンストラク タにキューサイズを指定する必要がありますが、ここではCPU数を用いていま す。これは、圧縮を行うプロセスの数を、CPU数程度に抑えるためです。

CPU数を求めるために、本ライブラリは"pards_get_nprocs()"という関数を提 供しています。この関数は、現在利用可能なCPU数を返します。

そして、先ほど説明した戦略の通り、(6) (7) でデータ読み出しプロセスと圧 縮プロセスをSPAWNします。書き込みは親プロセスで行い(8)、書き込み終了後、 ライブラリの終了処理を行います(9)。

次に、読み出しプロセスのreadStream関数を見てみましょう。

void readStream(FILE* stream, SyncQueue<Block> *rs)
{

  int bufsize;
  Block blk;
  SyncQueue<Block> *crnt;

  bufsize = blockSize100k * 100000;           // (1) 1ブロックのサイズ
  
  crnt = rs;
  while (True) {
    blk.buf =                                 // (2) 共有メモリを確保
      (UChar*)pards_shmalloc(bufsize * sizeof(UChar)); 
    blk.size =                                // (3) ファイルから読み出し
      fread (blk.buf, sizeof(UChar), bufsize, stream); 
    crnt->write(blk);                         // (4) キューにwrite
    if (myfeof(stream)){
      crnt->writecdr(0);                      // (5) EOFならキューを終端
      return;
    } else {
      ... // エラー処理
    } else {
      crnt = crnt->create();                  // (6) 次のブロックを確保
    }
  }
 }

まず、(1)でブロックのサイズを設定します。ここで、"blockSize100k"という 変数は、bzip2の大域変数で、bzip2の起動時にオプションで渡されるブロック サイズです。bzip2では、このサイズを単位として圧縮を行います。 並列化もこのサイズを単位として行うことにします。

(2)でpards_shmallocを使って、共有メモリを確保します。サイズは先ほどの ブロックサイズです。確保した領域へのポインタは、Block構造体の中に保存 します。

次に、(3)でfreadを使って、ブロックサイズ分、ファイルからデータを読み出 します。読み出したサイズは、やはりBlock構造体の中に保存します。

読み込みが終了したら、(4)でBlock構造体のデータをキューにwriteします。 これにより、圧縮プロセスが読み込んだデータに対して圧縮を開始します。

読み込んだデータがファイルの最後の場合、キューを終端します(5)。そうで ない場合、次のブロックの読み出しのため、新しいセルを確保します(6)。こ こでは省略記法を使っているので、crntのcdrに確保したセルをセットした上 で、新しいセルをcrntにセットしています。

以上で、キューへの読み込みが完了しました。次に圧縮プロセスを見てみましょ う。

void myCompressStream(SyncQueue<Block> *rs, SyncQueue<Block> *cs)
{
  SyncQueue<Block> *crntrs, *nxtrs;
  SyncQueue<Block> *crntcs;

  crntrs = rs;
  crntcs = cs;
  while (True){
    nxtrs = crntrs->readcdr();        // (1) 読み込み用キューからcdrを取り出し
    SPAWN(myCompress(crntrs,crntcs)); // (2) 現在のセルが指すブロックを圧縮
    if(nxtrs == 0){                   // (3) 読み込み用キューが終端なら
      crntcs->writecdr(0);            //     圧縮用キューを終端
      break;
    } else {
      crntrs = nxtrs;                 // (4) 読み込み用キューをcdrにセット
      crntcs = crntcs->create();      // (5) 次の圧縮用セルをセット
    }
  }
}

(1)で読み込み用キューのcdrを取り出しています。これは、(2)で現在のセル が指すブロックを圧縮するのですが、その中で現在の読み込み用セルを解放す るためです。そのため、解放される前にあらかじめcdrを取り出しておきます。

(2)では、現在のセルが指すブロックを圧縮するプロセスをSPAWNします。

(3)で、読み込み用キューが終端かどうかをチェックします。終端の場合、圧 縮用キューも終端します。

終端では無い場合、(4)で先ほど保存しておいた読み込み用キューのcdrを次の ループで処理する変数としてセットします。また、(5)で、次の圧縮用キュー のセルを確保し、cdrにセットします。

それでは、ブロックを圧縮するプロセスを見てみましょう。

void myCompress(SyncQueue<Block> *rs, SyncQueue<Block> *cs)
{
  Block rblk,cblk;                   // (1) 読み込み、圧縮データ用ブロック
  ...

  rblk = rs->read();                 // (2) 読み込み用キューからデータ取り出し
  bufsize = blockSize100k * 110000;  // (3) 圧縮データ用メモリ確保
  cblk.buf = (UChar*)pards_shmalloc(bufsize * sizeof(UChar));
  size = bufsize;
  
  // (4) bzip2が提供するAPIを利用して圧縮処理
  bzerr = BZ2_bzBuffToBuffCompress((char*)cblk.buf, &size,
				   (char*)rblk.buf, rblk.size,
				   blockSize100k, verbosity, workFactor);  
  cblk.size = size;                  // (5) 圧縮したデータサイズ

  ... // エラーチェック

  cs->write(cblk);                   // (6) 圧縮用キューにデータをwrite
  pards_shmfree(rblk.buf);           // (7) 読み込み用キューの共有メモリを削除
  delete rs;                         // (8) 読み込み用キューのセルを削除
}

まず(1)で読み込み、圧縮データ用のBlock構造体を確保します。 次に(2)で、読み込み用キューからBlock構造体のデータをreadします。

(3)で、データ読み出しの際と同様、圧縮したデータ用に共有メモリを確保し ます。確保したメモリへのポインタは、圧縮データ用Blockの中に保存してお きます。

いよいよ(4)で、実際の圧縮処理を行います。圧縮処理は、bzip2が提供する、 BZ2_bzBuffToBuffCompressというAPIを利用します。このAPIはメモリ上のデー タを圧縮して別のメモリ上に書き込みます。それぞれのメモリへのポインタを 引数に設定し、このAPIを呼び出します。

圧縮したサイズを圧縮データ用Block構造体にセットし(5)、このBlock構造体 を圧縮用キューにwriteします(6)。これで、読み込み用のデータはもう不要に なりましたから、(7)でまず共有メモリを解放します。そして、 (8)でキュー のセルを解放します。

それでは最後に書き込み用処理を見てみましょう。

void writeStream(FILE* zStream, SyncQueue<Block> *cs)
{
  Block blk;
  SyncQueue<Block> *crnt;

  crnt = cs;
  while (True){
    blk = crnt->read();     // (1) ブロックの読み出し
                            // (2) ファイルに書き出し
    int s = fwrite(blk.buf, sizeof(UChar), blk.size, zStream); 
    ... // エラー処理
    pards_shmfree(blk.buf); // (3) 書き出したブロックのメモリ解放
    crnt = crnt->release(); // (4) セルの解放とcdrの読み出し
    if(crnt == 0) break;    // (5) 終端チェック
  }
  ...
}

この処理の動作は単純です。まず(1)で圧縮用キューからブロックを読み出し ます。そして、ブロック内にあるポインタとデータサイズを用いてfwriteでファ イルに書き込みます(2)。

書き込んだ後は、ブロック内のデータは不要ですから、(3)でまず共有メモリ の解放を行います。そして、(4)でセルの解放とcdrの読み出しを省略記法 (release)を用いて記述しています。

最後に(5)で終端チェックを行い、終端の場合ループを終了します。

以上で、bzip2の並列化が終了しました。このように、実用的なサイズのプロ グラムであっても、比較的単純に見通しよく並列化が可能だったことがわかる と思います。

性能評価

逐次版のbzip2と性能比較を行ないました。圧縮対象のファイルは、(開発中の) 本ライブラリをtarしたものを4度catして、10MBほどのサイズにしたものです。 ブロックサイズはデフォルト(900k)、出力は/dev/nullにリダイレクトして測 定しました。

マシン 逐次版(sec) 並列版(sec) 並列 度/理想値
IA64/HP-UX 11.23 (2CPU) 14.76 8.96 1.64/2
DualCoreXeon/FedoraCore6 (2*2CPU) 3.79 1.43 2.65/4
Xeon/FedoraCore5 (2CPU) 8.96 5.55 1.61/2(HT)

Quick Sortの場合よりは若干並列化効率は減少していますが、それでも並列化 によって十分に高速化できていることがわかります。

WorkPool版

以上は、SyncQueueだけを使って並列化しました。SPAWNが呼ばれる回数自体は 10MB/900kbで11回程度とそれほど大きくはありませんが、WorkPool版での実装 も試してみます。

これまでの版とWorkPool版にはそれほど大きな違いはありません。 まず、compressStreamを見てみます。(並列化したファイルはwpbzip2.cとして 同じディレクトリにあります。)

   pards_init();

   SyncQueue<Block> *rs;
   SyncQueue<Block> *cs;

   int nprocs = pards_get_nprocs(); 

   rs = new SyncQueue<Block>(nprocs);
   cs = new SyncQueue<Block>(nprocs);

   SPAWN(readStream(stream, rs));

                                      // (1) WorkPoolの定義
   WorkPool<Block,Block> *pool = new WorkPool<Block,Block>(rs,cs,nprocs);
   for(int i = 0; i < nprocs; i++){   // (2) workerのSPAWN
     SPAWN(myCompressStream(pool));
   }
   writeStream(zStream, cs);

   delete pool;                       // (3) WorkPoolの削除
   
   pards_finalize();

主な変更点は以下のとおりです。

まず(1)でWorkPool型変数の定義を行っています。コンストラクタの引数には、 上で定義した入力用のSyncQueue rsと、圧縮用のSyncQueue cs, それにworker の数として、プロセッサ台数を与えています。

(2)でworkerのSPAWNを行っています。数はプロセッサ台数分で、 myCompressStreamをworkerが実行する関数としています。myCompressStreamは WorkPoolを使わない版から改変したものです。

最後に(3)でWorkPool型変数のdeleteを行っています。(直後にfinalizeしてい るので、冗長な処理ではありますが。)

readStream, writeStreamはこれまでの版から変更はありません。次に変更を 行ったmyCompressStreamとcompressStreamについて説明します。

void myCompressStream(WorkPool<Block, Block>* pool)
{
  while (True){
    WorkItem<Block, Block> item = pool->getwork(); // (1) WorkItemの取得
    if(item.output == 0) break;                    // (2) 終端チェック
    else
      myCompress(item);                            // (3) 圧縮処理
  }
  pool->release();                                 // (4) poolのリリース
}

myCompressStreamはシンプルになりました。コメントの通り、(1)でgetwork() を用いてWorkItemを取得し、(2)で終端のチェック、(3)でWorkItemを myCompressに渡して圧縮処理を行います。終端に達してループを抜けた後、 poolのリリースを(4)で行います。

次に、myCompressを見てみましょう。

void myCompress(WorkItem<Block, Block> item)
{
  Block rblk, cblk;
  ...

  rblk = item.work;       // (1) WorkItemから処理対象のBlockを取得
  bufsize = blockSize100k * 110000;
  cblk.buf = (UChar*)pards_shmalloc(bufsize * sizeof(UChar));
  size = bufsize;

  bzerr = BZ2_bzBuffToBuffCompress((char*)cblk.buf, &size,
				   (char*)rblk.buf, rblk.size,
				   blockSize100k, verbosity, workFactor);  
  cblk.size = size;

  ... // エラーチェック

  item.output->write(cblk); // (2) 圧縮結果を出力用セルにwrite
  pards_shmfree(rblk.buf);
}

こちらはほとんど変わりません。(1)でWorkItemから処理対象のBlockをrblkに 取り出します。圧縮後は(2)でWorkItemの出力用セルに結果をwriteしています。

性能評価

WorkPool版のbzip2の性能評価を行いました。条件はこれまでの版と同じです。

マシン 逐次版(sec) WorkPool版(sec) 並列 度/理想値
IA64/HP-UX 11.23 (2CPU) 14.76 8.88 1.9/2
DualCoreXeon/FedoraCore6 (2*2CPU) 3.79 1.57 2.41/4
Xeon/FedoraCore5 (2CPU) 8.96 5.23 1.71/2(HT)

これまでの版より若干速くなっているものもありますが、遅くなっているもの もあります。プロセス数自体が少ないため、ほとんど差がでなかったものと考 えられます。

効果を見るために、DualCoreXeon (4CPU)マシンで、オプションに-1を指定し、 1ブロックのサイズを900kbから100kbにすることでプロセス数を増加させて測 定してみました。この場合、1ブロックの処理量はサイズが減る以上に減少し ますので、1プロセス当たりの粒度は小さなものになります。

この場合、逐次版で1.79sec, これまでの版で0.746sec, WorkPool版で 0.573secと速度向上が見られます。

また、性能差はキューの長さを変更することなどでも起こります。(例えば HP-UXのマシンでは、キュー長を倍にすると、1割程度速くなりました。) これは、キャッシュやreadStream, writeStreamプロセスのスケジューリング が影響するからだと考えられます。両バージョンともまだ最適化の余値はある かも知れません。

GTK+によるGUI

並行処理サポートで述べた機能を用いて、GUIプログ ラムを作成する際のヘルパーライブラリを作成しました。gtkディレクトリの libgtkhelper.cc, libgtkhelper.hがライブラリ、ライブラリを用いたサンプ ルがgtksample.ccにあります。サンプルを用いて、ヘルパーライブラリの使い 方について説明します。

このサンプルプログラムでは、フォークしたプロセスが生成する値を受け取り、 それを表示します。プロセスが値を生成するには時間がかかりますが、その間 もGUIは入力を受けつけます。また、フォークしたプロセスをキャンセルする こともできます。

まず、mainルーチンの関連する部分について説明します。

  pards_init();

  SyncQueue<Callback*> *c = new SyncQueue<Callback*>(4);// (1) メッセージ用キュー
  
  
  gtkhelper_init(c);      // (2) ヘルパライブラリの初期化

  Merger<Callback*> *m = new Merger<Callback*>(c,2);    // (3) メッセージ用Mergerの作成
  

  ISPAWN(w1,worker(1,m)); // (4) 値を生成するプロセス1を作成
  ISPAWN(w2,worker(2,m)); // (5) 値を生成するプロセス2を作成

まず、(1)でメッセージ用のキューの最初のセルを作成します。Callback*型を 指定していますが、Callback型はヘルパライブラリの中で定義されています。 (ポインタ型であることに注意してください。これは、実際にはCallback型を 継承した型を利用するためです。)

(2)で作成したキューの最初のセルを引数として、ヘルパライブラリを初期化 します。これにより、キューに値が入れられるたびに、Callback型変数の run()メソッドが呼ばれるように初期化されます。

これには、「指定されたファイルディスクリプタが読み込み可能になるのをイ ベントループ中で待つ」というGTK+が提供する機能を利用しています。 すなわち、キューを(ブロッキングで)読むプロセスをフォークし、かつそのプ ロセスがパイプに値を入れることで、イベントループ中で処理関数を呼び出す ようにしています。

また、Callback型変数のrun()メソッドは、実際にはCallback型変数から継承 し、プログラマが指定します。これについては後述します。

(4), (5)で値を生成するプロセスを作成します。これらのプロセスは、Merger のインスタンスを引数として取り、Callback型変数を継承した変数へのポイン タをMergerにputします。

次に、生成するプロセスworkerの動作の動作について説明します。まず、 workerの中で使っている、MyCallbackクラスを説明します。

class MyCallback : public Callback
{
public:
  int val;         // (1) イベントループに伝える変数
  int id;          // (2) 設定したプロセスのid

  void run(){      // (3) run()メソッドの再定義
    char str[1000];
    if(val == -1)
      sprintf(str,"Process %d is canceled\n",id);               // (4) キャンセルされた際の文字列
    else
      sprintf(str,"Message from Process %d, val = %d\n",id,val);// (5) 通常時のの文字列
    
    if(currentpos + strlen(str) > NCHARS + labelchars) return;
    else{
      strcpy(currentpos,str);
      currentpos += strlen(str);
      gtk_label_set_text(GTK_LABEL(label),labelchars);          // (6) ラベルを変更
    }
  }
};

ここで定義したMyCallbackクラスのインスタンスへのポインタを、Mergerに putすることになります。イベントループ側では、putされるたびに、再定義さ れたrun()メソッドが呼ばれることになります。

まず、(1)、(2)で、コールバックの実行に必要なインスタンス変数を定義して います。ここでは生成した値と、それを生成したプロセスの値を入れています。

(3)はrun()メソッドの再定義です。(6)でGTK+の関数を呼び出し、ラベルの文 字列を変更しています。文字列は、キャンセルされた場合は(4)で、通常時は (5)で設定されます。プロセスがキャンセル(実装としてはインタラプト)され た場合は、valに-1を入れることでコールバックで判定できるようにしていま す。

次に、値を生成するプロセスの定義です。

void worker(int id, Merger<Callback*> *c)
{  
  for(int i = 0; i < 5; i++){
    int r = simulate_slow_read();     // (1) 値を生成
    MyCallback* val = new MyCallback; // (2) MyCallbackインスタンス生成
    val->id = id;                     // (3) idの設定
    if(pards_is_interrupted()){       // (4) インタラプトがあったかチェック
      val->val = -1;                  // (5) インタラプトがあれば値を-1に
      c->put(val);                    // (6) Mergerにput
      c->release();                   // (7) Mergerのrelease()
      return;                         // (8) プロセスの終了
    } else {
      val->val = r;                   // (9) インタラプトが無ければ読んだ値にセット
      c->put(val);                    // (10) Mergerにput
    }
  }
  c->release();                       // (11) Mergerのrelease()
}

(1)で、伝えるべき値を読み込んでいます。サンプルではsleepを使っています が、実際にはここがsocketからのデータ読み出し等に相当します。

(2)で先ほど定義したMyCallbackのインスタンスを生成し、(3)でそこに自プロ セスのidをまず入れています。

(4)ではインタラプトがあったかどうかをチェックしています。インタラプト があれば、(5)でvalを-1にセットした上で、(6)でMergerにputし、(7)で Mergerのリリース、そして(8)でreturnし、プロセスは終了します。

インタラプトがなければ、(9)で読み込んだ値にvalをセットし、(10)でMerger にputします。ループが終了すれば、(11)でMergerのreleaseを行い、プロセス が終了します。

その他は、ほとんど通常のGTK+のプログラムと同様です。

このようにすることで、workerが値をputするたびに、labelの文字列が変更さ れます。また、このサンプルプログラムでは、ボタンを押すとlabelの文字列 を変更するボタン、および、プロセスをキャンセルするボタンも用意されてい ます。

このように、ヘルパライブラリのユーザは、Callback型を継承してイベントルー プに実行して欲しいコールバックを定義することで、簡単に複数のプロセスと の通信を実現できます。Mergerには、Callback型を継承したクラスのインスタ ンスへのポインタとして任意のものを入れられるので、別のコールバックを同 時に利用することも可能です。

リファレンス

以下に、本ライブラリが提供する関数、クラスのリファレンスをまとめます。

大域関数

本ライブラリが提供する大域関数を利用するには、libpards.hをインクルード する必要があります。ライブラリ本体は、libpards.aとしてコンパイルされま す。

pards_init

初期化用関数です。 void pards_init(int ap = NUM_OF_PROCS, unsigned bytes = NALLOC_BYTES); と宣言されています。第1引数、第2引数は省略できます。

第1引数はライブラリ内で利用するプロセス数を渡します。現在実行中の、 SPAWNでforkされたプロセスの数がこの値にまで達しており、さらに新たなプ ロセスをSPAWNでforkしようとすると、例外が発生します。また、SPAWN_Fが使 われている場合は、プロセスをフォークせず関数として実行します。デフォル トは16です。

第2引数は確保する共有メモリのサイズを指定します。現在の実装では、共有 メモリの確保はあらかじめ行われ、pards_shmallocではその中からメモリを確 保する形で実装されています。したがって、プログラム中で大量の共有メモリ を必要とする場合には、この引数で指定してください。デフォルトは16MBです。

(あらかじめ確保した共有メモリが不足する場合、pards_shmalloc内で共有メ モリの確保を行うための実装も行っていますが、Linuxなどの場合、うまく動 作しないため、現在は無効にしています。これについては後述します。)

pards_finalize

ライブラリの終了処理用関数です。ライブラリの利用を終了する際には、必ず 呼び出す必要があります。void pards_finalize(int wait = DO_WAIT)と宣言 されていますが、引数は指定する必要はありません。

この関数の中では、確保した共有メモリ、セマフォの解放を行います。したがっ て、この関数を呼び出さなかった場合、共有メモリとセマフォが解放されない まま残ってしまいます。ipcsというOSが提供するコマンドを利用すると、現在 システム上にあるこれらのリソースの様子を見ることができますので、終了処 理を行わずにプログラムが終了してしまった場合は、ipcsで確認の上、ipcrm というコマンドでリソースを削除して下さい。

SPAWN(func), SPAWN_F(func), ISPAWN(pid, func)

これらは、funcで表された関数をプロセスとして呼び出すマクロです。

pards_initで指定したプロセス数を越えてプロセスをforkしようとした場合、 SPAWNは例外をthrowします。例外は、PardsExceptionを継承した、 ForkException型です。 SPAWN_Fの場合は、例外をthrowせず、関数として実行します。

ISPAWNの場合は、pidに生成したプロセスのpidが入ります。ポインタで はなく、変数をそのまま指定します(マクロで実現しているため)。 pidの型は、pards_pidです。現在の実装では、pid_tと等価ですが、将来の拡 張を考え、別の型としています。

pards_shmalloc, pards_shmfree

共有メモリを確保、解放します。void* pards_shmalloc(unsigned), void pards_shmfree(void*)と宣言されています。pards_shmalocの引数には、確保 したいメモリのバイト数を渡します。確保できなかった場合、0を返します。 pards_shmfreeの引数には、pards_shmallocで返された値を渡す必要がありま す。alloc, freeには、K&Rで紹介されているmalloc/freeのアルゴリズム を利用しています。

pards_get_nprocs

システムで現在利用可能なCPU数を返します。int pards_get_nprocs()と宣言 されています。本関数は、Linux (sysconf(_SC_NPROCESSORS_ONLN)), HP-UX (mpctl(MPC_GETNUMSPUS,NULL,NULL)), FreeBSD(sysctl, 未テスト)のみ対応し ています。それ以外のシステムでは、1を返します。

pards_avail_processes

現在利用可能なプロセス数を返します。この値は、最初pards_init()で指定さ れた値で、プロセスがSPAWNされるたびに減少します。またSPAWNされたプロセ スが終了すると増加します。

pards_set_error_level

ライブラリが出力するメッセージのレベルを制御します。 int pards_set_error_level(int)と宣言されており、引数には、DBG, INFO, CRITICAL, FATAL, NO_PRINTが指定できます(これらは、ヘッダファイ ル内で定義されています)。この順で出力されるメッセージが少なくなります。 NO_PRINTではメッセージは出力されません。リリース版のソフトなどで、ライ ブラリがエラーメッセージを出さないようにしたい場合は、この関数を利用し ます。

返り値は現在の設定レベルです。現在の設定レベルは、 pards_get_error_level()でも得ることができます。

void pards_send_interrupt(pards_pid)

引数に与えられたpidを持つプロセスにインタラプトを送ります。pidはISPAWN によって得ます。

インタラプトを受けたプロセスは、timed/intrがついたメソッドを実行中の場 合はブロックから抜けます。また、シグナルで実現しているため、 ブロックするシステムコールを実行中にインタラプトがあった場合、そのシス テムコールはブロックから抜けます。この場合、通常、システムコールが成功 したかどうかを表す返り値が-1となり、errnoがEINTRを返します(システムコー ルの種類に依存します)。

int pards_is_interrupted()

インタラプトを受けたかどうかを返します。受けた場合は1, 受けていない場 合は0を返します。

void pards_clear_interruption()

インタラプトを受けた場合でもその状態をクリアし、 (再度インタラプトを受けるまで)次のpards_is_interrupted()呼び出しで0が 返るようにします

PBInfo* pards_begin_parallel(int)

parblock.hをインクルードして使います。SPMDスタイルでの並列処理記述をサ ポートします。

引数に与えられた数のプロセスをフォークし、pards_end_parallelまでフォー クした複数のプロセスで同じプログラムを実行します。実際には、プロセス番 号で分岐することで、並列処理を実現します。

PBInfo型の変数には、並列ブロックの状態が保存されています。getpno()とい うメンバ関数を呼び出すことで、プロセスの番号を知ることができます。ここ でのプロセスの番号は0から始まり、pards_begin_parallelの引数に与えた数- 1までとなります。

void pards_end_parallel(PBInfo*)

parblock.hをインクルードして使います。pards_begin_parallelの返り値に与 えられたPBInfoを引数にして呼び出すことで、全てのプロセスがこの文を実行 するまで待ち並列ブロックを終了します。

void pards_barrier(PBInfo*)

parblock.hをインクルードして使います。pards_begin_parallelの返り値に与 えられたPBInfoを引数にして呼び出すことで、全てのプロセスがこの文を実行 するまで待ちます。

Sync

Sync.hをインクルードして利用します。また、プログラムはlibpards.aとリン クする必要があります(libpards.hはSync.hの中からインクルードされます)。 また、これはclass templateなので、メンバ関数の定義もSync.hの中に記述されてい ます。

コンストラクタ

コンストラクタに引数は必要ありません。コンストラクタ中では、値を保存す るための共有メモリおよび、同期のためのセマフォが確保されます。また、 スタック上にではなく、newを用いて生成した場合は、Sync<T>そのもの も共有メモリ上に確保されます。

デストラクタ

スタック上に確保した場合、コンストラクタで確保された共有メモリ、セマ フォを解放するため、後述するfree()を明示的に呼び出す必要があります。こ れは、この変数を共有する複数のプロセスのうち、ひとつだけから呼び出す必 要があります。

また、newで確保した変数の場合、deleteで解放することで、内部からfreeが 呼び出されます。また、変数は共有メモリ上にあるため、deleteはこの変数を 共有する複数のプロセスのうち、ひとつだけから呼び出す必要があります。

void write(T val)

変数に値をセットします。これにより、read()で値がセットされるのを待って いるプロセスがあった場合、動作を再開します。すでに値がセットされていた 場合、値の変更は行いません。

T read()

変数から値を取り出します。値が未セットの場合は、プロセスがブロックしま す。

int is_written()

値がセットされているかどうかを確認します。セットされている場合は1を、 されていない場合は0を返します。

T timedread(struct timeval *, pards_status*)

read()と同様ですが、第1引数で与えられた時間以上はブロックしません。第1 引数で与えられた時間が経過すると、ブロックから抜け、pards_statusに TIMEOUTを返します。 ブロック中にインタラプトを受けた場合は、INTERRUPTを返します。 処理が成功した場合はSUCCESSを返します。

T intrread(pards_status*)

read()と同様ですが、ブロック中にインタラプトを受けた場合は、ブロックか ら抜け、pards_statusとしてINTERRUPTを返します。インタラプトを受けなかっ た場合は、SUCCESSを返します。

void free()

確保した共有メモリ領域およびセマフォを解放します。この処理は、共有する 複数のプロセスで1回だけ行う必要があります。

SyncList

SyncList.hをインクルードして利用します。その他の使い方はSync.hと同様で す。Syncを継承しているため、write, read, freeはそのまま利用できます。

コンストラクタ

Syncと同様ですが、SPAWNで新たなプロセスをforkしてからListをつなぐ際に は、スタック上に確保した変数を繋いではいけません。そのメモリ領域は別プ ロセスからは見えないからです。newで確保した場合は、共有メモリ上に取ら れるため、newを使います。

(引数無しのコンストラクタの他に、引数にSyncList<T>*型をとるコンストラク タも利用できます。この場合、「引数のcdrに確保したSyncList<T>型の変数を writecdrした上で、確保したSyncList<T>型の変数へのポインタを返す」 という処理を行います。この機能はcreateメンバ関数で代用できるため、将来 削除される可能性があります。)

void writecdr(SyncList<T>*)

cdrに値をセットします。これにより、readcdr()で値がセットされるのを待って いるプロセスがあった場合、動作を再開します。すでに値がセットされていた 場合、値の変更は行いません。

SyncList<T> *readcdr()

cdrを取り出します。値が未セットの場合は、プロセスがブロックします。

int is_cdr_written()

cdrに値がセットされているかどうかを確認します。セットされている場合は1 を、されていない場合は0を返します。

SyncList<T> *timedreadcdr(struct timeval*, pards_status*)

readcdrと同様ですが、第1引数で与えられた時間以上はブロックしません。第1 引数で与えられた時間が経過すると、ブロックから抜け、pards_statusに TIMEOUTを返します。 ブロック中にインタラプトを受けた場合は、INTERRUPTを返します。 処理が成功した場合はSUCCESSを返します。

SyncList<T> *intrreadcdr(pards_status*)

readcdr()と同様ですが、ブロック中にインタラプトを受けた場合は、ブロックか ら抜け、pards_statusとしてINTERRUPTを返します。インタラプトを受けなかっ た場合は、SUCCESSを返します。

SyncList<T> *release()

「cdrを取り出して、操作対象のSyncList<T>変数をdeleteした後、取り出し たcdrを返す」という処理を行います。

SyncList<T> *timedrelease(struct timeval*, pards_status*)

releaseと同様ですが、第1引数で与えられた時間以上はブロックしません。第1 引数で与えられた時間が経過すると、ブロックから抜け、pards_statusに TIMEOUTを返します。 ブロック中にインタラプトを受けた場合は、INTERRUPTを返します。 処理が成功した場合はSUCCESSを返します。

SyncList<T> *intrrelease(pards_status*)

release()と同様ですが、ブロック中にインタラプトを受けた場合は、ブロックか ら抜け、pards_statusとしてINTERRUPTを返します。インタラプトを受けなかっ た場合は、SUCCESSを返します。

SyncList<T> *create()

「新たにSyncList<T>変数を作成し、対象セルのcdrに接続した上で、新 たに作成したSyncList<T>変数を返す」という処理を行います。

SyncQueue

SyncQueue.hをインクルードして利用します。その他の使い方はSync.h, SyncList.hと同様です。SyncListを継承しており、write, read, free, writecdr, readcdr, release, createは同様の仕様を持つものとして利用でき ます。

ただし、同じSyncQueue<T>変数を複数のキューのcdrにすることはでき ません。

SyncQueueはSyncListと異なり、writecdr, createは、呼び出し時のキューの サイズがコンストラクタに指定されたキューの最大長(実際にはwritecdr可能 な回数)の場合はブロックします。 キューのサイズが減少した所で、ブロックから復帰します。

コンストラクタ

SyncListと同様ですが、最初のセルの確保時には、Queueの最大長(writecdr可 能な回数)を引数に指定する必要があります。

void timedwritecdr(SyncQueue<T>*, struct timeval*, pards_status*)

writecdrと同様ですが、第1引数で与えられた時間以上はブロックしません。第2 引数で与えられた時間が経過すると、ブロックから抜け、pards_statusに TIMEOUTを返します。 ブロック中にインタラプトを受けた場合は、INTERRUPTを返します。 処理が成功した場合はSUCCESSを返します。

void intrwritecdr(SyncQueue<T>*, pards_status*)

writecdr()と同様ですが、ブロック中にインタラプトを受けた場合は、ブロックか ら抜け、pards_statusとしてINTERRUPTを返します。インタラプトを受けなかっ た場合は、SUCCESSを返します。

SyncQueue<T> *timedcreate(struct timeval*, pards_status*)

createと同様ですが、第1引数で与えられた時間以上はブロックしません。第1 引数で与えられた時間が経過すると、ブロックから抜け、pards_statusに TIMEOUTを返します。 ブロック中にインタラプトを受けた場合は、INTERRUPTを返します。 処理が成功した場合はSUCCESSを返します。

SyncQueue<T> *intrcreate(pards_status*)

create()と同様ですが、ブロック中にインタラプトを受けた場合は、ブロックか ら抜け、pards_statusとしてINTERRUPTを返します。インタラプトを受けなかっ た場合は、SUCCESSを返します。

WorkPool

WorkPool.hをインクルードして使います。SyncListまたはSyncQueueと同時に 使われることが前堤です。SyncQueueを利用する場合は、SyncQueue.hをインク ルードする必要があります。また、libpards.aとリンクする必要があります。

コンストラクタ

WorkPool<T1, T2>型のコンストラクタは、「仕事」のリストが入った SyncQueue<T1>*またはSyncList<T1>*型の変数を第1引数とし、 出力先のリストとして、SyncQueue<T2>*またはSyncList<T2>*型 の変数を第2引数とします。

release()でWorkPool<T1, T2>型変数が持つ資源を解放する際は、 workerが終了したことを確認するため、参照するworkerの数を第3引数として 与えます。これは省略可能です。

デストラクタ

Syncの場合と同様です。すなわち、スタック上に確保した場合、コンストラク タで確保された共有メモリ、セマフォを解放するため、後述するfree()を明示 的に呼び出す必要があります。これは、この変数を共有する複数のプロセスの うち、ひとつだけから呼び出す必要があります。

また、newで確保した変数の場合、deleteで解放することで、内部からfreeが 呼び出されます。また、変数は共有メモリ上にあるため、deleteはこの変数を 共有する複数のプロセスのうち、ひとつだけから呼び出す必要があります。 workerの数がコンストラクタに与えられていた場合、全てのworkerが release()を呼び出すことで、資源が解放されます。

WorkItem<T1, T2> getwork()

仕事と出力先のペアであるWorkItem<T1, T2>を返します。 WorkItem<T1, T2>は、T1型のwork, SyncList*型のoutputをメンバ 変数に持ちます。

getwork()の実行により、仕事用リストに対してrelease()が呼び出され、取り 出された仕事に対応するリストセルが解放されます。また、出力用リストにつ いては、create()が呼び出され、新たなセルが作成されます。

仕事リストが0で終端に達していた場合、WorkItem<T1, T2>のメンバ outputは0になります(workの値は不定です)。これで終端チェックを行います。

release()

workerが終端チェックを行った後、関数(プロセス)を終了する際に、 release()を呼ぶことで、WorkPool<T1, T2>型変数に対する参照数が1 減ったことをシステムに伝えることができます。この数がコンストラクタに指 定した数に達すると、資源が解放されます。

free()

WorkPool<T1, T2>型変数が内部に持つ資源(共有メモリとセマフォ)を解 放します。

Merger

Merger.hをインクルードして使います。SyncListまたはSyncQueueと同時に 使われます。SyncQueueを利用する場合は、SyncQueue.hをインク ルードする必要があります。また、libpards.aとリンクする必要があります。

コンストラクタ

Merger<T>型のコンストラクタは、 SyncQueue<T>*またはSyncList<T>*型の変数を第1引数とします。 この変数は、マージ対象のリスト/キューの先頭のセルとなります。 すでに確保されている必要があります。

release()で資源を解放する際は、 Mergerを共有するプロセスが全て終了したことを確認するため、参照する プロセスの数を第2引数として与えます。これは省略可能です。

デストラクタ

WorkPoolの場合と同様です。すなわち、スタック上に確保した場合、コンストラク タで確保された共有メモリ、セマフォを解放するため、後述するfree()を明示 的に呼び出す必要があります。これは、この変数を共有する複数のプロセスの うち、ひとつだけから呼び出す必要があります。

また、newで確保した変数の場合、deleteで解放することで、内部からfreeが 呼び出されます。また、変数は共有メモリ上にあるため、deleteはこの変数を 共有する複数のプロセスのうち、ひとつだけから呼び出す必要があります。 workerの数がコンストラクタに与えられていた場合、全てのworkerが release()を呼び出すことで、資源が解放されます。

void put(T)

引数に与えられた値をもつセルをリストに繋げます。リストセルの確保、cdr への設定は、putの中で行われます。

void timedput(T, struct timeval*,pards_status*)

put()と同様ですが、第2引数で与えられた時間以上はブロックしません。第2 引数で与えられた時間が経過すると、ブロックから抜け、pards_statusに TIMEOUTを返します。 ブロック中にインタラプトを受けた場合は、INTERRUPTを返します。 処理が成功した場合はSUCCESSを返します。

void intrput(T, pards_status*)

put()と同様ですが、ブロック中にインタラプトを受けた場合は、ブロックか ら抜け、pards_statusとしてINTERRUPTを返します。インタラプトを受けなかっ た場合は、SUCCESSを返します。

void increase_referer(int)

Mergerを参照しているプロセスの数を増やします。引数を省略した場合は1、 引数が指定されていた場合は指定された数だけ増やします。

release()

release()を呼ぶことで、Merger型変数に対する参照数が1 減ったことをシステムに伝えることができます。この数がコンストラクタに指 定した数に達すると、資源が解放されます。

free()

Merger型変数が内部に持つ資源(共有メモリとセマフォ)を解放します。

利用上の注意点

シグナルの取扱い

現在、プログラムを終了させるシグナル については、ライブラリでキャッチしています。 キャッチした後、共有リソース(共有メモリとセマフォ)の解放を行います。も しすでにシグナルに登録されたコールバック関数がある場合は、シグナル登録 時(pards_init呼び出し時)に記録しておき、共有リソース解放後呼び出します。

もしプログラムでシグナルをキャッチしたい場合は、pards_initを呼び 出す前に登録しておくのが簡単です。pards_init呼出し後に登録する場合は、 ライブラリが登録したコールバック関数も忘れずに呼び出すようにしてくださ い。そうしないと、プログラム終了時に共有リソースの解放が行われません。

ただし、SIGALRMとSIGUSR1はライブラリで利用しているため、ユーザが利用す ることはできません。SIGALRMはタイムアウトする処理(timedreadなど)で利用 しており、SIGUSR1はインタラプトに利用しています。

SPAWNでフォークされるプロセスに関しては、SIGCHLDを無視することで、ゾン ビプロセスを生成しないようにしています。これは最近のUNIXでは動作します が、古いUNIXではうまくいかないかも知れません。

OSの設定

このライブラリでは、System V IPCが提供する共有メモリ(またはmmap)およ びセマフォを利用しています。通常、これらの資源は利用可能な最大量がOSに よって設定されています。

例えば、Linux系OSでは、/proc/sys/kernel以下にある、shmall, shmmax, shmmniに共有メモリに関する制限が記述されています。shmallには共有メモリ の総ページ数のシステム全体での制限が書かれており、shmmaxには共有メモリ セグメントを作成するときの最大サイズが書かれています(man procを参照)。 pards_initで大きなメモリサイズを利用する場合は、これらの値を大きくする 必要があるかも知れません。(mmapを利用する場合は不要です。)

また、/proc/sys/kernel/semには4つの値が書かれており、順にセマフォ集合 ごとのセマフォ数の最大値、システム全体での全てのセマフォ集合における セマフォ数の制限、semopコールに指定されるオペレーション数の最大値、シ ステム全体でのセマフォ識別子の最大値となりますが、本ライブラリでは、2 番目の値と4番目の値が影響します。特にデフォルトでは4番目の値が128と小 さいので、プログラムによっては大きくする必要があるかも知れません。

現在利用中のSystem V IPCの資源はipcsというコマンドで知ることができます (man ipcsを参照)。異常終了やpards_finalizeを呼ばなかった等で資源の解放 を行わずにプログラムが終了した場合、ipcsコマンドによって解放されなかっ た資源を見ることができます。これらはipcrmというコマンドで削除すること ができます。共有メモリの場合は、"ipcrm -m id", セマフォの場合は"ipcrm -s id"で削除できます。idはipcsコマンドによって知ることができます。

#defineの設定

libpards.hで#defineにより定義されている値を変更してコンパイルすること で、ライブラリの動作を変更することが出来ます。

おわりに

設計方針など

データフロー同期を用いる手法自体は新しいものではありません。様々な先例 がありますが、本ライブラリは直接的には並列論理型言語から影響を受けてい ます。(筆者はFlengという並列論理型言語の研究を過去に行っていました。一 般には、第五世代コンピュータプロジェクトにおけるKL1, GHCが有名だと思い ます)。

他にもLisp系言語のfuture/touchも同様のものです。また、最近ではJavaにも 同様の機能があるそうです(FutureTask)。

本ライブラリは新しい言語を作るのではなくC++を利用し、forkやSystem V IPCを利用するなど、OSの機能を利用しているところに特長があります。これ により、特に既存の逐次プログラムを並列化するのに適しています。

pthreadを利用するのではなく、forkでプロセスを作成することは、既存の逐次 プログラムを並列化する際にも有効に働きます。ある処理を並列化したいと思っ たとき、その関数がスレッドセーフかどうかを確認することは容易ではありま せん。スレッドセーフでない関数を並列に呼び出すことによって、非決定的な バグを作りこんでしまう可能性があります。

それに対して、forkで別プロセスを作成した場合、並列に動作するプロセスが お互いに影響しあうことは、明示的に確保した共有メモリを介して以外にはあ りえません。このことが並列プログラムのデバッグに大きな意味を持ちます。 デバッグ時に「スレッド間で競合が起こっているのではないか」と疑う必要が 無いためです。

pthreadではなくforkを用いることで、オーバヘッドが大きいのではないかと 心配されるかも知れません。しかし、最近のOSはプロセスの生成時に全てのメ モリ空間のコピーを行うのではなく、書き込みがあった際にはじめてそのペー ジをコピーします(Copy-on-Writeと呼ばれます)。これにより、プロセスの生 成は想像されるほど重い処理とはなりません。

これを確かめるためにベンチマークを行ってみました。pthread, forkを用い て、ただスレッド、プロセスを生成してjoinするだけのプログラムと、本ライ ブラリを用いてプロセス間で値を受け渡して終了するだけのプログラムを作成 し、実行時間を測定しました。回数は10000回です。(プログラムはmiscディレ クトリにあります)。

マシン pthread (sec) fork (sec) PARDS (sec)
IA64/HP-UX 11.23 (2CPU) 0.72 4.85 5.48
DualCoreXeon/FedoraCore6 (2*2CPU) 0.13 0.61 1.03
Xeon/FedoraCore5 (2CPU) 0.31 1.59 3.09

OSにもよりますが、ライブラリ経由で同期を行った際でも10倍以下のオーバヘッ ドで済んでいます。この程度のオーバヘッドでアドレス空間を分けることがで きるのであれば、十分有効だと考えます。もちろん、SPAWNはこの程度のオー バヘッドがあることを認識し、並列処理を行う際には、十分大きな粒度で処理 を行う必要があります。

今後の課題

今後の課題としては、bzip2以外の他のプログラムの並列化などが考えられます。 また、より将来の課題として、他の言語への移植 (rubyなど)や、Windows系OSへの移植も考えられます。