
Parallelization

Parallel computing dates back to the 1950s, but it only became widespread in the 1990s. Two things drove that: commodity clusters (as opposed to ultra-expensive parallel computers) running free operating systems, and the arrival of symmetric multi-processing (SMP) on commodity hardware. Still, even though they were much cheaper than the big mainframes, SMP computers and clusters remained too expensive for general computing. This changed after the turn of the millennium, and nowadays every chip is at least dual core, whether in desktops, laptops, clusters or SMP machines.

Therefore, parallelization is no longer an optimization but a fundamental design concern to be taken into account in every software product. Even small programs should be considered. Maybe not for internal parallelization, but to make sure that several copies running at the same time on different data won't conflict with each other (e.g. by writing to the same temporary file).
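For the temporary-file case, one option on a POSIX system is to let mkstemp() pick the file name instead of hard-coding one. Here is a minimal C sketch of that. The function open_private_tmp and the /tmp/myprog- prefix are hypothetical names chosen for illustration.

```c
#define _POSIX_C_SOURCE 200809L   /* for mkstemp() and fdopen() */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical name pattern; mkstemp() replaces the trailing XXXXXX */
#define TMP_PATTERN "/tmp/myprog-XXXXXX"

/* Create and open a temporary file that no other running copy of the
 * program can be using. mkstemp() picks a unique suffix and creates the
 * file with O_CREAT | O_EXCL, so two processes are never handed the same
 * name. The chosen name is copied into path, which must hold at least
 * sizeof TMP_PATTERN bytes. Returns NULL on failure. */
FILE *open_private_tmp(char *path, size_t size) {
    if (size < sizeof TMP_PATTERN)
        return NULL;
    memcpy(path, TMP_PATTERN, sizeof TMP_PATTERN);  /* mkstemp edits it in place */
    int fd = mkstemp(path);
    if (fd == -1)
        return NULL;
    FILE *fp = fdopen(fd, "w+");
    if (fp == NULL) {            /* give the file back if we can't use it */
        close(fd);
        unlink(path);
    }
    return fp;
}
```

The caller should call unlink(path) once the file is no longer needed. If the name itself is never needed, the standard C tmpfile() is a simpler alternative.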

Concurrent execution and data parallelization

The first level of parallelization is achieved by simply running several copies of the same program over different chunks of data (sometimes over the same data). To do this, intelligently split the data into as many chunks as you can (either by simply cutting the file or by rearranging it), then run one copy of your program per chunk.
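One simple way to decide where to cut is to give each chunk a contiguous range of nearly equal size. The C sketch below assumes the data is a sequence of n records that can be cut between any two records. For text files you would still cut at line boundaries. chunk_range is a hypothetical helper, not part of any library.

```c
#include <stddef.h>

/* Compute the k-th of n_chunks contiguous ranges over n_items records,
 * as the half-open interval [*begin, *end). Chunk sizes differ by at most
 * one: the first (n_items % n_chunks) chunks get one extra record.
 * Assumes n_chunks > 0 and k < n_chunks. */
void chunk_range(size_t n_items, size_t n_chunks, size_t k,
                 size_t *begin, size_t *end) {
    size_t base  = n_items / n_chunks;
    size_t extra = n_items % n_chunks;
    *begin = k * base + (k < extra ? k : extra);
    *end   = *begin + base + (k < extra ? 1 : 0);
}
```

Copy k of the program (k from 0 to n_chunks-1, passed on the command line, for instance) would then process only records begin to end-1. That way no two copies ever touch the same record.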

Another way is to change your program so that it runs threads (sometimes called light-weight processes). Threads are scheduled slightly differently and can share information more easily. Most programming languages have thread libraries, and using them is fairly easy.
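Here is a sketch of the thread-based alternative in C, assuming POSIX threads. The function sums a shared array by giving each thread its own slice and its own result slot. parallel_sum, struct slice and the MAX_THREADS limit are hypothetical names chosen for the example.

```c
#include <pthread.h>
#include <stddef.h>

#define MAX_THREADS 64   /* hypothetical cap for the example */

/* One thread's share of the work: a read-only slice of the shared input
 * and a private result slot, so no two threads write to the same place. */
struct slice {
    const long *data;
    size_t begin, end;
    long result;
};

static void *sum_slice(void *arg) {
    struct slice *s = arg;
    long acc = 0;
    for (size_t i = s->begin; i < s->end; i++)
        acc += s->data[i];
    s->result = acc;
    return NULL;
}

/* Sum data[0..n) with up to MAX_THREADS threads. Every thread reads the
 * same array directly; nothing is copied between them. */
long parallel_sum(const long *data, size_t n, size_t n_threads) {
    pthread_t tid[MAX_THREADS];
    struct slice work[MAX_THREADS];
    int started[MAX_THREADS];
    if (n_threads < 1) n_threads = 1;
    if (n_threads > MAX_THREADS) n_threads = MAX_THREADS;

    size_t base = n / n_threads, extra = n % n_threads, begin = 0;
    for (size_t k = 0; k < n_threads; k++) {
        size_t len = base + (k < extra ? 1 : 0);
        work[k] = (struct slice){ data, begin, begin + len, 0 };
        begin += len;
        started[k] = pthread_create(&tid[k], NULL, sum_slice, &work[k]) == 0;
        if (!started[k])
            sum_slice(&work[k]);     /* could not start a thread: do it here */
    }

    long total = 0;
    for (size_t k = 0; k < n_threads; k++) {
        if (started[k])
            pthread_join(tid[k], NULL);  /* after join, its result is visible */
        total += work[k].result;
    }
    return total;
}
```

No lock is needed here, for two reasons. Each thread writes only to its own result field, and the main thread reads that field only after pthread_join(). The problems described next appear as soon as threads write to shared data.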

Unfortunately, it's not as simple as it may seem. Parallelization brings its own problems, and often they're far worse than the benefits. The two biggest problems for simple parallel solutions are:

- Concurrent access: two or more processes try to write to the same place (file or variable) at the same time.
- Managing multiple copies: each process keeps a local copy of the same data and must synchronize it to stay up to date with the others.
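For the file case, one common mitigation on POSIX systems is to open shared log files with O_APPEND, sketched below in C. append_line is a hypothetical helper. It doesn't solve concurrent updates to data in general, only the "several processes appending to the same file" case.

```c
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Append one line to a log file shared by several processes.
 * With O_APPEND the kernel moves the file offset to the end of the file
 * as part of every write(). Concurrent writers therefore don't overwrite
 * each other's data, as they could if each computed the end position
 * itself. The whole line goes out in a single write() call. NFS does not
 * honour O_APPEND reliably.
 * Returns 0 on success, -1 on error. */
int append_line(const char *path, const char *line) {
    int fd = open(path, O_WRONLY | O_CREAT | O_APPEND, 0644);
    if (fd == -1)
        return -1;
    size_t len = strlen(line);
    ssize_t written = write(fd, line, len);
    int ok = (written == (ssize_t)len);
    if (close(fd) == -1)
        ok = 0;
    return ok ? 0 : -1;
}
```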

Also keep in mind that you can't run many more processes than you have processors, or the effect will be negative: the extra processes just compete for the same CPUs. Whether you're running locally on your multi-core machine or on a multi-machine cluster, you must account for all the processors available. Then spawn the right number of jobs for your infrastructure.
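Here is a minimal C sketch of finding out how many processors the local machine has. It assumes a system that supports sysconf(_SC_NPROCESSORS_ONLN). glibc, the BSDs and macOS do, although it isn't part of POSIX proper. jobs_for_this_machine is a hypothetical helper. On a cluster you would add up the processors across all nodes instead.

```c
#include <unistd.h>

/* How many jobs to run at once on this machine: one per online processor,
 * never less than 1 and never more than cap (cap <= 0 means "no cap").
 * _SC_NPROCESSORS_ONLN is a widely supported extension, not strict POSIX. */
long jobs_for_this_machine(long cap) {
    long n = sysconf(_SC_NPROCESSORS_ONLN);
    if (n < 1)
        n = 1;              /* unknown: fall back to running serially */
    if (cap > 0 && n > cap)
        n = cap;
    return n;
}
```

On Linux, the coreutils command nproc reports a similar number. That is why make -j$(nproc) is a common way to size local runs.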

The easiest way to organize parallel execution is to use build automation tools like Make, Ant or SCons. Make and SCons, for instance, accept a parameter (-j) telling the maximum number of jobs to spawn at any given time. They then take care of the dependencies automatically, provided you've declared them. Here is an example Makefile:

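# Numeric list, 0-9
NUMBERS=0 1 2 3 4 5 6 7 8 9
# chunk0 chunk1 ... chunk9
CHUNKS=$(NUMBERS:%=chunk%)

# These targets are not files
.PHONY: run split_data run_processes check

# Main rule, depends on others
run: split_data run_processes check
	@echo "Run done"

# Split the data, creating chunk0 chunk1 ... (GNU split: 10 pieces
# without breaking lines, 1-digit numeric suffixes)
split_data:
	@echo "splitting data"
	split -n l/10 -d -a 1 data chunk

# Every chunk is produced by split_data. make -j builds the prerequisites
# of one target in parallel, so each chunk must depend on it directly
$(CHUNKS): split_data

# Done when all ten logs are built
run_processes: split_data $(CHUNKS:=.log)

# This rule will execute 10 times, one for each chunk
chunk%.log: chunk%
	@echo "running program over $<"
	my_program $< > $@

# Also needs to depend on run_processes or it'll check nothing.
# Leading "-": grep exits non-zero when it finds no ERROR, which is fine here
check: run_processes
	@echo "checking results"
	-grep -c ERROR $(CHUNKS:=.log)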

In the code above, when you run make with 10 parallel jobs, it expands all the rules (i.e. chunk%.log becomes the list “chunk0.log chunk1.log …”) and builds a dependency graph. It then starts every task whose dependencies have already been met, as long as a job slot is free. If tasks are ready but all the slots you've asked for are busy, they wait until one finishes.

On a dual-core machine you would run it with only two jobs:

$ make -j2
splitting data
...
running program over chunk0
running program over chunk1
...
running program over chunk2
running program over chunk3
...
running program over chunk4
running program over chunk5
...
running program over chunk6
running program over chunk7
...
running program over chunk8
running program over chunk9
...
checking results

As you can see, the data-splitting step runs alone, because all the other steps depend on it and won't start before it finishes. That's the main strength of these automation tools, and why they're so widely used for parallelization.
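The scheduling rule make applies here is to start a ready task only when one of the -j slots is free. That rule can be sketched in C with fork() and wait(). This is a simplified model, not how make is implemented. It has no dependency graph, so it assumes all commands are independent, and it assumes the calling process has no other children. run_with_slots is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200809L
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Run n_cmds shell command lines, keeping at most max_jobs running at any
 * time. A new command starts only when a slot is free; otherwise we block
 * in wait() until some child finishes. wait() reaps any child, hence the
 * "no other children" assumption.
 * Returns the number of commands that failed or could not be started. */
int run_with_slots(const char *const cmds[], int n_cmds, int max_jobs) {
    int running = 0, failed = 0, next = 0;
    if (max_jobs < 1)
        max_jobs = 1;

    while (next < n_cmds || running > 0) {
        if (next < n_cmds && running < max_jobs) {
            pid_t pid = fork();
            if (pid == 0) {                       /* child: run one command */
                execl("/bin/sh", "sh", "-c", cmds[next], (char *)NULL);
                _exit(127);                       /* exec itself failed */
            }
            if (pid < 0)
                failed++;                         /* fork failed */
            else
                running++;
            next++;
            continue;
        }
        int status;                               /* all slots busy: wait */
        if (wait(&status) < 0)
            break;
        running--;
        if (!WIFEXITED(status) || WEXITSTATUS(status) != 0)
            failed++;
    }
    return failed;
}
```

Calling it with max_jobs set to 2 on the ten program runs would reproduce the two-at-a-time pattern of make -j2. What it lacks is the dependency handling that makes split_data run alone first.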

Threads

Sometimes parallelization is not that easy, and just splitting the data file is not enough. Other times you want a bit more control over what's going on with the data. One example is when one part of the program needs to access another part while doing some computation.

For that you need threads but, again, this power comes with a big problem: it's quite difficult (if at all possible) to migrate threads between different machines. So you'll end up limited to the processors of a single machine (even an eight-socket quad-core machine has only 32 cores). Threads should therefore be used when the problem is not that big, or as a local solution inside a broader algorithm.

In a multi-threaded environment you can share global data between threads (taking special care with the conflicts mentioned earlier). You can also run a different function on each thread, doing different things at the same time. This asynchronous execution is much easier to manage with threads than with the separate processes described above. Separate processes would need Inter-Process Communication (IPC) or the Message Passing Interface (MPI), which would complicate things a bit.

To understand a bit better how threads deal with global data, check this example:

#include <cassert>
#include <thread>

int global_test;

// My thread, changes the value of a global variable
void my_thread() {
  global_test = 10;
}

int main() {
  // Zero the variable
  global_test = 0;

  // No thread is running yet: the variable is still zero
  assert(global_test == 0);

  // Create the thread; a std::thread starts running as soon as it is constructed
  std::thread child(my_thread);

  // Wait for the thread to finish; join() also makes its write visible here
  child.join();

  // The variable changed to 10
  assert(global_test == 10);

  return 0;
}

This program has two lines of execution, main and my_thread, where main starts my_thread to run in parallel. Note that main keeps running at the same time as my_thread. This is why main must wait for the thread to finish before the second assertion. Only then can we be sure the global variable holds the right value.

Now, what would happen if two threads tried to change the same variable? If both threads add a value, say 10 and 20, you'd expect the final result to be 30. But suppose both read the same initial value (0) and add their respective values to it. Each will then hold a different result (10 and 20) for the same variable, and the last one to write will dictate the final value. That value will be either 10 or 20, but not 30. For that you need locks:

#include <cassert>
#include <mutex>
#include <thread>

int global_test;
std::mutex global_lock;

// My thread, changes the value of a global variable
void my_thread1() {
  // Waits while the other thread holds the lock, then takes it in one step;
  // checking first and locking afterwards would leave a window where both get in
  std::lock_guard<std::mutex> guard(global_lock);

  // your code here
  global_test += 10;
}  // the lock is released here, when guard goes out of scope

void my_thread2() {
  std::lock_guard<std::mutex> guard(global_lock);

  // your code here
  global_test += 20;
}  // lock released

int main() {
  global_test = 0;

  // No threads yet: the variable is still zero
  assert(global_test == 0);

  // Start the threads (each begins running when it is constructed)
  std::thread child1(my_thread1);
  std::thread child2(my_thread2);

  // Wait for both to finish
  child1.join();
  child2.join();

  // The variable changed to 30
  assert(global_test == 30);

  return 0;
}

This way, no matter in which order the threads start and how long they run, the result will always be the same. Of course, every problem is different and may require a different approach (more locks, semaphores, signals), but those are the basics of threading. Now you know how to share data between threads in a controlled way.

Clusters and Grids

Now that you understand a bit more about splitting jobs, let's move on to the next step: clusters. A cluster is nothing more than a bunch of computers on the same network, organized by a set of programs. These programs fall into two groups:

- Job dispatchers, which just fire jobs on different machines and gather the results later.
- Message passing interfaces, which also dispatch jobs, but of a very distinctive form: the jobs contain specialized code to run in that environment.

There are also a whole bunch of other administration programs, but those are not relevant to us now.

Job dispatcher

The job dispatcher simply connects to the remote machine and executes the job you asked for. It waits for the job to finish, gathers any output it created, and stores it in a local file or mails it to you at the end. Job dispatchers usually don't care about filesystems, so you must make sure that the directories your programs use are accessible from every node in the cluster. It's common to set up a network filesystem or a parallel virtual filesystem for that, and the sysadmins should take care of it for you.

Some job dispatchers, like Sun Grid Engine (SGE), expect you to submit jobs as scripts:

$ qsub my_script.sh

Other job dispatchers, like Platform LSF, let you submit any valid command line:

$ bsub ls -l

Both of them let you monitor your jobs and even kill them if they go wild. But normally you won't care how many jobs are running, as long as they finish and do their job. At the end you should receive an email saying how your job went and what the output was (or where to find the file).

Once you submit a job, the scheduler decides which machine is the least busy and sends the job there to run. Machines with more processors can cope with more jobs. However, some jobs may also spawn threads internally, and the scheduler must account for that. It has to monitor how much CPU each node has left to know how many more jobs it can handle.
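To make the placement idea concrete, here is a toy model in C. Each node has some CPUs, some of them already in use, and each job declares how many CPUs (threads) it will occupy. The struct node, place_job and the "most free CPUs wins" policy are assumptions for illustration. Real schedulers such as SGE and LSF use their own, far richer policies (queues, priorities, load thresholds).

```c
#include <stddef.h>

/* Toy model, not the algorithm of any real scheduler */
struct node {
    const char *name;
    int cpus;   /* processors on the node */
    int used;   /* processors already taken by running jobs */
};

/* Pick the node with the most free CPUs that can still fit the job and
 * reserve the CPUs on it. Returns the node's index, or -1 if no node has
 * room (the job would then wait in the queue). */
int place_job(struct node *nodes, size_t n_nodes, int job_cpus) {
    int best = -1, best_free = -1;
    for (size_t i = 0; i < n_nodes; i++) {
        int free_cpus = nodes[i].cpus - nodes[i].used;
        if (free_cpus >= job_cpus && free_cpus > best_free) {
            best = (int)i;
            best_free = free_cpus;
        }
    }
    if (best >= 0)
        nodes[best].used += job_cpus;   /* account for the job's threads */
    return best;
}
```

A job that doesn't fit anywhere (place_job returns -1) stays pending until some running job finishes and releases its CPUs.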

There are two types of dispatch:

- Interactive: you wait until the job is finished. This blocks any further execution until then, so it's required when the sequence matters (B can't run before or together with A).
- Background: you fire and forget, getting the console prompt back right away. This can be used to launch the master process that will spawn the smaller jobs (the parallel execution). That way your run doesn't break if your console dies or your machine reboots.

Because submitting is so simple, you can easily embed it in your Makefiles to dispatch each job to the cluster instead of executing it directly on one machine. Let's revisit the previous Makefile example:

# Numeric list, 0-9
NUMBERS=0 1 2 3 4 5 6 7 8 9

# chunk0 chunk1 ... chunk9
CHUNKS=$(NUMBERS:%=chunk%)

# Dispatch (-K waits for the job to finish, i.e. interactive)
DISPATCH=bsub -q fast_queue -K

# These targets are not files
.PHONY: run split_data run_processes check

# Main rule, depends on others
run: split_data run_processes check
	@echo "Run done"

# Split the data on a cluster node, creating chunk0 chunk1 ...
split_data:
	@echo "splitting data"
	$(DISPATCH) split -n l/10 -d -a 1 data chunk

# Every chunk must wait for split_data, even under make -j
$(CHUNKS): split_data

run_processes: split_data $(CHUNKS:=.log)

# This rule will execute 10 times, one for each chunk. The command is quoted
# so the redirection runs in the job's shell on the execution host; unquoted,
# "> $@" would capture bsub's output on the submitting machine instead
chunk%.log: chunk%
	@echo "running program over $<"
	$(DISPATCH) "my_program $< > $@"

# Also needs to depend on run_processes or it'll check nothing
check: run_processes
	@echo "checking results"
	-$(DISPATCH) grep -c ERROR $(CHUNKS:=.log)

Note that I had to use the interactive dispatch for all jobs (except for the echo commands). With fire-and-forget the dependencies would break, because the command (submitting a job) would return before the job was finished. Make itself, on the other hand, should be submitted without the interactive argument. That way you get your prompt back and can monitor the run from any machine in the cluster:

$ bsub -q fast_queue make -j2
Job <1233> is submitted to queue <fast_queue>.

$ bpeek
Job <1234> is submitted to queue <fast_queue>.
<<Waiting for dispatch ...>>
splitting data
<<Job is finished>>

Job <1235> is submitted to queue <fast_queue>.
<<Waiting for dispatch ...>>
Job <1236> is submitted to queue <fast_queue>.
<<Waiting for dispatch ...>>
running program over chunk0
running program over chunk1
<<Job is finished>>
<<Job is finished>>

(...)

Job <1243> is submitted to queue <fast_queue>.
<<Waiting for dispatch ...>>
Job <1244> is submitted to queue <fast_queue>.
<<Waiting for dispatch ...>>
running program over chunk8
running program over chunk9
<<Job is finished>>
<<Job is finished>>

Job <1245> is submitted to queue <fast_queue>.
<<Waiting for dispatch ...>>
checking results
<<Job is finished>>

Note that, right after submitting the make job, you get the confirmation and the console prompt back. When you inspect the output with bpeek, you see your jobs being submitted two by two, as you requested. Also note that you didn't have to change anything in the program that runs on the cluster: the Makefile itself manages the whole parallelization scheme. Had you chosen make -j10, all ten processing jobs would go at the same time. Provided you had 10 idle CPUs in your cluster, that step would finish in roughly 1/10th of the time.
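Here is a rough model of that claim, under simplifying assumptions:

- every chunk takes the same time $T_c$;
- splitting takes $T_s$ and checking takes $T_k$;
- dispatch overhead is ignored;
- $j$ is the value passed to make -j.

```latex
\begin{aligned}
T(j)  &= T_s + \left\lceil \tfrac{10}{j} \right\rceil T_c + T_k \\
T(1)  &= T_s + 10\,T_c + T_k \\
T(2)  &= T_s + 5\,T_c + T_k \\
T(10) &= T_s + T_c + T_k \\
\frac{T(1)}{T(10)} &= \frac{T_s + 10\,T_c + T_k}{T_s + T_c + T_k} < 10 \quad \text{whenever } T_s + T_k > 0
\end{aligned}
```

So the processing step itself shrinks to a tenth. The serial split and check steps keep the overall gain somewhat below tenfold.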

Message Passing Interface

In a completely different direction, you can also use message passing interfaces such as PVM (Parallel Virtual Machine, the older one) and MPI (the current one). MPI is a standard with many implementations, such as MPICH2, LAM/MPI, Open MPI, mpijava, PyMPI and even some hardware implementations. Open MPI, the successor of LAM/MPI, is among the most widely used.

With MPI, instead of only dispatching jobs, you can tightly control what every piece of your software is doing and how the pieces communicate with each other. You can also spawn other jobs and threads, open clients and servers, and so on. Like job dispatchers, the MPI infrastructure controls all the jobs and resources in the cluster.

You don't have to statically state that you want 10 simultaneous jobs and then wait for nodes as resources become available. Instead, you can query the size of your available cluster beforehand, or even grow as resources become available and shrink when you don't need them anymore. You also get an important asset: a reliable communication channel between all parts of your program. It supports multicast, broadcast, unicast, gather and reduce operations, and can carry whole structures, flags, etc. You can do virtually anything you want with MPI.

Being able to send messages during execution opens up a new set of solutions to complex problems. For example:

- You can solve matrix problems where each region of the matrix is handled by one node, and the nodes exchange information to compute what you need.
- You can read different files at different paces and send from one process to another only the digest it needs.

The problem, again, is complexity. Because you have to code all of this, MPI programs tend to be much more complex than normal single-threaded programs. But not all problems can be easily parallelized, and for some of them the Makefile solution is not an option. Trying to parallelize those with Makefiles and regular inter-process communication can produce code that is much more complex and less reliable than with MPI. That's where the true power of MPI lies.

As an example, see the code below. It measures the difference between the clocks of two machines, which is the first step in synchronizing them:

#include <stdio.h>
#include <mpi.h>
#include <sys/time.h>

double to_double (struct timeval tv) {
    return ((double)(tv.tv_sec) + (double)(tv.tv_usec)/1e6);
}

int main (int argc, char **argv) {

    int myrank, size;
    MPI_Status status;
    struct timeval before, after;
    double local_time = 0.0, remote_time = 0.0;
    // data to be exchanged
    double resp = 0.0; // the local time of the remote clock
    int flag = 0; // the advice to get localtime

    /* Initialize MPI */
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
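    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* This example needs at least two processes (ranks 0 and 1) */
    if (size < 2) {
        fprintf(stderr, "Run with at least 2 processes, e.g. mpirun -np 2\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }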

    if(myrank == 0){
        /* this is the local process, with the local clock */
        gettimeofday(&before, 0);
        MPI_Send( &flag, 1, MPI_INT, 1, 17, MPI_COMM_WORLD );
        MPI_Recv( &resp, 1, MPI_DOUBLE, 1, 17, MPI_COMM_WORLD, &status );
        gettimeofday(&after, 0);
        printf("DEBUG: %lf + (%lf - %lf) / 2\n", to_double(before), to_double(after), to_double(before));
        local_time = (to_double(before)+((to_double(after)-to_double(before))/2));
        remote_time = resp;

        printf(" Local time %lf\nRemote time %lf\n Difference %lf\n", local_time, remote_time, (local_time - remote_time));
    } else if (myrank == 1) {
        /* this is the remote process, with the remote clock */
        MPI_Recv( &flag, 1, MPI_INT, 0, 17, MPI_COMM_WORLD, &status );
        gettimeofday(&before, 0);
        remote_time = to_double(before);
        MPI_Send( &remote_time, 1, MPI_DOUBLE, 0, 17, MPI_COMM_WORLD );
    }

    /* Terminate MPI */
    MPI_Finalize();
    return 0;
}

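As you can see, it's not very simple. First comes the initialization part. By the time it runs, the MPI launcher (mpirun or mpiexec) has already started one copy of the program per process across the cluster. Each copy says: “Hey, I'm an MPI code, connect me to the others, please”, then asks for its rank and for the total number of processes.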

    /* Initialize MPI */
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

Then you have the variable myrank, which tells you which process you are. It ranges from 0 to N-1, where 0 is the master process and N, stored in size, is the total number of processes. One way to separate the code of the local machine (the master process) from that of the others is to check this variable:

    if(myrank == 0){
        /* this is the local process, with the local clock */
        (...)
    } else if (myrank == 1) {
        /* this is the remote process, with the remote clock */
        (...)
    }

In this case we're ignoring all other processes, as this clock comparison only involves two machines. Everything inside the if runs only on the local machine (rank 0), and everything inside the else if runs only on the remote one (rank 1). The rest runs on every process. To terminate, we call:

    /* Terminate MPI */
    MPI_Finalize();

Now, about the message passing. The server (the master process) sends the client a signal to read its clock and then waits, blocking, for the client's response:

        gettimeofday(&before, 0);
        MPI_Send( &flag, 1, MPI_INT, 1, 17, MPI_COMM_WORLD );
        MPI_Recv( &resp, 1, MPI_DOUBLE, 1, 17, MPI_COMM_WORLD, &status );
        gettimeofday(&after, 0);

The client, in turn, receives the go-ahead, reads its clock and responds:

        MPI_Recv( &flag, 1, MPI_INT, 0, 17, MPI_COMM_WORLD, &status );
        gettimeofday(&before, 0);
        remote_time = to_double(before);
        MPI_Send( &remote_time, 1, MPI_DOUBLE, 0, 17, MPI_COMM_WORLD );

This pattern is rarely used in real MPI applications, but it illustrates how messages are passed and what types of messages you can send. There is plenty more. To learn it, consult the MPI standard or your implementation's documentation.
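The arithmetic in the rank 0 branch is essentially Cristian's method for estimating clock offset. The symbols are:

- $t_b$ and $t_a$: the local before and after readings;
- $T_r$: the remote time received;
- $t^{*}$: the unknown local instant at which rank 1 actually read its clock, so $t^{*} - T_r$ is the true offset.

The bound below assumes both clocks drift negligibly during the round trip:

```latex
\begin{aligned}
t_{\mathrm{local}} &= t_b + \frac{t_a - t_b}{2}, \qquad \Delta = t_{\mathrm{local}} - T_r \\
t^{*} \in [t_b,\, t_a] \;&\Rightarrow\; \bigl|\Delta - (t^{*} - T_r)\bigr| = \bigl|t_{\mathrm{local}} - t^{*}\bigr| \le \frac{t_a - t_b}{2}
\end{aligned}
```

The estimate is exact when both messages take equally long, and the bound tightens as the round trip gets shorter. That's why it pays to repeat the exchange and keep the fastest round trip.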



 
softeng/parallelization.txt · Last modified: 05 09 2007 19:15 (external edit)
 