CRingItemZMQSourceElement

Name

CRingItemZMQSourceElement -- Fanout ring items from some data source.

Synopsis


#include <CRingItemZMQDataSource.h>

class CRingItemZMQSourceElement : public CDataSourceElement
{
public:
    typedef struct _Message {
        uint64_t  s_timestamp;
        size_t    s_nBytes;
        void*     s_pData;
    } Message, *pMessage;

    
public:
    CRingItemZMQSourceElement(
        const char* ringUri, const char* routerUri, size_t chunkSize=1
    );
    virtual ~CRingItemZMQSourceElement() {}
    virtual void operator()();             // Override b/c process frees memory.
    virtual void process(void* pData, size_t nBytes);

};
        

DESCRIPTION

A fanout data source which extracts a timestamp from ring items and prepends it to work items. This is meant to work with parallel workers that must then re-sort the data in timestamp order using e.g. CRingItemSorter(3daq).

See CRingItemMarkingWorker for one example of a worker that can make use of the data from this source.

Each work item consist of several ring items determined by the chunkSize at construction time. Ring items come from ringUri which is either a tcp: or file: URI specifying a source of ring items. The routerUri provides the endpoint of a ZMQ Router fanout transport that will be used to send work units to worker processes.

DATA TYPES

Each work item consists of a block of items Each of those items in the block contains the following fields

uint64_t s_timestamp

The timestamp either extracted from the ring item or a sensibly assigned time-stamp that will allow a sorting stage to reconstruct (for the most part) the original order of the ring items. For the most part means that there can be cases when non Physics items will come slightly out of order due to the manner in which they were assigned timestamps.

size_t s_nBytes

The number of bytes of data in the ring item that follows.

uint8_tdata[]

The ring item.

Here's a code fragment that shows how to process a work item from this data source. On entry to the fragment, pData points to the work item and dataSize is the full size of the fragment.


#include <DataFormat.h>
#include <stdint.h>

struct Item {
    uint64_t s_timestamp;
    uint32_t s_ringItemSize;
    uint8_t  s_data[];
};
...

uint8_t* p = static_cast<pData>(pData);
while (dataSize) {
    Item* pItem = reinterpret_cast<Item*>(p);
    uint64_t stamp = pItem->s_timestamp;
    pRingItem pR   = reinterpret_cast<pRingItem>(pItem->s_data);
    
    processTheRingItem(pR, stamp);
    
    size_t nItemSize = sizeof(uint64_t) + sizeof(uint32_t) + pItem->s_ringItemSize;
    p += nItemSize;
    dataSize -= nItemSize;
}
free(pData);