#include <CRingItemZMQDataSource.h> class CRingItemZMQSourceElement : public CDataSourceElement { public: typedef struct _Message { uint64_t s_timestamp; size_t s_nBytes; void* s_pData; } Message, *pMessage; public: CRingItemZMQSourceElement( const char* ringUri, const char* routerUri, size_t chunkSize=1 ); virtual ~CRingItemZMQSourceElement() {} virtual void operator()(); // Override b/c process frees memory. virtual void process(void* pData, size_t nBytes); };
A fanout data source which extracts a timestamp
from ring items and prepends it to work items.
This is meant to work with parallel workers that
must then re-sort the data in timestamp order
using e.g. CRingItemSorter
(3daq).
See CRingItemMarkingWorker
for one example of a worker that can make use of the
data from this source.
Each work item consist of several ring items determined
by the chunkSize
at construction
time. Ring items come from
ringUri
which is either a
tcp: or file:
URI specifying a source of ring items. The
routerUri
provides the endpoint
of a ZMQ Router fanout transport that will be used
to send work units to worker processes.
Each work item consists of a block of items Each of those items in the block contains the following fields
s_timestamp
The timestamp either extracted from the ring item or a sensibly assigned time-stamp that will allow a sorting stage to reconstruct (for the most part) the original order of the ring items. For the most part means that there can be cases when non Physics items will come slightly out of order due to the manner in which they were assigned timestamps.
s_nBytes
The number of bytes of data in the ring item that follows.
data[]
The ring item.
Here's a code fragment that shows
how to process a work item from this data source.
On entry to the fragment, pData
points
to the work item and dataSize
is the full size of the fragment.
#include <DataFormat.h> #include <stdint.h> struct Item { uint64_t s_timestamp; uint32_t s_ringItemSize; uint8_t s_data[]; }; ... uint8_t* p = static_cast<pData>(pData); while (dataSize) { Item* pItem = reinterpret_cast<Item*>(p); uint64_t stamp = pItem->s_timestamp; pRingItem pR = reinterpret_cast<pRingItem>(pItem->s_data); processTheRingItem(pR, stamp); size_t nItemSize = sizeof(uint64_t) + sizeof(uint32_t) + pItem->s_ringItemSize; p += nItemSize; dataSize -= nItemSize; } free(pData);