EventEditor

Name

EventEditor -- Edit event fragment bodies.

Synopsis

$DAQBIN/EventEditor options...

DESCRIPTION

This program edits the bodies of fragments in event-built data. Here, the body of a fragment means the body of its ring item, not the full fragment body, which also contains the ring item header and the optional body header.

The program recomputes the size of the body and properly updates both the ring item header and the fragment header size fields prior to outputting the event.

The program relies on user code located in a shared library whose path is passed to the program at run-time.

OPTIONS

--help

Outputs a brief summary of the program options and exits. All other options are ignored. See --version below, however.

--version

Prints out the program name and version. All other options are ignored. If --help and --version are both specified, only the first of those options is acted on and all other options are ignored.

--source URI

Specifies the data source. The data source can specify a ring buffer (local or remote) or a file. This option is mandatory.

--sink URI

Specifies the data sink, that is where the edited data are written. The URI can specify either a file or a ring buffer. If a ring buffer is specified, the host name must either be localhost or resolve to the local computer.

This option is mandatory.

--workers integer

Specifies the number of worker threads/processes to start. Workers are handed chunks of events to operate on in parallel. In addition to the workers there are three more threads/processes:

An input thread/process reads data from the data source and distributes chunks of complete ring items to workers on demand. This distribution of events is load balanced and uses a pull application level protocol.

The workers send data to a sorting thread/process which re-orders the data by increasing time-stamp.

The sort thread/process sends data to an output thread/process which writes data chunks to the data sink.

Processing is thus a four-stage pipeline whose stages run in parallel. The second stage is a farm of workers that operate in parallel on segments (clumps) of the input data to produce the output data.

This option is mandatory.

--clump-size integer

The value of this option is the number of ring items each worker gets in response to a data pull. Larger clump sizes result in better amortization of the communications overhead. Smaller clumps result in better responsiveness at the program's output, since the sorter needs to see contributions from all workers before it can sort.

This option is optional and defaults to 1, which is a poor choice in most cases.

--parallel-strategy threaded | mpi

Specifies which parallelization library to use. Threaded parallelism runs all processing elements in a single machine and is suitable for use in high core count systems. MPI parallelism uses OpenMPI for messaging and, using mpirun to start the program, allows you to distribute the computation across several networked nodes.

When using mpi parallelism, the number of processes specified in the mpirun -np flag must be at least 4. The actual number of workers is computed from -np rather than --workers and is -np minus 3. If this value is not consistent with the value specified by --workers, a warning is emitted to the stderr of the rank 0 process.

This option is optional and defaults to threaded.
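The relationship between -np and the worker count can be sketched as below. This is only an illustration of the arithmetic described above; the function name workersFromMpiSize is hypothetical and is not part of EventEditor.

```cpp
#include <stdexcept>

// Hypothetical helper, not part of EventEditor. Three ranks are taken by
// the input, sort and output stages, so the remaining ranks are workers.
int workersFromMpiSize(int np)
{
    const int nonWorkerRanks = 3;
    if (np < nonWorkerRanks + 1) {
        throw std::invalid_argument("mpirun -np must be at least 4");
    }
    return np - nonWorkerRanks;
}
```

Under this sketch, running with -np 10 would give 7 workers regardless of the value passed to --workers.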

--editorlib file-path

Specifies the path of the shared library that contains the user code that edits the event. See USER CODE below for more information as well as EXAMPLE.

Note that if the library is not in the default library load path, you will need to ensure that your path specification includes enough information to describe the directory e.g. ./libMylib.so as opposed to libMylib.so.

USER CODE

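User code is built into a shared library that must provide:

A class derived from CBuiltRingItemEditor::BodyEditor that implements its pure virtual methods.

A factory function named createEditor, with C linkage, that returns a pointer to a new instance of that class.

A key data structure that we will be using is a CBuiltRingItemEditor::BodySegment. Objects of this type are used to describe chunks of the output event body. This structure allows in-place editing with minimal data movement. That is, body segments can be chunks of the original event as well as new data.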

The CBuiltRingItemEditor::BodySegment is a struct with the following fields:

bool s_isDynamic

Specifies whether the data described must be deleted once the event has been emitted. The worker process operates on all events in a chunk before emitting the edited events in that chunk. If you add data to an event that is unique to that event, that data must be dynamically allocated and this field in its descriptor must be true.

iovec s_description

Describes a chunk of data. This is also a struct, described in the man pages for writev. It has an iov_base, which points to the data in the chunk, and an iov_len, which contains the number of bytes in that chunk.

The job of the editing class is to produce a vector of CBuiltRingItemEditor::BodySegment objects that describe the data in the edited body.
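For orientation, a structure with these two fields might look roughly like the sketch below. It is a stand-in written for this page, not the definition from CBuiltRingItemEditor.h. The name SketchBodySegment is hypothetical, and the constructor argument order (size, pointer, dynamic flag) is an assumption inferred from the examples later in this section.

```cpp
#include <sys/uio.h>   // iovec
#include <cstddef>     // size_t

// Illustrative stand-in, NOT the real CBuiltRingItemEditor::BodySegment.
struct SketchBodySegment {
    bool  s_isDynamic;     // true if the data must be freed after output
    iovec s_description;   // where the data are and how many bytes

    // Assumed argument order: byte count, data pointer, dynamic flag.
    SketchBodySegment(size_t nBytes, void* pData, bool isDynamic = false) :
        s_isDynamic(isDynamic)
    {
        s_description.iov_base = pData;
        s_description.iov_len  = nBytes;
    }
};
```

Defaulting the dynamic flag to false matches the common case of describing a piece of the original event, which the editor does not own.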

Let's look at the CBuiltRingItemEditor::BodyEditor class definition and its methods.


class BodyEditor {
public:
    virtual std::vector<BodySegment> operator()(
        pRingItemHeader pHdr, pBodyHeader hdr, size_t bodySize,
        void* pBody
    )  = 0;
    virtual void free(iovec& item) = 0;
};

            

Note that the pRingItemHeader and pBodyHeader types are defined in DataFormat.h and described in the reference section 3daq.

operator() is called for each fragment of each event. pHdr points to the ring item header of that fragment (fragments are assumed to be ring items). hdr points to the body header of the event. This program requires that event fragments have body headers (as pretty much all do now). Finally, bodySize is the number of bytes in the body of the fragment ring item and pBody points to that body.

operator() returns a vector of body segment descriptors that describe what the new body will look like. The method normally should not modify the ring item and body headers. The caller will perform any modifications needed for the new size of the body. Here's an example of code that returns the original body un-modified:


...
{
    std::vector<BodySegment> result;
    BodySegment body(bodySize, pBody);
    result.push_back(body);
    return result;
    
}
            

This is a valid though uninteresting example.

When producing data to go in the body, that data must typically be dynamically allocated. This is because the entire clump of ring items, and fragments in each event ring item are processed before passing the resulting data to the sorting thread/process. Here's sample code to insert a uint32_t 0xa5a5a5a5 at the beginning of the fragment and a 0x5a5a5a5a after the existing body:


{
    std::vector<BodySegment> result;
    uint32_t* pHeader = new uint32_t(0xa5a5a5a5);
    uint32_t* pTrailer= new uint32_t(0x5a5a5a5a);
    BodySegment descHeader(sizeof(uint32_t), pHeader, true);
    BodySegment body(bodySize, pBody);
    BodySegment descTrailer(sizeof(uint32_t), pTrailer, true);
    
    result.push_back(descHeader);
    result.push_back(body);
    result.push_back(descTrailer);
    return result;
}
            

The key point here is that it's not necessary to open up space for the header by sliding the original data down, or to move data in any other way. Just create the data for the header and trailer and provide descriptors for them. Note that the s_isDynamic field for those items is true.
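To see why user code needs no data movement, consider how a consumer of the descriptors could compute the new body size and gather the pieces in order. The sketch below is an assumption about the general approach, not EventEditor's actual implementation. Segment, totalSize and gather are hypothetical names, and Segment only mirrors the two fields described earlier.

```cpp
#include <sys/uio.h>   // iovec
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a body segment descriptor.
struct Segment {
    bool  s_isDynamic;
    iovec s_description;
};

// Size of the edited body: the sum of the segment lengths.
size_t totalSize(const std::vector<Segment>& segs)
{
    size_t n = 0;
    for (size_t i = 0; i < segs.size(); i++) {
        n += segs[i].s_description.iov_len;
    }
    return n;
}

// Copies the segments, in descriptor order, into one contiguous buffer.
std::vector<uint8_t> gather(const std::vector<Segment>& segs)
{
    std::vector<uint8_t> out;
    out.reserve(totalSize(segs));
    for (size_t i = 0; i < segs.size(); i++) {
        const uint8_t* p =
            static_cast<const uint8_t*>(segs[i].s_description.iov_base);
        out.insert(out.end(), p, p + segs[i].s_description.iov_len);
    }
    return out;
}
```

Because the descriptors use iovec, a real consumer could also hand them to writev and avoid the intermediate copy. The copy is used here only to keep the sketch easy to follow.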

After events have been emitted, the event editor goes through the set of descriptors for that event and invokes free for each dynamically allocated segment. This method is passed the s_description field of that descriptor and should dispose of the dynamic storage associated with it. For our previous example:


{
    uint32_t* pInt = static_cast<uint32_t*>(item.iov_base);
    delete pInt;
}
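The clean-up pass that calls free could be sketched as follows. This is a guess at the shape of the loop based on the description above, not code taken from EventEditor. Segment, Releaser, Uint32Releaser and releaseDynamic are all hypothetical names.

```cpp
#include <sys/uio.h>   // iovec
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a body segment descriptor.
struct Segment {
    bool  s_isDynamic;
    iovec s_description;
};

// Hypothetical stand-in for the part of the editor interface used here.
struct Releaser {
    virtual ~Releaser() {}
    virtual void free(iovec& item) = 0;
};

// Frees segments that were allocated as single uint32_t objects.
struct Uint32Releaser : public Releaser {
    virtual void free(iovec& item) {
        delete static_cast<uint32_t*>(item.iov_base);
        item.iov_base = 0;   // Guard against accidental reuse.
    }
};

// Calls free only for segments flagged dynamic; returns how many were freed.
size_t releaseDynamic(std::vector<Segment>& segs, Releaser& editor)
{
    size_t released = 0;
    for (size_t i = 0; i < segs.size(); i++) {
        if (segs[i].s_isDynamic) {
            editor.free(segs[i].s_description);
            released++;
        }
    }
    return released;
}
```

Segments that describe the original body are skipped, which is why their s_isDynamic flag must be left false.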
            

EXAMPLE

Let's put together the code in the previous section that inserts headers and trailers in each fragment into a complete example.


#include <CBuiltRingItemEditor.h>   // (1)
#include <DataFormat.h>

// Shorthand so the nested descriptor type can be named without qualification.
typedef CBuiltRingItemEditor::BodySegment BodySegment;

class MyEditor : public CBuiltRingItemEditor::BodyEditor
{
public:
    virtual std::vector<BodySegment> operator()(
            pRingItemHeader pHdr, pBodyHeader hdr,    // (2)
            size_t bodySize, void* pBody
    );
    virtual void free(iovec& item);
};

// Note: 'virtual' appears only in the class declaration, not here.
std::vector<BodySegment>
MyEditor::operator()(                       // (3)
    pRingItemHeader pHdr, pBodyHeader hdr,
    size_t bodySize, void* pBody
)
{
    std::vector<BodySegment> result;   // (4)
    uint32_t* pHeader = new uint32_t(0xa5a5a5a5); // (5)
    uint32_t* pTrailer = new uint32_t(0x5a5a5a5a);
    
    // Named 'header' so it does not clash with the 'hdr' parameter.
    BodySegment header(sizeof(uint32_t), pHeader, true);
    BodySegment body(bodySize, pBody);        // (6)
    BodySegment trailer(sizeof(uint32_t), pTrailer, true);
    
    result.push_back(header);
    result.push_back(body);                   // (7)
    result.push_back(trailer);
    return result;
}
void
MyEditor::free(iovec& item)             // (8)
{
    // Cast back to the pointer type that was allocated with new.
    uint32_t* p = static_cast<uint32_t*>(item.iov_base);
    delete p;
}


extern "C" {
    CBuiltRingItemEditor::BodyEditor* createEditor()
    {
        return new MyEditor;                 // (9)
    }
}
            

(1)
This is the minimal set of headers needed to write editors. CBuiltRingItemEditor.h provides class definitions for the editor, including the base class for MyEditor below.
(2)
This is the class definition for the editing class we're providing. Note how it derives from CBuiltRingItemEditor::BodyEditor and promises to supply implementations of its base class's pure virtual methods.
(3)
Here's where we implement the function call operator. See the detailed comments below.
(4)
This declares our method's result: a vector of body segment descriptors. In our case we'll need three of them: one for our header word, one for the original body and one for the trailer word.
(5)
The new items we'll add to the body must remain alive for an extended period of time (from now until free is called for them). Therefore, even though they are simple uint32_t objects, they must be new'd into existence and initialized.
(6)
This section of code builds the three descriptors we need. Note how the ones that describe the header and trailer are marked as dynamic while the original body isn't.
(7)
The descriptors are pushed into the return vector in the order in which we want to see them in the final, edited, event. The resulting vector is returned as the method's value.
(8)
Our implementation of free is invoked for each header and trailer. We cast the data pointer to a uint32_t pointer and delete it. If you add several types of data to the event, you may need to use iov_len to work out which type to delete or, better yet, derive them all from a common base class with a virtual destructor. You can then cast to that base class and delete.
(9)
The last thing the EventEditor program needs is a way to make the editor objects it will call in each worker thread. The factory function createEditor provides that. When the shared library is loaded, this entry point is located and invoked to return pointers to brand new body editors.