PARDS:
A library for PARallel programs with Dataflow Synchronization

Table of contents

  1. Introduction
    1. Abstract
    2. Environment
  2. How to use
    1. First step
    2. SyncList<T>
    3. SyncQueue<T>
    4. WorkPool<T1, T2>
    5. SPMD support
    6. Concurrency support
  3. Example of programs
    1. Quick Sort
    2. bzip2
    3. GUI with GTK+
  4. Reference
    1. Global functions
    2. Sync
    3. SyncList
    4. SyncQueue
    5. Other notes
  5. Conclusion
    1. Design policy
    2. Future work

Introduction

This is a library for writing parallel programs on UNIX OSes. With this library, you can write parallel programs more easily than by using lower-level libraries like pthread directly. As a practical example, bzip2 has been parallelized using this library.

Abstract

Today, speeding up a single processor is becoming difficult, and using multiple processors is a popular way to achieve high performance. This is true not only for engineering workstations (EWS), but also for PCs and embedded processors.

However, it is not easy to write programs for parallel machines. Usually, programmers must use libraries like pthread directly and manage locks and/or semaphores. This is not easy, and it tends to cause bugs that are quite difficult to track down, because the behavior of the program changes on every run.

This library offers a more intuitive and easier way to write parallel programs. It offers:

  - Sync<T>: single-assignment variables with dataflow synchronization
  - SyncList<T> and SyncQueue<T>: lists (and length-limited lists) of such variables
  - WorkPool<T1,T2>: a work pool for worker processes
  - SPMD-style programming support
  - concurrency support: timeout, interrupt, and list merger

For example, if "a" is declared as "Sync<int> a", you can perform operations like "a.read()" and "a.write(1)". The operation "a.write(1)" sets the contents of "a" to 1, and the operation "a.read()" gets the contents of "a". You can do inter-process communication using this functionality.

Here, the operation "a.read()" stops (blocks) until the operation "a.write(1)" is executed. This is called dataflow synchronization. A write operation can only be applied once to the same variable (more exactly, write operations after the first one cannot change the contents).

SyncList<T> is a list of Sync<T>, and SyncQueue<T> is a SyncList<T> whose length is limited.

WorkPool<T1,T2> lets multiple processes extract "work" from a work pool.

In addition, this library supports SPMD-style programming and provides functionality for concurrent programming such as timeout, interrupt, and list merger.

Environment

This library is for C++ and works on UNIX OSes; it uses "fork" and System V IPC (shared memory and semaphores). The development environment is Red Hat Linux 9, but the library should work on other UNIX systems.

How to use

First step

I will explain the usage using the samples.cc file in the samples directory. Here is the main function of samples.cc, with comments.

int main()
{
  pards_init();      // ...(main:1) Library initialization
  
  Sync<int> a, b, c; // ...(main:2) Decl. of sync vars

  SPAWN(add(1,b,c)); // ...(main:3) fork add, wait for b, executed 3rd
  SPAWN(add(1,a,b)); // ...(main:4) fork add, wait for a, executed 2nd

  a.write(3);        // ...(main:5) executed 1st

  int v = c.read();  // ...(main:6) wait for add(1,b,c) of (main:3)

  printf("value = %d\n",v);
  pards_finalize();  // ...(main:7) Finalize of the library
}

In order to use the library, you need to call pards_init() (main:1). In addition, to finalize the library, you need to call pards_finalize() (main:7).

Synchronization variables are declared like "Sync<int> a, b, c" (main:2). In this case, these variables can contain values of type "int".

The function add(1,b,c) is SPAWNed at (main:3). This means that the function add(1,b,c) is forked as a process. (SPAWN is implemented as a macro.)

Here, the function add is defined as follows:

void add(int i, Sync<int> a, Sync<int> b)
{
  int val;

  val = i + a.read(); // ...(add:1) a.read() waits for a to be written
  b.write(val);       // ...(add:2) b.write writes a value
}

This function adds the 1st argument and the 2nd argument, and returns the result through the 3rd argument. The type of the 1st argument is a plain "int", and that of the 2nd and 3rd arguments is Sync<int>.

"a.read()" in (add:1) blocks until the value of a is written. After the value of a is written, the execution restarts and gets the value. The value is then added to the 1st argument, and the result is stored in the variable val.

In (add:2), val is written to b. This makes the processes that wait for b's value restart their execution.

Back to main. The 2nd argument of the function add that is forked in (main:3) has not been written by any process yet. Therefore, this function blocks for a while.

Likewise, the 2nd argument of the function add that is forked in (main:4) is not written at this time, so this add also blocks.

Then, 3 is written to a in (main:5). This makes the function add that is forked in (main:4) restart. After execution, 4 is written to b.

After the value is written to b, the function add that is forked in (main:3) also restarts. After execution, 5 is written to c.

The value of c is read in (main:6). This also blocks until a value is written to c. Therefore, it waits for the execution of the function add that is forked in (main:3).

As you can see, inter-process communication and synchronization between the processes forked by SPAWN can be realized using Sync<int> variables.

Here, multiple writes to the same variable cannot change the value. This kind of variable is called a single-assignment variable.

You can write an algorithm like first-come-first-served using this functionality.
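
For illustration, here is a minimal sketch (not taken from the samples directory) of such a first-come-first-served pattern: two workers race to write the same variable, and only the first write takes effect.

void try_compute(int id, Sync<int> winner)
{
  // ... some computation whose duration varies ...
  winner.write(id);  // only the first write sticks; later writes are ignored
}

int main()
{
  pards_init();
  Sync<int> winner;
  SPAWN(try_compute(1, winner));
  SPAWN(try_compute(2, winner));
  printf("first finisher: %d\n", winner.read()); // id of whichever wrote first
  pards_finalize();
}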

Implementation

As mentioned above, SPAWN is implemented as a macro that calls fork() internally.

Sync<T> uses System V IPC to realize inter-process communication; shared memory is allocated in pards_init().

In addition, System V IPC semaphores are used to realize blocking and resumption of processes and mutual exclusion for shared memory access.

A Sync<T> variable only stores a pointer into shared memory and the IDs of semaphores. Therefore, the variable can be passed by value to functions (as the arguments of the function add in samples.cc). Of course, you can also pass these variables as pointers or references.

The important thing is that even if you modify a global variable in a SPAWNed function, it does not affect other processes, because we use fork instead of pthread. Changing global variables in threads is a typical source of bugs that are hard to fix, but this library does not cause such bugs. SPAWNed functions can still read global and local variables that are set before SPAWN, because fork() logically copies the whole memory space (to be exact, the copy occurs only when a write to the memory happens, i.e. copy-on-write).

Allocation and release of resources

If we use many synchronization variables or the program runs for a long time, we need to release resources (shared memory and semaphores). Shared memory and semaphores are shared between multiple processes, so it is dangerous to release these resources in a destructor; even if the resources are no longer needed by the process that writes a value to the synchronization variable, the process that reads the value may still need them. Therefore, in this library you basically need to release resources explicitly.

For variables allocated on the stack, you need to call free(). Of course, free() should be called only when no other process is still referring to the resources. Typically, there is one writer process and one reader process, and free() can be called just after the read is finished. An example of free() is in fib.cc in the samples directory.

If you allocate a synchronization variable using new, not only the value inside the variable but also the memory for the Sync<T> object itself is stored in the shared memory area. In this case, you can simply use delete to release both the shared resources and the memory area of the synchronization variable.
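
As a minimal sketch of both conventions (following the rules above; "producer" is a hypothetical writer function, and this snippet is not from the samples):

  Sync<int> s;                   // on the stack: only the contents live in shared memory
  SPAWN(producer(s));
  int v = s.read();              // blocks until producer writes
  s.free();                      // one writer, one reader: safe just after the read

  Sync<int> *p = new Sync<int>;  // the whole object lives in shared memory
  SPAWN(producer(*p));
  int w = p->read();
  delete p;                      // releases the shared resources and the object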

The reason for this design is that I wanted to make the specification similar to that of SyncList<T>, which I will explain next.

SyncList<T>

SyncList<T> is used in the generator-consumer pattern. In this pattern, one process creates a list of values (the generator), and another process uses these values (the consumer). By using different processes for generating and consuming lists, pipelined parallel processing becomes possible.

I will explain this using listsample.cc in the samples directory.

int main()
{
  pards_init();

  SyncList<int> *a;       // ...(main:1) declaration of first cell of the list
  a = new SyncList<int>;  // ...(main:2) allocation of the list cell

  SPAWN(generator(a));   // ...(main:3) fork generator process
  SPAWN(consumer(a));    // ...(main:4) fork consumer process

  pards_finalize();
}

First, the first "cell" of the list is declared and allocated at (main:1) and (main:2). Then, the generator process and the consumer process are forked at (main:3) and (main:4). The first cell of the list is passed to both the generator process and the consumer process.

Then, let's see the definition of the generator process.

void generator(SyncList<int> *a)
{
  int i;
  SyncList<int> *current, *nxt;  
  current = a;                 // ...(gen:1) assign the argument to current

  for(i = 0; i < 10; i++){
    current->write(i);         // ...(gen:2) write a value to the current list cell
    printf("writer:value = %d\n",i);
    nxt = new SyncList<int>;   // ...(gen:3) allocate new list cell
    current->writecdr(nxt);    // ...(gen:4) set the allocated cell as cdr of the current cell
    current = nxt;             // ...(gen:5) set the allocated cell as the current cell
    sleep(1);                  // ...(gen:6) "wait" to show the behavior
  }
  current->write(i);
  printf("writer:value = %d\n",i);
  current->writecdr(0);        // ...(gen:7) terminate the list using 0
}

The generator process creates a list and inserts values to it. Like Sync<T>, a value can be set to the list cell using write() (gen:2).

The next cell of the list is created using new at (gen:3). Then the cell is connected to the previous cell using writecdr() at (gen:4).

Here, a new cell should be created using "new"; don't connect a cell that is allocated on the stack. This is because the consumer process cannot read the memory if the cell is on the stack. The cell allocated using new is stored in the shared memory, so the consumer process can read it.

Because I need to make "new" of SyncList<T> allocate shared memory, I also made "new" of Sync<T> allocate shared memory.

The list is created by iterating the above steps in the for loop. In order to show the behavior, a 1-second wait is inserted at the end of the loop (gen:6). The end of the list is terminated by 0 (gen:7).

Then, let's see the definition of the consumer process.

void consumer(SyncList<int> *a)
{
  SyncList<int> *current,*prev;
  current = a;

  while(1){
    printf("reader:value = %d\n",
                current->read()); // ...(cons:1) read the value of the cell and print it
    prev = current;               // ...(cons:2) save the current cell
    current = current->readcdr(); // ...(cons:3) extract the cdr of the current cell,
                                  //             and make it the current cell
    
    delete prev;                  // ...(cons:4) delete the used cell
    if(current == 0) break;       // ...(cons:5) check the termination
  }
}

The value of the cell is extracted and shown at (cons:1). Like Sync<T>, this read blocks until the value is written.

The current cell is saved at (cons:2). The cdr of the current cell is extracted and made the current cell (cons:3). Like read(), readcdr() blocks until the cdr is written.

After the cdr is read, the previous cell is no longer needed, so it is deleted at (cons:4). Here, "delete" releases the memory in the shared memory area and releases the semaphores.

Lastly, termination is checked at (cons:5).

The output of this program should be like this:

writer:value = 0
reader:value = 0
writer:value = 1
reader:value = 1
writer:value = 2
reader:value = 2
...

The consumer process waits for the writes of the generator process. Therefore, the above output appears line by line, one second apart.

Abbreviation

Since the list creation and consumption described above is a typical pattern, I prepared abbreviated notations that reduce the amount of code.

Firstly, the operation "create a new list cell, and connect it to the current list cell" is described as follows:

  nxt = new SyncList<int>; 
  current->writecdr(nxt);
  current = nxt;

In order to describe this concisely, there is a create() member function that creates a new SyncList<T> cell, connects it to the target object, and returns the newly created cell. Using this member function, the above example can be written as follows:

  current = current->create();

Now, the temporary variable nxt is no longer needed.

Next, the operation "extract the cdr from the current cell, make it the current cell, and delete the previous cell" is described as follows:

  prev = current;
  current = current->readcdr();
  delete prev;

In order to describe this concisely, there is a release() member function that extracts the cdr, deletes the cell, and returns the cdr. Using this, the above example can be written as follows:

  current = current->release();

Using these abbreviated notations, you can write programs concisely. An example that uses these notations is in listsample2.cc.
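
For reference, the loops of listsample.cc would presumably look like this with the abbreviated notations (a sketch; the actual listsample2.cc may differ in detail):

  // generator loop, using create()
  for(i = 0; i < 10; i++){
    current->write(i);
    current = current->create(); // new cell, connected as cdr, becomes current
    sleep(1);
  }
  current->write(i);
  current->writecdr(0);          // terminate the list

  // consumer loop, using release()
  while(1){
    printf("reader:value = %d\n", current->read());
    current = current->release(); // read the cdr and delete the used cell
    if(current == 0) break;
  }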

SyncQueue<T>

In the previous example, a "wait" was inserted on the generator's side. Using SyncList is fine if the generator's execution is the bottleneck. However, if the consumer's execution is slower than the generator's, the system might run short of resources, because the consumer releases them too slowly.

To avoid this problem, we need to block the generator's execution until the consumer's release is done. SyncQueue<T> provides this functionality.

SyncQueue<T> is almost the same as SyncList<T>, but it accepts the length of the queue as an argument of the constructor.

  a = new SyncQueue<int>(2);

If a SyncQueue is declared like this, the system limits the number of cells connected as "cdr" to the queue to 2 at a time. If more cons cells are about to be connected to the queue, the operation blocks. When a cell connected to the queue is released by "delete" or "release", the count increases again, and a waiting operation, if any, resumes. This means that at most 3 cells can exist at a time.

Only the first cell requires the number in the constructor. After that, other cells can be allocated the same way as with SyncList<T>.
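
For example (a minimal sketch of this rule):

  SyncQueue<int> *q = new SyncQueue<int>(2); // only the first cell takes the length
  SyncQueue<int> *c = q->create();           // later cells: no length argument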

In addition, unlike SyncList<T>, a cell cannot be set as the cdr of multiple cells; the system detects this and reports an error.

An example of SyncQueue<T> is in queuesample.cc. It is listsample2.cc changed so that SyncList is replaced by SyncQueue<T> and the wait is moved to the consumer's side. The output of this program should look like this:

writer:value = 0
writer:value = 1
writer:value = 2
reader:value = 0
writer:value = 3
reader:value = 1
writer:value = 4
...

First, because the generator's side does not wait, 0, 1, and 2 are shown. Then, after the consumer's side shows 0 and releases the cell, the generator resumes execution and shows 3, and so on.

WorkPool<T1, T2>

If you use only SyncList and SyncQueue, there may be cases where you need to call SPAWN very frequently. Process invocation cost is not that large on recent OSes, but you still might want to reduce the number of process invocations.

Therefore, I prepared a class that supports the pattern in which worker processes are invoked first (for example, one per processor) and then get their work from a "work pool".

I will explain how to use the class using workpoolsample.cc in the "samples" directory.

int main()
{
  pards_init();

  SyncQueue<int> *work = new SyncQueue<int>(2);  // ...(main:1) work queue
  SyncQueue<int> *output = new SyncQueue<int>(2);// ...(main:2) output queue

  WorkPool<int,int> *pool = new WorkPool<int,int>(work,output);
                                         // ... (main:3) definition of WorkPool
  SPAWN(generator(work));                // ... (main:4) creation of work
  SPAWN(worker(1, pool));                // ... (main:5) fork worker1
  SPAWN(worker(2, pool));                // ... (main:6) fork worker2

  while(1){
    printf("%d...\n",output->read());    // ... (main:7) show output queue
    if((output = output->release()) == 0) break;
                         // ... (main:8) get the next cell & check termination
  }
  pards_finalize();
}

At first, the SyncQueue "work", which holds the work, and the SyncQueue "output", which holds the output, are defined (main:1) (main:2). You can use SyncList instead of SyncQueue.

Next, "pool", whose type is WorkPool, is defined (main:3). T1 and T2 of WorkPool<T1,T2> are the T of the work queue SyncQueue<T> (int) and the T of the output queue SyncQueue<T> (int). In addition, the arguments of the constructor include the SyncQueue "work" and the SyncQueue "output".

Here, the SyncQueue (SyncList) for work and the SyncQueue (SyncList) for output are treated as a pair; when you get a work cell from the pool, you also get the output cell for that work as a pair.

This enables us to get the output in order, even if the work is processed out of order by different processes.

In (main:5) and (main:6), worker processes are SPAWNed; their arguments include the work pool. The other argument is the id of the worker.

(main:7) and (main:8) show the result.
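
The definition of the work generator is not shown here; it would presumably look like the following sketch (the actual workpoolsample.cc may differ), filling the work queue in the usual SyncQueue style:

void generator(SyncQueue<int> *work)
{
  SyncQueue<int> *current = work;
  for(int i = 0; i < 10; i++){
    current->write(i);           // enqueue one piece of work
    if(i < 9)
      current = current->create();
  }
  current->writecdr(0);          // terminate the work queue
}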

Next, let's see the definition of the worker.

void worker(int id, WorkPool<int,int> *workpool)
{
  while(1){
    WorkItem<int,int> item = workpool->getwork(); // (worker:1) get the work
    if(item.output == 0) break;                   // (worker:2) check termination
    else{
      item.output->write(item.work*2);            // (worker:3) double the value and write to the output
      printf("(%d by [%d])\n",item.work*2,id);    // (worker:4) print the worker id 
    }
  }
}

At first, a work item is obtained from the WorkPool in (worker:1). The type of the work item is WorkItem. T1 and T2 of WorkItem<T1,T2> are the same as those of WorkPool<T1,T2>.

You can get a work item of type WorkItem by calling the getwork() member function. A WorkItem includes the work and the output cell for that work.

Here, inside getwork(), the necessary release and creation of cells is done automatically. Therefore, users of WorkPool don't have to worry about the release/creation of cells.

The variable item, whose type is WorkItem, includes a member "output" that holds the output cell. You can check whether the work pool has terminated by checking whether it is 0 (worker:2).

The variable item also includes a member "work" whose type is T1 (in this case, int). In (worker:3) this value is doubled and written to the output cell.

In (worker:4), in order to show which worker did the job, the value written to the output cell and the worker id are printed. You can see that the work is processed by multiple workers.

The output of this program should be like this:

(0 by [1])
(2 by [1])
0...
2...
(4 by [1])
(6 by [2])
4...
6...
(8 by [1])
(10 by [2])
8...
10...

Integer values like 0, 1, 2, ... are doubled and shown like "0... 2... 4...". In addition, which worker did the work is shown like "(2 by [1])". The worker ids may change on each execution.

In the above example, the WorkPool variable was not deleted. When deleting it, it is difficult to decide when it is safe to do so; if it is deleted just after all the output is extracted, there might still exist workers (that have not yet reached the termination check), and they would cause errors by using the released resources.

To avoid this, you can specify the number of workers as a reference count in the last argument of the constructor. In addition, each worker calls the release() member function after the termination check. This decrements the reference count and tells the system that the worker has terminated.

When delete is called on the WorkPool variable, this counter is checked; delete blocks until all the workers have terminated.

This example is written in workpoolsample.cc as comments.
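
A minimal sketch of that commented variant might look like this (see the WorkPool version of bzip2 below for a real use of the same pattern):

void worker(int id, WorkPool<int,int> *pool)
{
  while(1){
    WorkItem<int,int> item = pool->getwork();
    if(item.output == 0) break;          // termination check
    item.output->write(item.work*2);
  }
  pool->release();                       // after the check: decrement the count
}

  // in main:
  WorkPool<int,int> *pool = new WorkPool<int,int>(work,output,2); // 2 workers
  SPAWN(worker(1, pool));
  SPAWN(worker(2, pool));
  ...                                    // read the output queue as before
  delete pool;                           // blocks until both workers have called release()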

If the counter is not specified in the constructor, you can delete the variable without calling release(). However, this is a dangerous operation in general. Note that you can call release() even if the counter was not specified in the constructor.

If the WorkPool variable is allocated on the stack, you need to call free() to release the resources (the same as with Sync). The above mechanism also applies in this case.

SPMD Support

When parallelizing numerical programs, loops are what is most often parallelized. This can be implemented with SPAWN, but SPAWN requires creating a function, which is a burden on the programmer.

Therefore, the library supports a programming style called "SPMD".

SPMD stands for "Single Program, Multiple Data". The program is only one, but different processors work on different data.

"MPI" is famous for a parallel library of SMPD. MPI works (mainly) on distributed memory computers. Communication is done by message passing. All processors execute the same program, but by obtaining the my processor number, each processor can do different thing.

SPMD support by PARDS is similar to MPI. But communication is done through shared memory on PARDS.

I will explain how to use this using spmdsample.cc in the samples directory. This program simply adds up the data in the array "a".

#define NP 3
...

  int *a = (int*)pards_shmalloc(100*NP*sizeof(int)); // (1) input data
  int *out = (int*)pards_shmalloc(NP*sizeof(int));   // (2) output data

  for(int i = 0; i < 100 * NP; i++)       // (3) initialize a
    a[i] = 1;

  PBInfo* pbi = pards_begin_parallel(NP); // (4) begin parallel block

  int pno = pbi->getpno();                // (5) get the process number
  int *my_part = a + 100 * pno;           // (6) calculate where to work
  
  int sum = 0;

  for(int i = 0; i < 100; i++)            // (7) work on my part
    sum+=my_part[i];

  out[pno] = sum;                         // (8) store the result in shared memory

  pards_barrier(pbi);                     // (9) wait for all processes

  int totalsum = 0;
  if(pno == 0){                           // (10) process #0 calculate the sum
    for(int i = 0; i < NP; i++)
      totalsum += out[i];
  }

  pards_end_parallel(pbi);                // (11) end parallel block
  ...

At first, (1) and (2) allocate arrays for the input data and output data in shared memory, where they are shared by all the processes.

Here, to explicitly allocate shared memory, the pards_shmalloc function is used. To release the memory, pards_shmfree is used.

In (3), the contents of the input data array a are initialized to 1.

SPMD processing starts at (4). From pards_begin_parallel to pards_end_parallel in (11), multiple processes (as many as the argument of pards_begin_parallel) do the same work.

pards_begin_parallel returns a PBInfo, which holds the information of the parallel block.

(5) returns the process number of the executing process, so that each process can work on different data; PBInfo is used for this. Process numbers start at 0 and end at the number of processes minus 1.

(6) calculates the position to work from the process no.

(7) calculates the sum of the part of the array that the process should work on. Then, (8) stores the result in the shared memory allocated in (2).

(9) performs a "barrier": all processes wait until every process reaches this statement. By this statement, each process can know that the other processes have already done their work.

After the barrier in (9), (10) calculates the total sum over all processes, which is done by process number 0.

pards_end_parallel in (11) ends the SPMD block. After this statement, the number of processes returns to 1.

SPMD functionality is effective when there are loops that manipulate large arrays and their execution time is dominant. Please also see the matrix multiply example in the samples directory.

You can use SPAWN in the parallel block, but note that the SPAWN will be executed by all processes unless it is guarded by a branch that uses the process number, as in the sketch below.
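
A minimal sketch of such a guard ("helper" is a hypothetical function):

  PBInfo* pbi = pards_begin_parallel(NP);
  if(pbi->getpno() == 0)   // only process #0 forks the helper
    SPAWN(helper(arg));
  ...
  pards_end_parallel(pbi);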

In addition, you can nest parallel blocks (pards_begin_parallel), but a nested block is also executed by all processes, which is hard to control. Especially if you use barriers, be sure that the correct set of processes participates.

Concurrency support

So far, we have discussed "parallel processing", whose purpose is to speed up programs by utilizing multiple processors. However, sometimes we need "concurrent processing", whose purpose is to express concurrency explicitly.

We need "concurrent processing" when we handle multiple I/O sources, for example: a GUI application that waits for both input from the user and data from the network.

When we write such a program, we want to make the "block and wait for input from the user" process and the "block and wait for input from the network" process different processes (or threads).

(Of course, you can use something like "select", which waits for multiple inputs. But there are cases where programs that use different processes/threads are easier to understand.)

PARDS has the functionality that is needed for such programs:

  - Merger: merging lists created by multiple processes
  - Timeout: timeout versions of the blocking operations
  - Interruption: canceling blocking operations from outside

They are explained in detail as follows.

Merger

Consider a GUI application that waits for both input from the user and data from the network. There would be cases where waiting for these inputs is done by different processes, and when there is input to either process, another process performs some processing according to the input.

To realize such functionality, merger functionality for SyncList / SyncQueue is introduced. This functionality can be used for waiting for multiple inputs, like "select".

You can use this functionality through the class "Merger". I will explain it below; the example is in mergersample.cc in the samples directory.

  SyncQueue<Item> *first = new SyncQueue<Item>(2); // (main:1) create the first cell
  
  Merger<Item> *merger = new Merger<Item>(first,2);// (main:2) create merger with the
                                                   // first cell  
  SPAWN(generator(1,merger));  // (main:3) call generator1 with merger
  SPAWN(generator(2,merger));  // (main:4) call generator2 with merger
  SPAWN(consumer(first));      // (main:5) call consumer with the first cell

First, (main:1) creates the first cell. In this case a SyncQueue is used, but you can use a SyncList. "Item" is defined as follows:

class Item{
public:
  int val;
  int id;
};

The value of the item is in "val", and "id" holds generator's id.

Then (main:2) creates the "merger", an instance of the Merger class. The first cell is passed as an argument of the constructor. The second argument of the constructor is the number of processes that share the merger; this is used to release the resources that the merger uses (the same as WorkPool).

(main:3) and (main:4) call the generators with the created merger. The generators add list cells, through the merger, to the SyncQueue list that starts from "first". Cells are added by generator 1 and generator 2 in non-deterministic order.

(main:5) calls "consumer" with the first cell. The consumer accesses the list that is created by generator1 and generator2.

Then let's see the definition of the generator.

void generator(int id, Merger<Item> *m)
{
  ...
  for(int i = 0; i < 3; i++){
    Item it;       // (generator:1) create Item
    it.val = i;    // (generator:2) set val of Item
    it.id = id;    // (generator:3) set id of Item
    m->put(it);    // (generator:4) put Item to the merger
  }
  m->release();    // (generator:5) release the merger
}

(generator:1), (generator:2) and (generator:3) create an Item and set its values.

Then, (generator:4) adds the created Item to the list by calling the merger's "put". This "put" performs the creation of a list cell and the setting of the cdr.

Other processes (in this case, the other generator) may call "put" on the same merger. In this case, the cells are added to the list in calling order.

After creating the list, (generator:5) calls release() to release the merger. After "release" has been called by all the processes (the number set in the constructor of the merger), the resources of the merger are released and the list is terminated by 0.

If you want to increase the number of processes that share the merger, you can use increase_referer().

Lastly, let's see the consumer.

void consumer(SyncQueue<Item> *l)
{
  SyncQueue<Item> *current = l;
  while(1){
    Item it = current->read();
    printf("val = %d, id = %d\n",it.val, it.id);
    current = current->release();
    if(current == 0) break;
  }
}

This doesn't have anything special: it just traverses the list until the termination and shows the values. The result would be something like this:

val = 0, id = 2
val = 0, id = 1
val = 1, id = 2
val = 1, id = 1
val = 2, id = 1
val = 2, id = 2

You can see that the generators' ids appear in arbitrary order.

In this example, I used the merger directly in the generators. But there might be cases where you want to merge lists that are created by existing processes, if you already have such process definitions. In this case, you can implement the merge by creating a merging process for each list. You can find this example in mergersample2.cc.

Timeout

There may be cases where you want to terminate a blocking operation like read() of Sync<T> after some amount of time. For example, when waiting for data from a socket, you may want to show an error to the user if the operation does not complete within some amount of time.

To support such cases, there are timed versions of the operations: timedread, timedreadcdr, timedrelease, timedwritecdr, timedcreate, and timedput.

These operations are the timeout versions of read, readcdr, release, writecdr, create, and put. The "struct timeval*" argument gives the time until the operation times out (if 0 is given, the operation does not time out).

The pards_status* is a pointer to an enum whose value is SUCCESS, TIMEOUT or INTERRUPT. The programmer needs to allocate memory for the value and pass a pointer to it to the timed operations above; the operation sets the value to SUCCESS, TIMEOUT or INTERRUPT. SUCCESS shows that the operation succeeded without a timeout; TIMEOUT shows that the operation timed out; INTERRUPT shows that there was an interrupt and the operation did not succeed. Interruption will be explained later.

Here, timeout versions of writecdr and create do not exist in SyncList, because these operations never block in SyncList. On the other hand, they might block in SyncQueue, so the timeout versions exist there.

timedput can only be used if a SyncQueue was given to the constructor of Merger (if it is a SyncList, put does not block).

Samples of them are in timedreadsample.cc, timedlistsample.cc, timedqueuesample.cc, and timedputsample.cc in the samples directory.
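
As a minimal sketch of a timed read (the exact signature and argument order are assumptions based on the description above; check timedreadsample.cc for the real usage):

  struct timeval tv;
  tv.tv_sec = 5;                  // give up after 5 seconds
  tv.tv_usec = 0;
  pards_status st;
  int v = a.timedread(&tv, &st);  // "a" is a Sync<int>
  if(st == SUCCESS)      printf("value = %d\n", v);
  else if(st == TIMEOUT) printf("timed out\n");
  else                   printf("interrupted\n");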

Interruption

The user might want to cancel operations for reasons other than timeouts. For example, while waiting for data from a socket, the user may want to cancel the operation even within the timeout period.

To support such a case, the library has "interrupt" functionality.

Interrupt is functionality for sending a signal to SPAWNed processes. Currently, SIGUSR1 is used as the signal.

However, you can't get the process id from SPAWN. Therefore, a new macro ISPAWN(pid, func) is provided. The first argument is the variable that holds the PID (you can specify the variable directly, not a pointer, because ISPAWN is a macro). The type of the PID is pards_pid. Currently, this is a typedef of pid_t, but a different type name is used for future extension.

You can send interrupt using pards_send_interrupt(pards_pid) to the processes that are spawned by ISPAWN.

You can find out whether an interrupt was sent by calling pards_is_interrupted(). It returns 1 if there was an interrupt, otherwise 0. pards_clear_interruption() clears the status.
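
A minimal sketch of this API ("worker" and its argument are hypothetical):

  pards_pid pid;
  ISPAWN(pid, worker(m));        // fork worker and capture its PID

  ...                            // later, e.g. when the user cancels:
  pards_send_interrupt(pid);     // delivers SIGUSR1 to the worker

  // inside worker(), at a convenient point:
  if(pards_is_interrupted()){    // 1 if an interrupt arrived
    pards_clear_interruption();
    return;
  }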

If there is an interrupt during a blocking operation, the behavior depends on the operation.

If the operations are normal operations like read, readcdr, release, writecdr and create, they do not stop; it is guaranteed that the operations succeed.

If the operations are timeout operations like timedread, timedreadcdr, timedrelease, timedwritecdr and timedcreate, the operations stop and return INTERRUPT as the pards_status.

In addition, there are operations that stop on interruption but have no timeout.

These operations do not time out, but stop on interrupt. You can find out whether there was an interrupt or the operation completed successfully by checking the pards_status: SUCCESS means the operation completed successfully, and INTERRUPT means it was stopped by an interruption.

If there is an interrupt during a blocking system call, the system call stops. In this case, the return value of the system call usually becomes -1 and errno is set to EINTR (this depends on the kind of system call).

The samples are intrreadsample.cc, intrlistsample.cc, intrqueuesample.cc and intrputsample.cc in the samples directory.

Example of programs

Here, the parallelization of quick sort and bzip2 is explained as practical examples. In addition, a GUI program that uses GTK+ and the concurrency support of PARDS is also described.

Quick Sort

Here, I will explain an example that uses Sync just for synchronization, not for data input and output.

First, I will explain the algorithm of quick sort. Quick sort does the following:

  1. Partition the array into elements below a certain value (the pivot) and elements above it.
  2. Sort each partitioned sub-array recursively.

As you can intuitively guess, the sorts of the partitioned sub-arrays can be done in parallel.

Here, you could SPAWN a process whose argument is a Sync<T> holding a partitioned array. However, that would copy the array many times. Therefore, Sync<T> is used just for synchronization, and the values of the array are stored in shared memory.

To allocate shared memory explicitly, you can use pards_shmalloc. This allocates memory in the shared memory area just like malloc. To release the shared memory, you can use pards_shmfree. In the explanation below, the array to sort is assumed to be already in the shared memory area.

First, the serial version of the quick sort program can be implemented as below:

void qsort (int v[], int left, int right)
{
  ...

  if(left >= right)
    return;

  last = partition(v,left,right);  
  qsort(v,left,last-1);
  qsort(v,last+1,right);
}

Here, the function "partition" partitions the array into elements above a certain value and elements below it. The return value of the function is the index that divides the array. After the partition, the function qsort is called recursively on each area to sort.
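
partition itself is not shown in this excerpt; a typical implementation looks like the following sketch (the actual qsort.cc may differ):

int partition(int v[], int left, int right)
{
  int pivot = v[right];              // take the last element as the pivot
  int i = left - 1;
  for(int j = left; j < right; j++){
    if(v[j] <= pivot){               // move smaller elements to the left part
      i++;
      int tmp = v[i]; v[i] = v[j]; v[j] = tmp;
    }
  }
  int tmp = v[i+1]; v[i+1] = v[right]; v[right] = tmp;
  return i + 1;                      // the index that divides the array
}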

This can be parallelized as follows:

void qsort_sync (int v[], int left, int right, Sync<int> s)
{
  ...

  if(left >= right){
    s.write(1);
    return;
  }

  if(pards_avail_processes() == 0 ||      // (1) Is it possible to fork?
     right - left + 1 < min_size){        // (2) Enough work?
    qsort(v,left,right);                  // (3) Otherwise, serial sort
    s.write(1);                           // (4) Tell the completion
    return;
  } else {
    Sync<int> t, u;                       // (5) Allocate sync variables
    last = partition(v,left,right);       // (6) partition the array
    SPAWN_F(qsort_sync(v,left,last-1,t)); // (7) Recursively process the left part in a new process
    qsort_sync(v,last+1,right,u);         // (8) Process the right part
    t.read();                             // (9) Wait for the left part completion
    t.free();                             // (10) Release the sync variable
    u.read();                             // (11) Wait for the right part
    u.free();                             // (12) Release the sync variable
    s.write(1);                           // (13) Tell the completion to the caller
    return;
  }
}

The function qsort_sync is the parallelized version of qsort; it has an additional 4th argument for synchronization. The other arguments are the same as in the serial version.

At first, the function decides whether the work should be done in parallel or not. The function pards_avail_processes in (1) returns the number of currently available processes. The maximum number of available processes can be given as the argument of pards_init(). In addition, if the amount of work is not large enough, serial processing is faster than parallel processing; this is decided in (2). If the sort is done serially in (3), (4) signals the completion of the sort by writing a value to the 4th argument.

Next, in the case of parallel processing, variables for synchronization are allocated in (5). Then, the given area is partitioned in (6).

Then, the partitioned areas are sorted in parallel. First, a new process is forked for the left part, calling qsort_sync recursively. Here, SPAWN_F is used instead of SPAWN. The SPAWN_F macro does not fork a process but calls its argument as a function if the number of processes would exceed the number of available processes. ((1) checks the number of available processes, but it might drop to 0 after the check.)

(SPAWN and SPAWN_F differ in semantics: SPAWN throws an exception if forking is not possible, while SPAWN_F calls the argument as a function. In the latter case, the function may change or refer to global variables.)

(8) recursively processes the right part. Here, a process is not forked, but the called qsort_sync may fork another process; therefore we need synchronization for this call as well.

(9) and (11) wait for the completion of the left and right sorts. After completion, (13) signals the completion to the caller.

The above program is a simplified version of qsort.cc in the samples directory. In the full program, exception handling is added, because the program might use more semaphores than the limit specified by the OS. In particular, the default limit in Linux is quite small (128), though it can be changed.

Evaluation

Because quick sort is a fast algorithm, it does not take enough time to measure unless the input is large. The evaluation below was done with a data size of 10,000,000 and a min_size of 1,000,000. (To run the program under this condition, you might need to change the maximum shared memory size in Linux. This will be explained later.)

Machine                             Serial (sec)   Parallel (sec)   Parallel efficiency/Ideal
IA64/HP-UX 11.23 (2CPU)             5.99           3.23             1.85/2
DualCoreXeon/FedoraCore6 (2*2CPU)   1.40           0.5              2.8/4
Xeon/FedoraCore5 (2CPU)             3.6            1.73             2.08/2(HT)

In this algorithm, the partition step is always done serially. Even so, we got good parallel efficiency.

We got a speedup of more than 2 with the 2-CPU Xeon. This is probably because the Xeon supports Hyper-Threading and is seen as 4 logical CPUs.

bzip2

As a practical example, "bzip2" was parallelized using this library. Bzip2 is a compression program, and it also provides a compression library and its API. This parallelization uses that library API: the target file is divided into parts that are compressed in parallel. (Unfortunately, this method does not create the same binary output as the sequential version, but the compressed file can be decompressed by bunzip2 successfully. In addition, I parallelized only compression, not decompression.)

Parallelization of bzip2 has also been done elsewhere, but those versions need a lot of changes to the original program, using pthread etc. (Some of them use the same parallelization method as mine; others parallelize the internal library and create the same binary output as the sequential version.) Here, I want to show that the program can be parallelized easily using PARDS.

At first, let's see the compression part of bzip2.

void compressStream ( FILE *stream, FILE *zStream )
{
   ...

   bzf = BZ2_bzWriteOpen ( &bzerr, zStream, 
                           blockSize100k, verbosity, workFactor );   
   ...
   while (True) {
      if (myfeof(stream)) break;
      nIbuf = fread ( ibuf, sizeof(UChar), 5000, stream );
      ... 
      if (nIbuf > 0) BZ2_bzWrite ( &bzerr, bzf, (void*)ibuf, nIbuf );
      ... 
   }
   BZ2_bzWriteClose64 ( &bzerr, bzf, 0, 
                        &nbytes_in_lo32, &nbytes_in_hi32,
                        &nbytes_out_lo32, &nbytes_out_hi32 );
   ...
}

This function compresses the file stream "stream" and outputs to "zStream". To compress the stream, BZ2_bzWriteOpen does the initialization, and BZ2_bzWrite compresses the contents read by fread. After compression, BZ2_bzWriteClose64 closes the stream.

These APIs are simple for sequential compression but are not suitable for parallelization. So let's use the "BZ2_bzBuffToBuffCompress" API, which compresses contents in memory.

The basic parallelization strategy is as follows:

  - a reading process divides the input into blocks and puts them on a queue;
  - compression processes compress the blocks in parallel using BZ2_bzBuffToBuffCompress;
  - the writing process writes the compressed blocks to the output file in order.

At first, the "compressStream" above is changed as follows:

void compressStream ( FILE *stream, FILE *zStream )
{
   ...
   pards_init();                        // (1) Initialization

   ...
   SyncQueue<Block> *rs;                // (2) Queue for reading data 
   SyncQueue<Block> *cs;                // (3) Queue for compressed data 

   rs = new SyncQueue<Block>(pards_get_nprocs()); // (4) Allocate the first cell
   cs = new SyncQueue<Block>(pards_get_nprocs()); // (5) The size of the queue is # of CPUs

   SPAWN(readStream(stream, rs));       // (6) SPAWN the data reading process
   SPAWN(myCompressStream(rs,cs));      // (7) SPAWN the compression process
   writeStream(zStream, cs);            // (8) Write

   pards_finalize();                    // (9) Finalization

   return;
}

At first, initialization is done in (1). Then, (2) and (3) declare the pointers for the queues that store the read data and the compressed data.

Here, the type of the queue's item is "Block"; this is defined as follows:

struct Block
{
  UChar* buf;
  int size;
};

The first member is a pointer to the data to process, and the second member is the size of the data. The data is referred to by multiple processes, so the pointed-to data must be in the shared memory area.

Then, (4) and (5) allocate the first cells. Here, the number of CPUs is used as the queue size (passed to the constructor), so that the number of compressing processes is limited to around the number of CPUs.

To get the number of CPUs, this library offers the pards_get_nprocs() function, which returns the number of available CPUs.

Then, (6) and (7) SPAWN the data reading process and the compressing process as described above. The writing of the data is done in the parent process (8), and after writing, finalization is done (9).

Then, let's see the "readStream" function of the reading process.

void readStream(FILE* stream, SyncQueue<Block> *rs)
{

  int bufsize;
  Block blk;
  SyncQueue<Block> *crnt;

  bufsize = blockSize100k * 100000;           // (1) Size of 1 block
  
  crnt = rs;
  while (True) {
    blk.buf =                                 // (2) Allocate shared memory
      (UChar*)pards_shmalloc(bufsize * sizeof(UChar)); 
    blk.size =                                // (3) Read from the file
      fread (blk.buf, sizeof(UChar), bufsize, stream); 
    crnt->write(blk);                         // (4) Write to the queue
    if (myfeof(stream)){
      crnt->writecdr(0);                      // (5) Terminate the queue if EOF
      return;
    } else if (ferror(stream)){               //     (error check; sketch of the elided branch)
      ... // Error handling
    } else {
      crnt = crnt->create();                  // (6) Allocate the next block
    }
  }
 }

First, (1) sets the size of one block. Here, the variable "blockSize100k" is a global variable defined in bzip2, which holds the block size given as an option of bzip2. Bzip2 uses this size as the unit of compression, so it is also used here as the unit of parallelization.

(2) allocates shared memory using pards_shmalloc. Its size is the block size set before. The pointer to the allocated area is stored in the Block structure.

Then, (3) reads the data from the file. The size is the block size. The actual read size is also stored in the Block structure.

After reading, (4) writes the Block structure to the queue. This lets the compressing process start compression.

If the end of the file has been reached, the queue is terminated (5). If not, a new cell is allocated for the next block (6). Here, using the abbreviated notation create(), the new cell is set as the cdr of "crnt", and then the new cell becomes "crnt".

Now reading into the queue is finished. Then, let's see the compression process.

void myCompressStream(SyncQueue<Block> *rs, SyncQueue<Block> *cs)
{
  SyncQueue<Block> *crntrs, *nxtrs;
  SyncQueue<Block> *crntcs;

  crntrs = rs;
  crntcs = cs;
  while (True){
    nxtrs = crntrs->readcdr();        // (1) Extract cdr from the reading queue
    SPAWN(myCompress(crntrs,crntcs)); // (2) Compress the current cell's block 
    if(nxtrs == 0){                   // (3) If the reading queue is terminated,
      crntcs->writecdr(0);            //     terminate the compression queue
      break;
    } else {
      crntrs = nxtrs;                 // (4) Set the next reading queue 
      crntcs = crntcs->create();      // (5) Set the next compression queue
    }
  }
}

(1) extracts the cdr of the reading queue. This is because the current reading queue cell is released during the compression in (2); therefore, the cdr is saved before the release.

(2) SPAWNs the compression process for the current reading queue.

(3) checks if the reading queue is terminated or not. If it is terminated, the compression queue is also terminated.

If it is not terminated, the cdr of the current reading queue saved before becomes the next cell in (4). In addition, (5) allocates the next compression cell and sets it as the cdr of the current cell.

Then, let's see the compression process.

void myCompress(SyncQueue<Block> *rs, SyncQueue<Block> *cs)
{
  Block rblk,cblk;                   // (1) Blocks for reading and compression
  ...

  rblk = rs->read();                 // (2) Extract data from the reading queue
  bufsize = blockSize100k * 110000;  // (3) Allocate the data for compression
  cblk.buf = (UChar*)pards_shmalloc(bufsize * sizeof(UChar));
  size = bufsize;
  
  // (4) Compress the data using the API provided by bzip2
  bzerr = BZ2_bzBuffToBuffCompress((char*)cblk.buf, &size,
				   (char*)rblk.buf, rblk.size,
				   blockSize100k, verbosity, workFactor);  
  cblk.size = size;                  // (5) Size of the compressed data

  ... // Error handling

  cs->write(cblk);                   // (6) Write to the compression queue
  pards_shmfree(rblk.buf);           // (7) Release the shared memory for reading
  delete rs;                         // (8) Delete the cell of the reading queue
}

At first, (1) declares the Block structures for reading and compression. Then (2) reads the Block structure from the reading queue.

(3) allocates shared memory for the compressed data. The pointer to the allocated memory is stored in the Block structure for compression.

Now, (4) does the actual compression. For compression, the API "BZ2_bzBuffToBuffCompress" provided by bzip2 is used. This API compresses data in memory and writes the result to another memory area; the pointers are given as arguments of the API.

The size of the compressed data is set in the Block structure for compression (5), and the Block structure is written to the compression queue (6). The read data is no longer needed now; (7) releases the shared memory and (8) releases the cell.

Lastly, let's see the writing process.

void writeStream(FILE* zStream, SyncQueue<Block> *cs)
{
  Block blk;
  SyncQueue<Block> *crnt;

  crnt = cs;
  while (True){
    blk = crnt->read();     // (1) Reading of the block
                            // (2) Writing to the file
    int s = fwrite(blk.buf, sizeof(UChar), blk.size, zStream); 
    ... // Error handling
    pards_shmfree(blk.buf); // (3) Release the written memory block
    crnt = crnt->release(); // (4) Release the cell and read cdr
    if(crnt == 0) break;    // (5) Termination check
  }
  ...
}

This process is simple. At first, (1) reads a block from the compression queue. Then, using the pointer and the data size in the block, the data is written to the file by fwrite (2).

After the write, the data in the block is not needed; (3) releases the shared memory, and (4) releases the cell and reads the cdr using the abbreviated notation release().

Lastly, (5) checks the termination.

Now the parallelization of bzip2 is finished. You can see that a program of practical size can be parallelized relatively easily.

Evaluation

I evaluated the performance against the serial version of bzip2. The file to compress is the source tree of this library (while under development) concatenated 4 times with cat, about 10MB in size. The block size is the default (900k), and the output is redirected to /dev/null.

Machine                             Serial (sec)   Parallel (sec)   Parallel efficiency/Ideal
IA64/HP-UX 11.23 (2CPU)             14.76          8.96             1.64/2
DualCoreXeon/FedoraCore6 (2*2CPU)   3.79           1.43             2.65/4
Xeon/FedoraCore5 (2CPU)             8.96           5.55             1.61/2(HT)

The parallel efficiency is a bit lower than for Quick Sort, but we still attained a good speedup by parallelization.

WorkPool version

So far, the program has been parallelized using only SyncQueue. The number of SPAWN calls is only about 11 (10MB/900KB), which is small enough, but let's try using WorkPool as well.

There are not many differences from the previous version. First, let's see compressStream. (The parallelized file is wpbzip2.c, in the same directory.)

   pards_init();

   SyncQueue<Block> *rs;
   SyncQueue<Block> *cs;

   int nprocs = pards_get_nprocs(); 

   rs = new SyncQueue<Block>(nprocs);
   cs = new SyncQueue<Block>(nprocs);

   SPAWN(readStream(stream, rs));

                                      // (1) Definition of WorkPool
   WorkPool<Block,Block> *pool = new WorkPool<Block,Block>(rs,cs,nprocs);
   for(int i = 0; i < nprocs; i++){   // (2) SPAWN worker
     SPAWN(myCompressStream(pool));
   }
   writeStream(zStream, cs);

   delete pool;                       // (3) Delete the WorkPool
   
   pards_finalize();

The main changes are the definition of the WorkPool variable (1), the SPAWNing of a fixed number of workers (2), and the deletion of the WorkPool (3).

At first, (1) defines the WorkPool variable. The arguments of the constructor are the SyncQueue rs for input, the SyncQueue cs for compressed output, and the number of processors as the number of workers.

The workers are SPAWNed in (2); the number of workers is the number of processors. The function executed by each worker is myCompressStream, modified from the previous version.

Lastly, the WorkPool variable is deleted in (3) (actually, this is redundant here because finalization is done just after it).

The functions readStream and writeStream are the same as in the previous version. Next, the modified functions myCompressStream and myCompress are explained.

void myCompressStream(WorkPool<Block, Block>* pool)
{
  while (True){
    WorkItem<Block, Block> item = pool->getwork(); // (1) Get the WorkItem
    if(item.output == 0) break;                    // (2) Check termination
    else
      myCompress(item);                            // (3) Compression
  }
  pool->release();                                 // (4) Release the pool
}

The function myCompressStream became simple. As the comments show, (1) gets a WorkItem using getwork(), (2) checks the termination, and (3) does the compression, passing the WorkItem to myCompress. After the loop finishes at the termination, the pool is released in (4).

Next, let's see myCompress.

void myCompress(WorkItem<Block, Block> item)
{
  Block rblk, cblk;
  ...

  rblk = item.work;       // (1) The processing Block is got from WorkItem
  bufsize = blockSize100k * 110000;
  cblk.buf = (UChar*)pards_shmalloc(bufsize * sizeof(UChar));
  size = bufsize;

  bzerr = BZ2_bzBuffToBuffCompress((char*)cblk.buf, &size,
				   (char*)rblk.buf, rblk.size,
				   blockSize100k, verbosity, workFactor);  
  cblk.size = size;

  ... // Error handling

  item.output->write(cblk); // (2) Compressed data is written to the cell for output
  pards_shmfree(rblk.buf);
}

This is similar to the previous version. The Block to process is obtained from the WorkItem as rblk (1). After compression, the result is written to the WorkItem's cell for output (2).

Evaluation

I evaluated the WorkPool version of bzip2. The evaluation conditions are the same as for the previous version.

Machine                             Serial (sec)   WorkPool ver. (sec)   Parallel efficiency/Ideal
IA64/HP-UX 11.23 (2CPU)             14.76          8.88                  1.66/2
DualCoreXeon/FedoraCore6 (2*2CPU)   3.79           1.57                  2.41/4
Xeon/FedoraCore5 (2CPU)             8.96           5.23                  1.71/2(HT)

Some results are faster and some are slower than the previous version. Because the number of processes is small, there wasn't much difference.

To see the difference, I evaluated with the "-1" option, which makes the block size 100KB instead of 900KB; this makes the number of processes larger. In this case, the amount of work shrinks more than the size of the data, which makes the granularity of the work smaller. The evaluation machine is the DualCoreXeon (4CPU).

In this case, the serial version took 1.79 sec, the previous version 0.746 sec, and the WorkPool version 0.573 sec, which shows the speedup.

Actually, the performance changes with the queue length. (For example, on the HP-UX machine, doubling the queue length made the program about 10% faster.) This seems to be due to caching or to the process scheduling of readStream and writeStream. There seems to be still room for improvement.

GUI with GTK+

Using the functionality described under concurrency support, I created a helper library for GUI programs. The files libgtkhelper.cc and libgtkhelper.h are the library, and gtksample.cc is a sample; they are in the gtk directory. I will explain how to use the helper library using the sample.

This sample program receives values created by forked processes and displays them. It takes some time for the processes to create the values, and the GUI accepts input during that period. In addition, the user can cancel the forked processes.

At first, I will explain the main routine.

  pards_init();

  SyncQueue<Callback*> *c = new SyncQueue<Callback*>(4);// (1) Queue for messages 
  
  gtkhelper_init(c);      // (2) Initialize the helper library

  Merger<Callback*> *m = new Merger<Callback*>(c,2);    // (3) Create a Merger for messages
  

  ISPAWN(w1,worker(1,m)); // (4) Create the process#1 that produces values
  ISPAWN(w2,worker(2,m)); // (5) Create the process#2 that produces values

First, (1) creates the first cell of the queue for messages. The type of the queue is Callback*. This type is defined in the library. (Note that it is a pointer type; the reason is that types inheriting from Callback are actually used.)

The helper library is initialized in (2), passing the first cell created in (1). After this, every time a (Callback*-typed) value is written to the queue, its run() method is called.

To implement this, the GTK+ functionality "watch a specified file descriptor for readability in the event loop" is used: a process that reads the message queue (in a blocking manner) is forked, and that process writes a value to a pipe (which is watched by the event loop) whenever a message is read.

In addition, the run() method of a Callback-typed variable is defined by the programmer in a type that inherits from Callback. I will explain this later.

The value-producing processes are created in (4) and (5). These processes get the instance of Merger as an argument. They "put" variables whose types inherit from Callback to the Merger.

Next, I will explain the worker process. First, I will explain the MyCallback class used in the worker.

class MyCallback : public Callback
{
public:
  int val;         // (1) Value to show
  int id;          // (2) Process id

  void run(){      // (3) Override run() method
    char str[1000];
    if(val == -1)
      sprintf(str,"Process %d is canceled\n",id);               // (4) Strings when canceled
    else
      sprintf(str,"Message from Process %d, val = %d\n",id,val);// (5) Strings for the normal case
    
    if(currentpos + strlen(str) > NCHARS + labelchars) return;
    else{
      strcpy(currentpos,str);
      currentpos += strlen(str);
      gtk_label_set_text(GTK_LABEL(label),labelchars);          // (6) Change the label
    }
  }
};

A pointer to an instance of the MyCallback class is put to the Merger. On the event loop side, every time a pointer is put, its overridden run() method is called.

First, (1) and (2) define the instance variables that are needed to execute the callback: the produced value and the id of the process.

The run() method is overridden in (3). A GTK+ function is called in (6), which changes the label string. The string is set in (4) when the process is canceled, or in (5) in the normal case. When the process is canceled, "val" is set to -1, by which the callback can know that the process was canceled.

Then, here is the definition of the process that produces the value.

void worker(int id, Merger<Callback*> *c)
{  
  for(int i = 0; i < 5; i++){
    int r = simulate_slow_read();     // (1) Create the value
    MyCallback* val = new MyCallback; // (2) Create the instance variable of MyCallback
    val->id = id;                     // (3) Set the id
    if(pards_is_interrupted()){       // (4) Check if interrupted
      val->val = -1;                  // (5) If interrupted, set val -1
      c->put(val);                    // (6) Put to the Merger
      c->release();                   // (7) release() Merger
      return;                         // (8) Terminate the process
    } else {
      val->val = r;                   // (9) If not interrupted, set val to the read value
      c->put(val);                    // (10) Put to the Merger
    }
  }
  c->release();                       // (11) release() Merger
}

The value to show is read in (1). In this sample, sleep is used here, but in practice this corresponds to reading data from a socket, etc.

An instance of MyCallback is created in (2), and the id of the process is set on it in (3).

(4) checks whether there was an interrupt. If the process was interrupted, val is set to -1 in (5), the instance is put() into the Merger in (6), the Merger is released in (7), and the process terminates in (8) by returning from the function.

If there was no interrupt, the read value is set to val in (9), and the instance is put into the Merger in (10). After the loop finishes, the Merger is released in (11) and the process terminates.

Other parts are similar to normal GTK+ programs.

By doing this, every time a worker puts a value, the label string is changed. In addition, this sample program provides a button that changes the label string and a button that cancels the processes.

With the helper library, you can easily implement communication with multiple processes by defining a class that inherits from the Callback type and contains the callback function. You can use multiple callback types simultaneously, as the sketch below shows.
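
For example, a second callback type can coexist with MyCallback. The following is a minimal sketch; the ProgressCallback class and its percent field are inventions of this sketch, and it reuses the sample's global label widget:

class ProgressCallback : public Callback // hypothetical second callback type
{
public:
  int percent;                           // progress value to show

  void run(){                            // called on the event loop side
    char str[100];
    sprintf(str,"Progress: %d%%\n",percent);
    gtk_label_set_text(GTK_LABEL(label),str); // reuses the sample's global label
  }
};

Pointers to MyCallback and ProgressCallback instances can be put into the same Merger<Callback*>; the event loop dispatches to the right run() through the virtual call.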

Reference

Here is the reference for the functions and classes provided by this library.

Global functions

In order to use the global functions, you need to include libpards.h. The library itself is compiled as libpards.a.

pards_init

The function for initialization. It is declared as "void pards_init(int ap = NUM_OF_PROCS, unsigned bytes = NALLOC_BYTES);". Both arguments can be omitted.

The first argument specifies the number of processes used by the library. If this many processes forked by SPAWN are already working and a new process is forked by SPAWN, an exception occurs. If SPAWN_F is used in this case, the call is executed as a function, not as a process. The default is 16.

The second argument specifies the size of the shared memory. In the current implementation, the shared memory is allocated from the OS at initialization, and pards_shmalloc hands out memory from this area. If you need a large shared memory area, specify the size here. The default value is 16MB.

(The functionality that re-allocates shared memory when the area allocated at initialization is used up is actually implemented, but it does not work correctly on Linux, so it is disabled. I will explain this later.)
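
As a minimal sketch, a program that needs more processes and shared memory than the defaults might start like this (the numbers are arbitrary):

#include "libpards.h"

int main()
{
  pards_init(32, 64*1024*1024); // up to 32 SPAWNed processes, 64MB shared memory

  // ... parallel work using SPAWN, Sync<T>, etc. ...

  pards_finalize();
}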

pards_finalize

The function to finalize the use of the library. You need to call it after you finish using the library. It is declared as "void pards_finalize(int wait = DO_WAIT)", but you do not have to specify the argument.

In this function, the allocated shared memory and semaphores are released. Therefore, if you do not call it, the shared memory and semaphores are left unreleased. You can use the "ipcs" command to see the current status of these resources. If you failed to call the finalize function, use the "ipcrm" command to release the resources.

SPAWN(func), SPAWN_F(func), ISPAWN(pid, func)

These are macros that fork processes that call function "func".

If you try to fork more processes than the number specified in pards_init, SPAWN throws an exception. The type of the exception is ForkException, which inherits from PardsException. SPAWN_F executes "func" as a function instead of throwing an exception.

ISPAWN sets the process id of the created process to "pid". You can specify pid directly, not a pointer to it (because ISPAWN is implemented as a macro). The type of pid is pards_pid. In the current implementation it is the same as pid_t, but a separate type is used for future extension.
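
For example, ISPAWN can be used as in the following sketch (worker and m are the function and Merger from the GUI sample):

pards_pid w;
ISPAWN(w, worker(1, m)); // fork worker(1, m) and store its process id in w

The stored pid can later be passed to pards_send_interrupt (described below).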

pards_shmalloc, pards_shmfree

They allocate and free shared memory. They are declared as "void* pards_shmalloc(unsigned)" and "void pards_shmfree(void*)". The argument of pards_shmalloc is the byte size of the memory to allocate. If the memory cannot be allocated, it returns 0. The argument of pards_shmfree should be a value that was returned by pards_shmalloc. The allocation algorithm is the one described in K&R.
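
A minimal sketch of using a shared buffer (the buffer is visible to processes SPAWNed afterwards):

int *buf = (int*)pards_shmalloc(100 * sizeof(int)); // 100 ints in shared memory
if(buf == 0){
  fprintf(stderr,"allocation failed\n");
} else {
  buf[0] = 42;         // processes forked after this point see the same buffer
  pards_shmfree(buf);  // free it once no process uses it anymore
}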

pards_get_nprocs

It returns the number of currently available CPUs. It is declared as int pards_get_nprocs(). This function is supported on Linux (sysconf(_SC_NPROCESSORS_ONLN)), HP-UX (mpctl(MPC_GETNUMSPUS,NULL,NULL)), and FreeBSD/Mac OS X (sysctl). On other systems, it returns 1.

pards_avail_processes

It returns the number of currently available processes. This value starts at the number specified in pards_init() and decreases when a process is SPAWNed. It increases when a SPAWNed process exits.
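
These two functions can be combined to match the number of workers to the hardware, as in this sketch (do_part is a hypothetical worker function):

int n = pards_get_nprocs();    // number of available CPUs
for(int i = 0; i < n; i++)
  SPAWN(do_part(i, n));        // one worker per CPU; do_part is hypothetical
printf("free process slots: %d\n", pards_avail_processes());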

pards_set_error_level

It controls the level of error messages that the library outputs. It is declared as int pards_set_error_level(int). The argument can be DBG, INFO, CRITICAL, FATAL, or NO_PRINT (they are defined in the header file); the amount of output decreases in this order. In the case of NO_PRINT, no messages are output. Use this function if you want to disable error messages in the release version of your program.

The return value of this function is the current level. The current level can also be obtained with pards_get_error_level().

void pards_send_interrupt(pards_pid)

This function sends an interrupt to the process whose pid is specified as the argument. The pid can be obtained with ISPAWN.

A process that receives the interrupt escapes from blocking if it is blocked in a timed/intr method. In addition, since this is implemented using a signal, if the process is in a blocking system call, it also escapes from the call. In this case, the return value is usually -1 and errno is EINTR (the details depend on the system call).

int pards_is_interrupted()

This returns whether the process has received an interrupt. If it has, it returns 1; otherwise it returns 0.

void pards_clear_interruption()

The status of interrupt receipt is cleared; pards_is_interrupted() returns 0 until the next interrupt is received.
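
Together, the three interrupt functions support a cancellation pattern like the following sketch (do_unit_of_work is hypothetical):

// parent side: cancel a worker that was forked with ISPAWN(w, ...)
pards_send_interrupt(w);

// worker side: poll for the interrupt between units of work
while(!pards_is_interrupted()){
  do_unit_of_work();          // hypothetical work step
}
pards_clear_interruption();   // re-arm so that the next interrupt can be detected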

PBInfo* pards_begin_parallel(int)

You need to include "parblock.h" to use this function. This function enables to write an SPMD style parallel program.

It forks as many processes as specified in the argument. All processes execute the same program until pards_end_parallel. Each process can do different work by using its process number.

The returned PBInfo variable stores the state of the parallel block. You can get the process number using the getpno() member function of the class. The process number starts at 0 and ends at the argument of pards_begin_parallel minus 1.

void pards_end_parallel(PBInfo*)

You need to include "parblock.h" to use this function. The argument of this function is what is returned by pards_begin_parallel. By calling this function, the calling process waits for all other processes to execute this function, and finishes the parallel block.

void pards_barrier(PBInfo*)

You need to include "parblock.h" to use this function. The argument of this function is what is returned by pards_begin_parallel. By calling this function, the calling process waits for all other processes to execute this function.

Sync

You need to include "Sync.h" to use this class, and you need to link with libpards.a (libpards.h will be included from Sync.h). In addition, definition of the member functions are written in Sync.h, since this is a class template.

Constructor

You don't need to specify arguments to the constructor. In the constructor, the shared memory area and semaphores for synchronization are allocated. In addition, if you create the instance using new instead of allocating it on the stack, the memory for the Sync<T> itself is also allocated in the shared memory area.

Destructor

If you allocate the instance on the stack, you need to call free() (described later) in order to release the shared memory area and semaphores allocated in the constructor. The free() function should be called by only one process among the processes that share the instance.

If you allocate the instance using new, free() will be called from delete. Since the instance is in the shared memory area, delete should be called by only one process among the processes that share the instance.

void write(T val)

This member function assigns a value to the variable. Processes waiting for the value in read() resume execution. If the value is already set, it is not changed.

T read()

This member function gets the value of the variable. If the value is not set, the call blocks.

T timedread(struct timeval *, pards_status*)

This is similar to read(), but it does not block longer than the time specified in the first argument. After the specified time, the function escapes from blocking and sets the pards_status to TIMEOUT. If the process is interrupted while blocking, the pards_status is set to INTERRUPT. If the function succeeds, it is set to SUCCESS.

T intrread(pards_status*)

This is similar to read(), but if the process is interrupted while blocking, the pards_status is set to INTERRUPT. Otherwise, it is set to SUCCESS.
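
For example, with "Sync<int> a" as in the first sample, a read with a one-second timeout looks like this sketch:

struct timeval tv;
tv.tv_sec = 1;                  // wait at most 1 second
tv.tv_usec = 0;

pards_status st;
int v = a.timedread(&tv, &st);
if(st == SUCCESS)
  printf("value = %d\n", v);
else if(st == TIMEOUT)
  printf("no value after 1 second\n");
else                            // st == INTERRUPT
  printf("interrupted while waiting\n");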

void free()

This releases the shared memory area and semaphores allocated for the instance. This function should be called by only one process among the processes that share the instance.

SyncList

You need to include SyncList.h to use this class. The usage is similar to Sync. Since this class inherits from Sync, you can use write, read, and free.

Constructor

The constructor is similar to that of Sync. However, when adding a cell to the list after forking a new process with SPAWN, you cannot use instances allocated on the stack, because such cells cannot be accessed from other processes. Use new instead of allocating the cell on the stack; a cell created with new is allocated in the shared memory area.

void writecdr(SyncList<T>*)

This member function adds a cell as the cdr of this cell. Processes waiting for the cdr in readcdr() resume execution. If the cdr is already set, it is not changed.

SyncList<T> *readcdr()

This member function gets the cdr of the cell. If it is not set, the call blocks.

SyncList<T> *timedreadcdr(struct timeval*, pards_status*)

This is similar to readcdr(), but it does not block longer than the time specified in the first argument. After the specified time, the function escapes from blocking and sets the pards_status to TIMEOUT. If the process is interrupted while blocking, the pards_status is set to INTERRUPT. If the function succeeds, it is set to SUCCESS.

SyncList<T> *intrreadcdr(pards_status*)

This is similar to readcdr(), but if the process is interrupted while blocking, the pards_status is set to INTERRUPT. Otherwise, it is set to SUCCESS.

SyncList<T> *release()

This member function gets the cdr, deletes the cell ("this"), and returns the cdr.

SyncList<T> *timedrelease(struct timeval*, pards_status*)

This is similar to release(), but it does not block longer than the time specified in the first argument. After the specified time, the function escapes from blocking and sets the pards_status to TIMEOUT. If the process is interrupted while blocking, the pards_status is set to INTERRUPT. If the function succeeds, it is set to SUCCESS.

SyncList<T> *intrrelease(pards_status*)

This is similar to release(), but if the process is interrupted while blocking, the pards_status is set to INTERRUPT. Otherwise, it is set to SUCCESS.

SyncList<T> *create()

It allocates a new SyncList<T> cell, sets it as the cdr of this cell ("this"), and returns the newly created cell.
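
Here is a minimal producer/consumer sketch with SyncList. The end marker -1 is a convention of this sketch, not of the library; note that the cells live in shared memory because they are created with new:

void producer(SyncList<int> *l)
{
  for(int i = 0; i < 10; i++){
    l->write(i);                // set the value of the current cell
    l = l->create();            // append a new cell and move to it
  }
  l->write(-1);                 // this sketch's own end marker
}

void consumer(SyncList<int> *l)
{
  int v;
  while((v = l->read()) != -1){ // read() blocks until the value is written
    printf("got %d\n", v);
    l = l->release();           // delete the consumed cell, move to the cdr
  }
  l->free();                    // release the last cell
}

The list is started with "SyncList<int> *l = new SyncList<int>();", followed by SPAWN(producer(l)) and consumer(l).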

SyncQueue

You need to include SyncQueue.h to use this class. The usage is similar to Sync and SyncList. Since this class inherits from SyncList, you can use write, read, free, writecdr, readcdr, release, and create.

However, you cannot set the same SyncQueue<T> as the cdr of multiple queues.

Unlike SyncList, writecdr and create block if the queue size reaches the maximum size specified in the constructor. (To be exact, the size is the number of writecdr/create calls.) When the queue size decreases, the blocked call resumes.

Constructor

The constructor is similar to that of SyncList. However, you need to specify the maximum size of the queue (the number of writecdr/create calls).
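
A sketch of a bounded pipeline; producer and consumer are the functions from the SyncList sketch above, and only the first cell differs. Passing the SyncQueue<int>* where a SyncList<int>* is expected relies on SyncQueue inheriting from SyncList, as documented:

SyncQueue<int> *q = new SyncQueue<int>(8); // at most 8 writecdr/create calls ahead
SPAWN(producer(q));  // its create() blocks whenever it runs 8 cells ahead
consumer(q);         // each release() lets the producer continue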

void timedwritecdr(SyncQueue<T>*, struct timeval*, pards_status*)

This is similar to writecdr(), but it does not block longer than the time specified in the second argument. After the specified time, the function escapes from blocking and sets the pards_status to TIMEOUT. If the process is interrupted while blocking, the pards_status is set to INTERRUPT. If the function succeeds, it is set to SUCCESS.

void intrwritecdr(SyncQueue<T>*, pards_status*)

This is similar to writecdr(), but if the process is interrupted while blocking, the pards_status is set to INTERRUPT. Otherwise, it is set to SUCCESS.

SyncQueue<T> *timedcreate(struct timeval*, pards_status*)

This is similar to create(), but it does not block longer than the time specified in the first argument. After the specified time, the function escapes from blocking and sets the pards_status to TIMEOUT. If the process is interrupted while blocking, the pards_status is set to INTERRUPT. If the function succeeds, it is set to SUCCESS.

SyncQueue<T> *intrcreate(pards_status*)

This is similar to create(), but if the process is interrupted while blocking, the pards_status is set to INTERRUPT. Otherwise, it is set to SUCCESS.

WorkPool

You need to include Workpool.h to use this class. This class is used together with SyncList or SyncQueue. If you use SyncQueue, you need to include SyncQueue.h.

Constructor

The first argument of the constructor is the list of "work", whose type is SyncQueue<T1>*, or SyncList<T1>*. The second argument is the list of output, whose type is SyncQueue<T2>*, or SyncList<T2>*.

If you want the resources used by the WorkPool to be released automatically after all workers call release(), specify the number of workers that refer to the instance as the third argument. This argument can be omitted.

Destructor

This is similar to that of Sync. That is, if you allocate the instance on the stack, you need to call free() in order to release the shared memory area and semaphores allocated in the constructor. The free() function should be called by only one process among the processes that share the instance.

If you allocate the instance using new, free() will be called from delete. Since the instance is in the shared memory area, delete should be called by only one process among the processes that share the instance. If the number of workers is specified in the constructor argument, the resources are released after all the workers call release().

WorkItem<T1, T2> getwork()

This returns a WorkItem<T1, T2>, which is a pair of a work and an output. Here, WorkItem<T1, T2> has "work" (whose type is T1) and "output" (whose type is SyncList<T2>*) as member variables.

Calling getwork() calls release() on the cell of the work list to release that cell, and calls create() on the cell of the output list to add a new cell.

If the work list is terminated with 0, the member variable "output" of WorkItem<T1, T2> is 0 (and the value of "work" is undefined). You can use this to check for termination, as in the sketch below.
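
A minimal worker sketch; the assumption here is that the worker writes its result to the returned output cell with write():

void pool_worker(WorkPool<int,int> *wp)
{
  for(;;){
    WorkItem<int,int> wi = wp->getwork(); // take one work item
    if(wi.output == 0) break;             // the work list was terminated with 0
    wi.output->write(wi.work * 2);        // write the result to the output cell
  }
  wp->release();                          // this worker is done with the pool
}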

release()

Calling release() tells the system that the reference count of the variable has decreased by 1. When the number of release() calls reaches the number specified in the constructor argument, the resources are released.

free()

This member function releases the resources of WorkPool<T1, T2> (shared memory area and semaphores).

Merger

You need to include Merger.h to use this class. This class is used together with SyncList or SyncQueue. If you use SyncQueue, you need to include SyncQueue.h.

Constructor

The first argument of the constructor of Merger<T> is a SyncQueue<T>* or SyncList<T>*. This variable is the first cell of the merged list; the cell should already be allocated.

If you want the resources used by the Merger to be released automatically after all processes that share it call release(), specify the number of processes that refer to the instance as the second argument. This argument can be omitted.

Destructor

This is similar to WorkPool. That is, if you allocate the instance on the stack, you need to call free() in order to release the shared memory area and semaphores allocated in the constructor. The free() function should be called by only one process among the processes that share the instance.

If you allocate the instance using new, free() will be called from delete. Since the instance is in the shared memory area, delete should be called by only one process among the processes that share the instance. If the number of processes is specified in the constructor argument, the resources are released after all the processes call release().

void put(T)

This member function appends the value given as the argument to the list. Allocating a new cell and setting it as the cdr are done inside this function.

void timedput(T, struct timeval*,pards_status*)

This is similar to put(), but it does not block longer than the time specified in the second argument. After the specified time, the function escapes from blocking and sets the pards_status to TIMEOUT. If the process is interrupted while blocking, the pards_status is set to INTERRUPT. If the function succeeds, it is set to SUCCESS.

void intrput(T, pards_status*)

This is similar to put(), but if the process is interrupted while blocking, the pards_status is set to INTERRUPT. Otherwise, it is set to SUCCESS.

void increase_referer(int)

This member function increases the number of processes that refer to the Merger. The number to increase is specified in the argument. If the argument is omitted, it is increased by 1.

release()

Calling release() tells the system that the reference count of the variable has decreased by 1. When the number of release() calls reaches the number specified in the constructor argument, the resources are released.

free()

This member function releases the resources of Merger (shared memory area and semaphores).

Other notes

Signal handling

Currently, signals that terminate the program are caught by the library. After a signal is caught, the shared resources (shared memory and semaphores) are released. If callback functions were registered for these signals, they are remembered at pards_init and called after the shared resources are released.

If you want to catch signals yourself, the easiest way is to register your callback functions before calling pards_init. If you register them after calling pards_init, do not forget to re-register the callback functions that the library registered; otherwise, the shared resources are not released when the program terminates.

Note that SIGALRM and SIGUSR1 cannot be used by the user program, since they are used by the library: SIGALRM for timeout operations (timedread, etc.) and SIGUSR1 for interrupts.

SIGCHLD is ignored so that zombie processes are not created. This works on recent UNIXes, but might not work on old ones.

OS configuration

This library uses the shared memory and semaphores provided by System V IPC. The maximum sizes of these resources are usually set by the OS configuration.

For example, on Linux the limits on shared memory are written in shmall, shmmax, and shmmni in /proc/sys/kernel. shmall specifies the total number of pages of shared memory, and shmmax specifies the maximum size of a shared memory segment (see man proc). If you want to specify a large memory size at pards_init, you might need to change these values.

In addition, there are four values in /proc/sys/kernel/sem: the maximum number of semaphores per semaphore set, the maximum number of semaphores in the system, the maximum number of semaphore operations per semop call, and the maximum number of semaphore ids in the system. This library is affected by the second and fourth values. In particular, the fourth value is relatively small by default (128); you might need to increase it.

You can see the System V IPC resources currently in use with the "ipcs" command (see man ipcs). If the program terminated abnormally or without calling pards_finalize, you can check for unreleased resources with ipcs. Shared memory can be released with "ipcrm -m id" and semaphores with "ipcrm -s id"; the ids are shown by the ipcs command.

#define configuration

You can change the behaviour of the library by changing the #define'd values in libpards.h.

Conclusion

Design policy

The idea of dataflow synchronization is not new; there are many precedents. This library is directly influenced by parallel logic languages. (The author did research on a parallel logic language called Fleng. KL1 and GHC of the Fifth Generation Computer Systems project are probably better known.)

The future/touch mechanism of Lisp is similar. Recently, Java has also acquired similar functionality (FutureTask).

The characteristic of this library is that it does not try to create a new language, but uses C++ and OS functionalities like fork and System V IPC. This makes it easy to parallelize existing programs.

Using fork instead of pthread is advantageous when parallelizing an existing program. It is not easy to check whether a function is thread-safe when you want to parallelize it, and calling a thread-unsafe function in parallel results in non-deterministic bugs.

On the other hand, if you create other processes using fork, the processes cannot affect each other except through the shared memory. This is quite important when you debug a parallel program, since you do not have to worry about race conditions between threads.

You may worry about the overhead of using fork instead of pthread. However, recent OSes do not copy the whole memory when creating a process; a memory page is copied only when it is written to (this is called Copy-on-Write). This makes process creation lighter than you might imagine.

To confirm this, I ran some benchmarks. I made programs that only create threads/processes and join them, and a program that passes a value between processes using this library. Each is executed 10000 times. (The programs are in the misc directory.)

Machine                             pthread (sec)   fork (sec)   PARDS (sec)
IA64/HP-UX 11.23 (2CPU)                      0.72         4.85          5.48
DualCoreXeon/FedoraCore6 (2*2CPU)            0.13         0.61          1.03
Xeon/FedoraCore5 (2CPU)                      0.31         1.59          3.09

The amount of overhead depends on the OS, but the overhead of the library is less than 10 times that of pthread. I think it is worthwhile to pay this overhead to get separate address spaces. Of course, programmers should be conscious of the overhead and make the granularity of parallel execution large enough.

Future work

Future work includes parallelization of other programs, porting to other languages like Ruby, and porting to Windows.