49.4. Thread safe queues (CBufferQueue).

Threads often implement a pipeline in which data is passed from a producer thread to a consumer thread. Examples of this pattern are the CCUSBReadout and CVMUSBReadout applications, in which an acquisition thread passes buffers of XXUSB data to an output thread. The output thread decodes these buffers, transforming them into a stream of ring items that are inserted into a ring buffer.

The CBufferQueue class is a templated class for building queues that pass data from a producer thread to a consumer thread in a thread-safe manner. The important methods of the queue are:

queue

Normally called by a producer to queue an element of data for the consumer. If the consumer is blocked waiting for data it is woken.

get

Gets an object from the queue. If there are no objects in the queue, the calling thread is blocked until there are.

getnow

Attempts to get an object from the queue. This method never blocks; if the queue is empty, it instead indicates that there were no objects to get.
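The semantics of these three methods can be illustrated with a minimal sketch built on the C++ standard library. This is not the CBufferQueue implementation: the class name SketchQueue, the helper sumThroughQueue, and the bool-plus-reference signature chosen for getnow are assumptions made for illustration only.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for CBufferQueue; NOT the NSCLDAQ implementation.
template <typename T>
class SketchQueue {
public:
  // Producer side: append an element and wake one blocked consumer.
  void queue(T item) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push_back(std::move(item));
    }
    m_notEmpty.notify_one();
  }

  // Consumer side: block until an element is available, then return it.
  T get() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notEmpty.wait(lock, [this] { return !m_items.empty(); });
    T item = std::move(m_items.front());
    m_items.pop_front();
    return item;
  }

  // Never blocks. Returns false if the queue was empty (assumed signature).
  bool getnow(T& item) {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_items.empty()) return false;
    item = std::move(m_items.front());
    m_items.pop_front();
    return true;
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_notEmpty;
  std::deque<T> m_items;
};

// Hypothetical demo: a producer thread queues 1..count and the calling
// thread (the consumer) sums the values it gets.
int sumThroughQueue(int count) {
  SketchQueue<int> q;
  int probe = 0;
  assert(!q.getnow(probe));  // empty queue: reports failure without blocking

  std::thread producer([&q, count] {
    for (int i = 1; i <= count; ++i) q.queue(i);
  });

  int sum = 0;
  for (int i = 0; i < count; ++i) sum += q.get();  // blocks while empty
  producer.join();
  return sum;
}
```

In this sketch the consumer never polls: get sleeps on the condition variable until queue signals it, which is the behavior described for a blocked consumer above.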

Here are a few usage patterns that build on single buffer queues:

Limited size queue

This is useful for implementing back-pressure flow control on a data source that can produce an unbounded amount of data. Implement one queue that contains a limited number of 'free elements', and a second queue that transmits elements from the source to the consumer. The source gets an element from the free queue, fills it in, and queues it to the sender queue. When the sink has finished processing an element from the sender queue, it returns the element to the free queue.

If the consumer can't keep up with the source, the source will eventually block on getting an element from the free queue.

Work queue (processing farm)

Multiple threads can get items from, and put items in, the same queue. A work queue provides a method for distributing work to multiple threads. As work becomes available to be spun off to the processing farm, it is queued to the work queue. Several threads (the processing farm) get elements from the queue. Normally the queue element carries enough information to parameterize the computation performed by the worker thread.

Collecting results (farmer for a processing farm)

Once work has been done by a worker in the previous pattern, the results of the work must often be collected by a single thread. A farming queue can have result objects queued to it by many threads (the workers). The collection thread (farmer) gets elements from the collection queue and does whatever is needed with them.
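The work queue and result collection patterns can be sketched together as follows. This is an illustration under stated assumptions, not code from the readout applications: SketchQueue is a hypothetical stand-in for CBufferQueue, the WorkItem and Result structs and the runFarm helper are invented for the example, and the negative-id sentinel used to stop the workers is one possible shutdown convention rather than anything the queue class provides.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for CBufferQueue; NOT the NSCLDAQ implementation.
template <typename T>
class SketchQueue {
public:
  void queue(T item) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push_back(std::move(item));
    }
    m_notEmpty.notify_one();
  }
  T get() {  // blocks until an element is available
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notEmpty.wait(lock, [this] { return !m_items.empty(); });
    T item = std::move(m_items.front());
    m_items.pop_front();
    return item;
  }
private:
  std::mutex m_mutex;
  std::condition_variable m_notEmpty;
  std::deque<T> m_items;
};

struct WorkItem { int id; int value; };   // hypothetical work description
struct Result   { int id; int squared; }; // hypothetical result object

// Farm out the squaring of 0..nItems-1 to nWorkers threads; the calling
// thread acts as the farmer and sums the collected results.
long runFarm(int nWorkers, int nItems) {
  SketchQueue<WorkItem> work;     // many consumers (the farm)
  SketchQueue<Result>   results;  // many producers, one consumer (the farmer)

  std::vector<std::thread> workers;
  for (int w = 0; w < nWorkers; ++w) {
    workers.emplace_back([&work, &results] {
      for (;;) {
        WorkItem item = work.get();
        if (item.id < 0) break;   // sentinel (assumed convention): no more work
        results.queue(Result{item.id, item.value * item.value});
      }
    });
  }

  for (int i = 0; i < nItems; ++i) work.queue(WorkItem{i, i});
  for (int w = 0; w < nWorkers; ++w) work.queue(WorkItem{-1, 0});  // one per worker

  long total = 0;
  for (int i = 0; i < nItems; ++i) total += results.get().squared;

  for (auto& t : workers) t.join();
  return total;
}
```

Results arrive in whatever order the workers finish, so the result object carries the id of the work item it answers; a farmer that needs ordering can use that field to reassemble the stream.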

The example below is a simplified version of the VMUSB Readout thread's main loop. It shows the sender side of a limited-entry queue. Each entry is a VMUSB readout buffer with a header. The code fragment gets a free buffer, reads data into it, and puts it on a queue for the output thread.

Example 49-6. Sending data to a CBufferQueue


struct DataBuffer {
  uint32_t   s_bufferSize;	// Bytes used in the buffer.
  uint32_t   s_storageSize;     // bytes in s_rawData[].
  uint32_t   s_bufferType;	// Type of buffer.              (1)
  timespec   s_timeStamp;	// When the buffer was received.
  uint16_t   s_rawData[1];	// Really larger than that

};
...
extern CBufferQueue<DataBuffer*>  gFilledBuffers;                  (2)
extern CBufferQueue<DataBuffer*>  gFreeBuffers;        

...
while(1) {
   DataBuffer* pBuffer = gFreeBuffers.get();                      (3)
   ReadFromVMUSB(pBuffer);
   gFilledBuffers.queue(pBuffer);                                 (4)
}
...
	
(1)
This struct is what will be pushed around in the buffer queues. The s_rawData element is really much larger; the [1] size is used so that the member can be treated as an array. Normally it will hold 13 Kwords of data.
(2)
These are the queues. By passing pointers around in the queues, bulk data movement is avoided. In fact, the queues can carry any object that supports copy construction. The gFilledBuffers queue carries buffers with data from the readout thread to the output thread. The gFreeBuffers queue holds buffers that are available for new data.
(3)
This line gets a free data buffer. If there are no elements in gFreeBuffers, the thread blocks until the output thread returns some buffers to that queue.
(4)
This line takes the buffer that was filled by the previous line and queues it for the output thread. Calls to queue will only block (and then only for a short time) if the call happens while the consumer is actively pulling an object from the queue.

The next example shows a simplified version of the VMUSBReadout output thread so that you can see what a consumer of data from a buffer queue might look like. The same data structure definitions and queue definitions are used and therefore not shown.

Example 49-7. Consumer of data from a CBufferQueue


...
while(1) {
  DataBuffer* pBuffer = gFilledBuffers.get();    (1)
  processBuffer(pBuffer);
  gFreeBuffers.queue(pBuffer);                   (2)
}
      
(1)
The output thread gets a filled buffer from the queue and processes it.
(2)
Once the buffer is processed, it is put back in the free queue where the readout thread can put new data into it.
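Both loops above assume that the free queue already holds some buffers; otherwise the readout thread would block forever on its first get. The sketch below shows one way the two halves might be wired together, including priming the free queue. It is an illustration only: SketchQueue is a hypothetical stand-in for CBufferQueue, Buffer is a simplified stand-in for DataBuffer, and runPipeline is an invented helper in which the calling thread plays the output thread.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for CBufferQueue; NOT the NSCLDAQ implementation.
template <typename T>
class SketchQueue {
public:
  void queue(T item) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_items.push_back(std::move(item));
    }
    m_notEmpty.notify_one();
  }
  T get() {  // blocks until an element is available
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notEmpty.wait(lock, [this] { return !m_items.empty(); });
    T item = std::move(m_items.front());
    m_items.pop_front();
    return item;
  }
private:
  std::mutex m_mutex;
  std::condition_variable m_notEmpty;
  std::deque<T> m_items;
};

struct Buffer {      // simplified, hypothetical stand-in for DataBuffer
  int used;          // number of valid entries in data[]
  int data[16];
};

// Move nTransfers values through a pool of nBuffers buffers (nBuffers >= 1)
// and return the sum of the values seen by the consumer.
int runPipeline(int nBuffers, int nTransfers) {
  SketchQueue<Buffer*> freeBuffers;
  SketchQueue<Buffer*> filledBuffers;

  std::vector<Buffer> pool(nBuffers);          // storage outlives both threads
  for (auto& b : pool) freeBuffers.queue(&b);  // prime the free queue

  std::thread producer([&freeBuffers, &filledBuffers, nTransfers] {
    for (int i = 0; i < nTransfers; ++i) {
      Buffer* p = freeBuffers.get();  // blocks when every buffer is in flight
      p->used = 1;
      p->data[0] = i;                 // stands in for ReadFromVMUSB
      filledBuffers.queue(p);
    }
  });

  int sum = 0;
  for (int i = 0; i < nTransfers; ++i) {
    Buffer* p = filledBuffers.get();
    sum += p->data[0];                // stands in for processBuffer
    freeBuffers.queue(p);             // return the buffer for reuse
  }
  producer.join();
  return sum;
}
```

The size of the pool bounds how far the producer can run ahead of the consumer: with two buffers, at most two unprocessed buffers can ever exist, no matter how many transfers are made.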