CRingBuffer

Name

CRingBuffer -- Low level ring buffer primitives

Synopsis


#include <CRingBuffer>
         
 class CRingBuffer {

      
  static void create(std::string name, size_t dataBytes = m_defaultDataSize, size_t maxConsumer = m_defaultMaxConsumers, bool tempConnection = false);
        static CRingBuffer* createAndProduce(std::string name, size_t dataBytes = m_defaultDataSize, size_t maxConsumer = m_defaultMaxConsumers, bool tempConnection = false);
        static void remove(std::string name);
        static void format(std::string name, size_t maxConsumer = m_defaultMaxConsumers);
        static bool isRing(std::string name);
        static void setDefaultRingSize(size_t byteCount);
        static size_t getDefaultRingSize();
        static void setDefaultMaxConsumers(size_t numConsumers);
        static size_t getDefaultMaxConsumers();
        static std::string defaultRing();
        static  std::string defaultRingUrl();

      
  CRingBuffer(std::string name, ClientMode mode = CRingBuffer::consumer);
      
  virtual ~CRingBuffer();
        size_t put(const void* pBuffer, size_t nBytes, unsigned long timeout = ULONG_MAX);
        size_t get(void* pBuffer, size_t maxBytes, size_t minBytes = 1, unsigned long timeout = ULONG_MAX);
        size_t peek(void* pBuffer, size_t maxbytes);
        void skip(size_t nBytes);
        unsigned long setPollInterval(unsigned long newValue);
        unsigned long getPollInterval();
        size_t availablePutSpace();
        size_t availableData();
        CRingBuffer::Usage getUsage();
        int blockWhile(CRingBuffer::CRingBufferPredicate& pred, unsigned long timeout = ULONG_MAX);
        void While(CRingBufferPredicate& pred);

        off_t getSlot();
        void forceProducerRelease();
        void forceConsumerRelease(unsigned slot);

       }
     

Description

CRingBuffer provides a low level, message-based, high performance interprocess communication mechanism called a ring buffer. For information about ring buffers in general, see http://en.wikipedia.org/wiki/Circular_buffer.

CRingBuffer manages its ring buffer in shared memory. The shared memory region includes the buffer, a single put pointer, and several get pointers. This allows for a single producer (in the NSCL DAQ, this would generally be a readout program or event builder) and several consumers (e.g. data analysis, event recording, scaler displays).
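The arithmetic implied by one put pointer and several get pointers can be sketched as follows. This is only an illustrative model, not the library's shared memory layout: the function names are hypothetical, offsets are plain byte indices into a buffer of size bytes (size must be nonzero), and the convention of keeping one byte unused so that "full" and "empty" can be told apart is an assumption.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Bytes a consumer at offset `get` can read when the producer is at `put`,
// in a circular buffer of `size` bytes (illustrative model only).
std::size_t bytesAvailable(std::size_t put, std::size_t get, std::size_t size) {
    return (put + size - get) % size;
}

// Space the producer can fill without overwriting unread data. The consumer
// that is furthest behind limits the producer. One byte is kept unused
// (an assumption of this sketch) so a full ring differs from an empty one.
std::size_t freeSpace(std::size_t put, const std::vector<std::size_t>& gets,
                      std::size_t size) {
    std::size_t maxBacklog = 0;
    for (std::size_t get : gets) {
        maxBacklog = std::max(maxBacklog, bytesAvailable(put, get, size));
    }
    return size - 1 - maxBacklog;
}
```

In this model a single slow consumer reduces the space seen by the producer, which is why several consumers can share one ring only as long as all of them keep up.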

The CRingBuffer class includes static methods for creating and reinitializing ring buffers as well as object based members that allow ring buffer clients (both producers and consumers) to transfer data. For more on how to use the ring buffer class see The Ring Buffer Primitives.

A unique feature of the CRingBuffer class is predicate based blocking. This allows a caller to block until some arbitrary condition is no longer true. Higher level software that is aware of message structure can use this to select specific messages from the ring, or implement message sampling. For more information on that see the description of blockWhile below, and the "Types and public data" section later in this reference page.

Public member functions

static void create(std::string name, size_t dataBytes = m_defaultDataSize, size_t maxConsumer = m_defaultMaxConsumers, bool tempConnection = false);

Creates a new ring buffer. Each ring buffer has a distinct name. Ring buffer names can be just about anything as long as there are no / characters embedded in them.

std::string name

The name of the ring buffer. If you call this for a ring buffer that already exists, the ring buffer will be re-initialized. This has undesirable effects if clients are connected to the ring.

size_t dataBytes

Determines the number of bytes of data that can exist in the ring buffer without having been consumed and without blocking another put operation. Note that due to the need to page align the shared memory region, this value is a lower bound on the actual number of bytes of storage.

This parameter is optional and has a default value. setDefaultRingSize can set the default value for this parameter, and getDefaultRingSize can provide the current value for this default.

size_t maxConsumer

Provides the maximum number of consumers that can connect to the ring simultaneously. This is an optional parameter. The value this parameter defaults to can be modified and read using setDefaultMaxConsumers and getDefaultMaxConsumers respectively.

bool tempConnection

If this is true, the ring buffer is registered with the Ring master using a temporary, one-time connection which is closed down immediately after registration. If the ring is created early enough, child processes created afterwards do not inherit an open socket.

Note that no restrictions are imposed on which users can read/write from/to the ring buffer. That is the ring buffer, which is encapsulated in a POSIX shared memory file, has the permissions mode 0666.
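Two of the constraints described above (no / characters in ring names, and dataBytes being only a lower bound because of page alignment) can be sketched as small helper functions. Both helpers are hypothetical and are not part of CRingBuffer. The rounding shown is the usual round-up-to-a-page calculation and ignores any header or control data the library stores in the same region.

```cpp
#include <cstddef>
#include <string>

// Hypothetical pre-flight check: ring names must not contain '/'.
bool isValidRingName(const std::string& name) {
    return name.find('/') == std::string::npos;
}

// Rounds a requested byte count up to a whole number of pages. This only
// illustrates why the actual storage can exceed the requested dataBytes;
// pageSize must be nonzero.
std::size_t roundUpToPage(std::size_t bytes, std::size_t pageSize) {
    return ((bytes + pageSize - 1) / pageSize) * pageSize;
}
```

With a 4096 byte page, a request for 4097 bytes would occupy two pages in this model, so the usable data area is at least as large as the request.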

static CRingBuffer* createAndProduce(std::string name, size_t dataBytes = m_defaultDataSize, size_t maxConsumer = m_defaultMaxConsumers, bool tempConnection = false);

If the ring buffer name does not exist, it is created. The caller is then attached to the ring as a producer and the CRingBuffer object created is returned to the caller. The return value is a pointer to a dynamically allocated CRingBuffer which must be deleted by the client.

Note that if the ring already exists, only the name parameter matters. The other parameters are used only when the ring is created and have the same meaning as for create.
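Because the returned pointer must be deleted by the client, one option is to hand it straight to a std::unique_ptr. The sketch below is written as a template so that it does not depend on the library header; attachProducer is a hypothetical helper, and with the real class the Ring parameter would be CRingBuffer.

```cpp
#include <memory>
#include <string>

// Ring is expected to provide a static createAndProduce(std::string) that
// returns a heap-allocated Ring*, as documented for CRingBuffer.
// The unique_ptr deletes the object, releasing the producer attachment,
// when it goes out of scope.
template <class Ring>
std::unique_ptr<Ring> attachProducer(const std::string& name) {
    return std::unique_ptr<Ring>(Ring::createAndProduce(name));
}
```

A caller would then write something like attachProducer<CRingBuffer>("myring") and use the result as an ordinary pointer, with no explicit delete.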

static void remove(std::string name);

Marks the shared memory region associated with a ring buffer for deletion. Once no more processes are attached to the ring buffer, or have file descriptors open on the ring buffer, the shared memory region is deleted.

static void format(std::string name, size_t maxConsumer = m_defaultMaxConsumers);

Formats a ring buffer. Formatting a ring buffer empties it and resets all of the ring get/put pointers to not-owned. It is a very bad thing to format a ring buffer that is active.

std::string name

Provides the name of the ring buffer. This must be the name of an existing ring buffer.

size_t maxConsumer

Defines the maximum number of consumer programs that can connect to the ring buffer. This is an optional parameter. The default for this parameter can be read and set using getDefaultMaxConsumers and setDefaultMaxConsumers respectively.

Since format does not resize the shared memory region containing the buffer and its control data, the size of the data region will change if the number of consumers is changed from the value used to create the initial ring.

static bool isRing(std::string name);

Ring buffers are shared memory segments, but not all shared memory segments are rings. Ring buffers have signature bytes in their headers that allow software to unambiguously determine if a shared memory region is a ring buffer or something else.

This function returns true if a ring named name exists. If it does not (either there's no corresponding shared memory region or there is one, but it does not have the correct signature bytes to qualify as a ring buffer), the function returns false.

static void setDefaultRingSize(size_t byteCount);

Sets the default ring data size to byteCount. The function is declared void; to preserve the prior default, read it with getDefaultRingSize before calling this function.

static size_t getDefaultRingSize();

Returns the default size of the data region of a ring.

static void setDefaultMaxConsumers(size_t numConsumers);

Sets the default number of consumers supported by a new ring buffer to numConsumers.

static size_t getDefaultMaxConsumers();

Returns the default maximum number of consumers that will be supported by the creation of a new ring buffer.

static std::string defaultRing();

Returns the default name of the ring for most NSCLDAQ cases. This will be the name of the logged in user.

static std::string defaultRingUrl();

Returns a URL for the default ring for most NSCLDAQ applications. If CRingBuffer::defaultRing returns user, this function would return: tcp://localhost/user.
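The relationship between the ring name and its URL can be sketched with a small helper. ringUrl is hypothetical and not part of CRingBuffer; it only reproduces the tcp://host/name shape shown above.

```cpp
#include <string>

// Builds a URL of the form tcp://<host>/<ringName>, matching the shape of
// the value described for defaultRingUrl (illustrative helper only).
std::string ringUrl(const std::string& host, const std::string& ringName) {
    return "tcp://" + host + "/" + ringName;
}
```

For a ring named user on the local machine this produces tcp://localhost/user.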

CRingBuffer(std::string name, ClientMode mode = CRingBuffer::consumer);

Creates a new ring buffer object. Ring buffer objects are used to connect clients to ring buffers created via CRingBuffer::create. A client can be a manager, a producer or a consumer. Only one producer can be attached to the ring buffer at a time. Several consumers and an unlimited number of managers can attach simultaneously.

The name parameter is the name of the ring buffer to which the object will be attached. This ring buffer must have been created with the CRingBuffer::create member function. mode determines the type of client connection to be formed. This is an optional parameter that defaults to CRingBuffer::consumer, which creates a consumer attachment. The value CRingBuffer::producer creates a producer attachment. CRingBuffer::manager creates a manager attachment.

virtual ~CRingBuffer();

Destroys a ring buffer attachment object. The object's destruction results in a release of the access pointer associated with the ring buffer. If the object was a producer attachment, a new producer can attach. If the object was a consumer attachment, that consumer pointer becomes available for a new consumer.

size_t put(const void* pBuffer, size_t nBytes, unsigned long timeout = ULONG_MAX);

Puts data into the ring buffer. The object must hold a producer attachment.

const void* pBuffer

Pointer to the data to be transferred to the ring buffer.

size_t nBytes

The number of bytes to transfer to the ring buffer from the buffer.

unsigned long timeout

The maximum number of seconds to wait for free space to become available in the ring buffer before the put operation fails. This is an optional parameter that defaults to essentially forever (the largest value that fits in an unsigned long; for a 32 bit unsigned long this is about 136 years).

The return value is the number of bytes actually written. This should either be the value of nBytes, indicating the entire buffer was transferred to the ring buffer, or 0, indicating that a wait for free space timed out.
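Since put returns either nBytes or 0, a caller can reduce the result to a success flag. The sketch below is a template so that it compiles without the library header; putWithTimeout is a hypothetical helper, and with the real class the Ring parameter would be a CRingBuffer holding a producer attachment.

```cpp
#include <cstddef>
#include <vector>

// Ring is expected to provide put(const void*, size_t, unsigned long) with
// the return convention documented above: nBytes on success, 0 on timeout.
// Returns true if the whole message was written before the timeout.
template <class Ring>
bool putWithTimeout(Ring& ring, const std::vector<char>& message,
                    unsigned long timeoutSeconds) {
    std::size_t written =
        ring.put(message.data(), message.size(), timeoutSeconds);
    return written == message.size();
}
```

A producer that must not stall indefinitely could call this with a short timeout and decide what to do with the message when it returns false.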

size_t get(void* pBuffer, size_t maxBytes, size_t minBytes = 1, unsigned long timeout = ULONG_MAX);

Retrieves a block of data from the ring buffer. This function allows the caller to specify both a maximum, and a minimum size of the data to return.

void* pBuffer

Points to the storage in which the data from the ring buffer will be stored.

size_t maxBytes

Specifies the maximum number of bytes that can be transferred to the buffer (this is usually, but not always, the size of the buffer).

size_t minBytes

Specifies the smallest message that can be transferred to the caller's buffer. The function will block until minBytes of data is available for this consumer or until the timeout.

unsigned long timeout

Determines the maximum number of seconds the function will block for minBytes to be available. This is an optional parameter and defaults to essentially forever (in the limit as 136 years approaches forever).

The function returns the number of bytes actually read. The value 0 implies the read timed out, or that minBytes was 0 and no bytes were available.

size_t peek(void* pBuffer, size_t maxbytes);

Returns data from the ring buffer without actually removing it. This function never blocks.

void* pBuffer

A pointer to a buffer that will hold the data retrieved from the ring.

size_t maxbytes

Specifies the maximum number of bytes that can be transferred to the buffer.

The return value from this function is the actual number of bytes transferred. Since this function never blocks, this can be any value from 0 through maxbytes.

void skip(size_t nBytes);

Skips the consumer client's get pointer forward by nBytes, wrapping as needed. peek and skip can be used to conditionally read data from the ring. For example, a peek at the front of the available data could determine the size and type of a message. The message could then either be read with get, or skipped over with skip.
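The peek, then get-or-skip pattern just described might look like the sketch below. The MessageHeader layout and the takeIfType helper are hypothetical. The code is a template so that it compiles without the library header (with the real class, Ring would be CRingBuffer), and it assumes each message is put into the ring in one operation, so a visible header implies the whole message is present.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical message header, for illustration only.
struct MessageHeader {
    std::uint32_t s_size;  // total message size in bytes, header included
    std::uint32_t s_type;  // message type code
};

// Looks at the message at the front of the ring without blocking.
// - No complete header available: returns false, ring untouched.
// - Header has another type: skips the whole message, returns false.
// - Header has wantedType: reads the whole message into out, returns true.
template <class Ring>
bool takeIfType(Ring& ring, std::uint32_t wantedType, std::vector<char>& out) {
    MessageHeader hdr;
    if (ring.peek(&hdr, sizeof(hdr)) != sizeof(hdr)) return false;
    if (hdr.s_type != wantedType) {
        ring.skip(hdr.s_size);
        return false;
    }
    out.resize(hdr.s_size);
    return ring.get(out.data(), hdr.s_size, hdr.s_size) == hdr.s_size;
}
```

Calling this in a loop discards unwanted messages one at a time until a matching message is returned or the ring runs out of data.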

unsigned long setPollInterval(unsigned long newValue);

The blockWhile function polls a predicate, blocking between polls for some polling interval. setPollInterval allows the caller to set the poll interval for blocking operations on a ring. newValue is the number of milliseconds between polls. The return value is the previous value of the polling interval.

unsigned long getPollInterval();

See setPollInterval above. This method returns the current polling interval.

size_t availablePutSpace();

Returns the number of bytes that can currently be put in the buffer without the producer blocking.

size_t availableData();

Returns the amount of data available to this consumer.

CRingBuffer::Usage getUsage();

Returns a number of pieces of status and statistics information about the ring buffer. See "Types and public data" below for more information about the CRingBuffer::Usage structure.

int blockWhile(CRingBuffer::CRingBufferPredicate& pred, unsigned long timeout = ULONG_MAX);

Blocks the caller by calling the operator() method of the pred parameter. Blocking continues as long as this predicate call returns true or until the timeout seconds have passed. timeout is an optional parameter that defaults to 136 years (or essentially forever).
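The behaviour described here (call the predicate, sleep for the poll interval, repeat until the predicate is false or the timeout expires) can be modelled in a few lines. This is not the library's implementation: pollWhile is a hypothetical stand-in, the predicate is a plain callable rather than a CRingBufferPredicate, and the bool result is a convention of this sketch, not the documented return value of blockWhile.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Polls pred until it returns false or the timeout expires.
// Returns true if pred became false, false if the deadline passed first.
// Very large timeouts could overflow the deadline; this sketch ignores that.
bool pollWhile(const std::function<bool()>& pred,
               std::chrono::seconds timeout,
               std::chrono::milliseconds pollInterval) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (pred()) {
        if (std::chrono::steady_clock::now() >= deadline) return false;
        std::this_thread::sleep_for(pollInterval);  // wait between polls
    }
    return true;
}
```

A shorter poll interval reduces latency at the cost of more frequent predicate calls, which is the trade-off setPollInterval exposes.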

void While(CRingBufferPredicate& pred);

Repeatedly calls the predicate pred until it returns false. The method polls the predicate rather than blocking between calls. One use of this function is to skip forward through the ring buffer until a message that matches some specific pattern is encountered.

off_t getSlot();

Returns the consumer slot that corresponds to the object. If the object is a producer or a manager object, the return value is -1.

void forceProducerRelease();

Forces the ring to no longer have a producer attachment. This is intended to remove producers that have exited abnormally.

void forceConsumerRelease(unsigned slot);

Forces the consumer slot slot to be freed for reallocation. This is intended to be used to remove consumers that have exited abnormally.

Types and public data

The CRingBuffer class defines several nested data types:

CRingBuffer::ClientMode

This is an enumerated type that is used to select the type of connection desired when creating an instance of a CRingBuffer object. Possible values are:

CRingBuffer::producer

Requests a producer connection to the ring buffer.

CRingBuffer::consumer

Requests a consumer connection to the ring buffer.

CRingBuffer::manager

Requests a manager connection to the ring buffer. Normal user code should not do this. This sort of connection allows existing connections to be forcibly disconnected.

CRingBuffer::Usage

This structure is used to pass information about the status of a ring from the getUsage method. The struct contains the following members:

size_t s_bufferSpace

Contains the size of the ring buffer in bytes.

size_t s_putSpace

Contains the number of bytes the producer could put in the buffer without blocking.

size_t s_maxConsumers

Contains the number of consumers that can be simultaneously connected to the ring buffer.

pid_t s_producer

Contains the process id of the producer attached to the ring buffer.

size_t s_maxGetSpace

Contains the number of bytes that could be retrieved by the consumer that is furthest behind in reading the buffer.

size_t s_minGetSpace

Contains the number of bytes that could be retrieved by the consumer that's most up to date in reading the buffer.

std::vector<std::pair<pid_t, size_t> > s_consumers

This vector contains information about each consumer currently attached to the ring buffer. The first element of each pair is the process id of the consumer. The second element of each pair is the number of bytes that consumer could get without blocking.
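One use of the per-consumer information is to find the consumer that is holding the ring back. The sketch below assumes the consumer list is a vector of (process id, bytes available) pairs as described above; furthestBehind is a hypothetical helper, not part of CRingBuffer.

```cpp
#include <sys/types.h>

#include <cstddef>
#include <utility>
#include <vector>

// Returns the process id of the consumer with the largest backlog, that is,
// the most unread bytes. Returns -1 if no consumers are attached.
pid_t furthestBehind(
    const std::vector<std::pair<pid_t, std::size_t>>& consumers) {
    pid_t slowest = -1;
    std::size_t largest = 0;
    for (const auto& consumer : consumers) {
        if (slowest == -1 || consumer.second > largest) {
            slowest = consumer.first;
            largest = consumer.second;
        }
    }
    return slowest;
}
```

A monitoring tool could pass the s_consumers member of the structure returned by getUsage to a function like this to decide which consumer to report.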

CRingBuffer::CRingBufferPredicate

CRingBuffer::CRingBufferPredicate provides an abstract base class that defines an interface for blocking predicates. A blocking predicate is a function-like object that can be used by blockWhile to know how long to block.

The CRingBuffer::CRingBufferPredicate class defines only a single method:

virtual bool operator()(CRingBuffer& ring);

On each pass through its loop, the method blockWhile invokes its predicate's operator() and continues blocking if that method returns true.

Exceptions

Some of the member functions in CRingBuffer will throw exceptions:

CErrnoException

Thrown when a system call fails and reports additional information in the errno variable.

CRangeException

Thrown when some value is not in a legal range. For example, a user requests a get with minBytes larger than the total number of bytes the ring buffer can hold. Rather than blocking forever, this exception is thrown.

CStateException

Normally thrown if a consumer is attempting functions that are only allowed to producers, and vice versa.

Examples

The example below shows how to create a predicate that filters messages. We assume that each message has a two longword header that is defined in the struct header. s_size is the size of the message, including the header, and s_type is the message type code.

The example predicate allows the application to skip all messages that don't match the desired message type.

Example 1. Message filter predicate


class CMessageFilterPredicate : public CRingBuffer::CRingBufferPredicate  // (1)
{
private:
    long    m_desiredType;
public:
    CMessageFilterPredicate(long desiredType) :
        m_desiredType(desiredType)                                // (2)
        {}
    virtual bool operator()(CRingBuffer& ring);
};
bool
CMessageFilterPredicate::operator()(CRingBuffer& ring)            // (3)
{
    // Peek at the header without consuming it:
    
    struct header hdr;
    size_t numRead = ring.peek(&hdr, sizeof(hdr));                // (4)
    
    // No complete header yet: keep blocking.
    if(numRead != sizeof(hdr)) return true;                       // (5)
    
    // Wrong type: discard the whole message and keep blocking.
    if (hdr.s_type  != m_desiredType) {
        ring.skip(hdr.s_size);                                    // (6)
        return true;
    }
    return false;                                                 // (7)
}

...

CRingBuffer myring(std::string("myring"));

...

// Wait for a message of type 3...
// then read it.
CMessageFilterPredicate filter(3);                                // (8)
myring.blockWhile(filter);                                        // (9)
struct header hdr;                      // a plain object, not a reference
myring.peek(&hdr, sizeof(hdr));                                   // (10)
char* message = new char[hdr.s_size];   // caller must delete[] message
myring.get(message, hdr.s_size, hdr.s_size);                      // (11)

...

            
(1)
Defines a class CMessageFilterPredicate that is derived from the CRingBuffer::CRingBufferPredicate base class. Objects that are CMessageFilterPredicate objects can therefore be passed as predicates to the CRingBuffer::blockWhile method.
(2)
The constructor of CMessageFilterPredicate saves the desired message type in member data, where it can be used to compare the types of messages in the ring buffer to the desired message type.
(3)
Concrete derivations of the CRingBuffer::CRingBufferPredicate class must implement the operator(). This method is called by the polling loop in CRingBuffer::blockWhile.
(4)
The predicate function operator will peek at message headers.
(5)
If there are no messages in the ring buffer when the function operator is called, it will return true, requesting that CRingBuffer::blockWhile continue blocking.
(6)
This code assumes that messages are atomically put in the ring buffer, that is, the entire message is transferred in one put operation. If the header shows that the message type is not the one we're looking for, the entire message is skipped and CRingBuffer::blockWhile is told to continue to block.
(7)
If, on the other hand, the message matches the desired message type, the function operator returns false which tells CRingBuffer::blockWhile to stop blocking.
(8)
The first step to actually filtering out unwanted message types is to create a CMessageFilterPredicate object passing the constructor the message type code we want to see.
(9)
This call to CRingBuffer::blockWhile will not return until a message of type 3 is found. Due to the skip calls in the predicate, when control returns, the next available data in the ring buffer is the desired message.
(10)
Peeking at the message header allows us to allocate the storage needed for the entire message, and to read the message atomically.
(11)
Reads the message.