4.3. Serial reader parallel worker example.

In this section we'll work our way through a sample MPITcl/SpecTcl application that implements the serial reader, parallel worker model. The example reads just a single event file, but we've already described how to extend this to multiple files or run segments.

To review: the workers (every rank other than 0) run an MPI data getter and an analysis data distributor. Rank 0, the reader, runs a file getter and an MPI distributor.

We'll look at the script in logical chunks. Note that to run this at the NSCL, the Tcl library path must include the SpecTcl library path, /usr/opt/mpitcl/TclLibs, and the directory in which your batch SpecTcl loadable package lives.

Example 4-1. Package and definition loads


mpi::mpi execute all {
    package require spectcl
    package require mpispectcl
    package require MyPipeline
    source defs.tcl
}

                

All ranks must have loaded the packages and the analysis definitions (defs.tcl).

Example 4-2. Setting up sources and distributors


mpi::mpi execute others {
    mpisource
    analysissink
}


filesource run-0003-00.evt
mpisink
                

The execute others ensures that the non rank 0 processes take data from an MPI source and analyze that data. Since the rank 0 process is the one running this script, the next two lines ensure that rank 0 takes data from the file and distributes it via MPI.

Note that for parallel jobs, the maximum I/O throughput is a function of the block size. You may want to chart throughput against block size for your application so that you can use an optimal block size rather than the default.

Example 4-3. Analyze the file in parallel:


mpi::mpi stopnotifier

mpi::mpi execute others analyze
analyze

                

The wrinkle here is that the mpi distributor needs to have full control over all MPI messaging. Therefore the notifier is stopped during analysis to ensure no MPI messages get queued to the rank 0 event loop rather than received by the mpi distributor. The execute and analyze commands get analysis running in the workers and data transmission running in rank 0.

Example 4-4. Getting and saving the results.


mpi::mpi startnotifier;         # Run the interpreter event pump again.

clear -all;                     # only needed if multiple runs are analyzed.

set l [spectrum -list]
set f [open spectra.dat w]

foreach spectrum $l {
    set name [lindex $spectrum 1]
    getSpectrumFromWorkers $name
    swrite -format ascii $f $name
    flush $f
}

close $f


                

For data exchange to rank 0 we need the event notifier running. This code is pretty simple. For each defined spectrum, getSpectrumFromWorkers is called to sum the data from each worker into a spectrum; that spectrum is then written to file. When all spectra have been received, the file is closed.

We've pushed the magic of fetching spectra off into a pair of procs. One of them initiates data transfer and waits, the other handles incoming data. Let's have a look at them.

Example 4-5. Initiating spectrum data collection and waiting for all the workers to report.


proc getSpectrumFromWorkers name {
    mpi::mpi handle [list addData $name]
    set ::expected [mpi::mpi size]
    incr ::expected -1;         # Number of workers.



    set script "mpi::mpi send 0 "
    append script "\["
    append script "scontents $name]"
    mpi::mpi execute others $script
    while {$::expected > 0} {
        vwait ::expected
    }

}

                

This follows the pattern described in the example in the MPITcl chapter. We're going to send a command to the workers to give us the results of scontents on a specific spectrum. Our data handler, addData, will sum the data from one worker and decrement ::expected. The loop on vwait waits for data from all workers before returning.
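The countdown-and-vwait idiom can be tried in plain tclsh, without MPI. The following is only a sketch: fakeReply is a hypothetical stand-in for the data handler, and after timers stand in for worker messages arriving through the event loop.

```tcl
# Hypothetical stand-in for the MPI data handler: each call represents
# one worker's reply being dispatched by the event loop.
proc fakeReply {worker} {
    lappend ::arrived $worker
    incr ::expected -1
}

set ::arrived  [list]
set ::expected 3;                # Pretend there are three workers.

# Schedule the replies out of order, as real workers might answer.
after 30 [list fakeReply 2]
after 10 [list fakeReply 3]
after 20 [list fakeReply 1]

# Same wait loop as getSpectrumFromWorkers: each vwait returns after a
# single write to ::expected, so keep looping until the count is zero.
while {$::expected > 0} {
    vwait ::expected
}

puts "all replies in, arrival order: $::arrived"
```

The while loop matters because vwait returns after the first modification of ::expected. A single vwait would return as soon as one reply had been processed.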

One wrinkle needs to be pointed out. We need the spectrum name to be substituted by our interpreter, but the scontents command to be substituted by the target interpreters. The only way I could figure out how to do that was by incrementally building up the script passed to the others.
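For comparison, the same string can also be produced in a single statement by backslash-escaping only the brackets, so that $name is substituted locally while the command substitution is left for the workers. This is a sketch in plain Tcl. The helper names buildFetchScript and buildFetchScriptIncremental are hypothetical, and wrapping the command in list to protect spectrum names that contain spaces is an assumption about what the workers should see, not something taken from the MPITcl documentation.

```tcl
# Hypothetical helper: build the script the workers will evaluate.
# $name is substituted here, in the rank 0 interpreter. The escaped
# brackets survive as literal [ and ], so scontents runs in each worker.
proc buildFetchScript {name} {
    return "mpi::mpi send 0 \[[list scontents $name]\]"
}

# The incremental construction used in getSpectrumFromWorkers,
# wrapped in a proc so the two results can be compared.
proc buildFetchScriptIncremental {name} {
    set script "mpi::mpi send 0 "
    append script "\["
    append script "scontents $name]"
    return $script
}

puts [buildFetchScript raw.00]
puts [buildFetchScriptIncremental raw.00]
puts [buildFetchScript {gated spectrum}]
```

For a simple name such as raw.00 both procs return the same string. They differ only for names that need quoting, where the list-based version adds braces around the name.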

Note also that addData is passed the spectrum name by us.

Let's have a look at the addData proc. It's called from the event loop whenever we have data from a worker:

Example 4-6. Receiving data from a worker:


proc addData {name src data} {
    foreach datum $data {
        set value [lindex $datum end]; #   This is assumed an integer.
        set coords [lrange $datum 0 end-1]
        set current [channel -get $name $coords]

        incr current $value
        channel -set $name $coords $current
    }
    incr ::expected -1;         #  one fewer workers to expect.

}

                

This is pretty simple. The scontents data is a list of pairs (1d spectra) or triplets (2d spectra). The last element of each is the channel value, while the preceding elements are the channel coordinates.

The value and coordinates are split out of each element, and the channel -get and channel -set commands are used to add the channel values to the appropriate channel coordinates. Note that once all elements of the scontents list are processed, ::expected is decremented, which releases getSpectrumFromWorkers from its vwait.
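The summing logic can be exercised outside SpecTcl with a dict standing in for the spectrum. This is only a sketch: addDataSketch and ::counts are hypothetical names, the dict replaces channel -get and channel -set, and the worker replies are made-up data in the list-of-triplets shape described above.

```tcl
# Stand-in for the rank 0 spectrum: channel coordinates (as a list)
# map to counts. In SpecTcl this role is played by channel -get and
# channel -set on the named spectrum.
set ::counts [dict create]

proc addDataSketch {data} {
    foreach datum $data {
        set value  [lindex $datum end];      # Last element: the count.
        set coords [lrange $datum 0 end-1];  # The rest: coordinates.
        # dict incr treats a missing key as 0, like an empty channel.
        dict incr ::counts $coords $value
    }
}

# Made-up replies from two workers for a 2d spectrum.
addDataSketch {{10 20 5} {11 20 1}}
addDataSketch {{10 20 7} {12 21 3}}

dict for {coords total} $::counts {
    puts "channel ($coords): $total"
}
```

Channel (10 20) appears in both replies, so its two contributions are added together, while the other channels keep the single value they received.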

Finally we exit:

Example 4-7. Exiting parallel SpecTcl


mpi::mpi stopnotifier;     
mpi::mpi execute others exit
exit

                

Note again that MPITcl can segfault if asked to exit while the notifier thread is still running. Therefore the notifier is stopped before asking the non rank 0 processes to exit and exiting ourselves.