Re: [LAU] Simple, easy multithreaded circular buffer library for Linux?

From: Fons Adriaensen <fons@email-addr-hidden>
Date: Fri Oct 17 2008 - 16:39:43 EEST

On Fri, Oct 17, 2008 at 03:02:30PM +0200, Paul Davis wrote:

> we can and *do* anticipate that context switches will occur there -
> there is no general way to execute that operation
> (increment-and-mask) in a lock-free atomic way. the idea is that even if
> it does happen, the resulting error prevents the buffer from being
> misused. of course, it's a little inefficient. but until we get CPUs
> with atomic inc-and-mask instructions, it's the price we pay for going
> lock-free (and it's a relatively small cost).

The original test fails here as well (dual i686).

I attach the same test modified to use the simple
C++ class I use normally. It has a slightly different
API (allowing random access in the block read or written),
but the essential difference is that the operations that
modify the state seen by the other end (the commit() calls)
are atomic.

The _nrd and _nwr counters (similar to the r/w pointers
in jack_ringbuffer) are never masked - the mask is
applied in the testing and r/w code only.
It would be quite easy to do something similar in the
jack_ringbuffer code.

This version produces no errors here.

Ciao,

#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <inttypes.h>
#include <assert.h>

// compile using g++ -I. -o testrb2 -Wall testrb2.cc -lpthread

// Number of ints in each chunk that the writer commits and the reader checks.
#define ARRAY_SIZE 64
// Test values count upward and wrap back to 0 on reaching this bound.
#define MAX_VALUE 0x10000

// FIFO of uint32_t values with separate write-side and read-side counters.
//
// The writer stores elements with write() at offsets relative to the
// current write position and then publishes them with write_commit();
// the reader mirrors this with read() and read_commit().  Neither
// counter is ever masked: the mask is applied only when indexing the
// storage, so (_nwr - _nrd) is always the number of readable elements.
//
// NOTE(review): the counters are plain integers and no explicit memory
// barriers are used, so ordering between the element stores and the
// counter update is left to the compiler and CPU - confirm this is
// acceptable for the target platform.
class Lfq_u32
{
public:

    // size is the capacity in elements; it must be a positive power of two.
    Lfq_u32 (int size);
    ~Lfq_u32 (void);

    // Number of elements that can be written before the queue is full.
    int write_avail (void) const { return (int)(_size - _nwr + _nrd); }
    // Publish n elements previously stored with write().
    void write_commit (int n) { _nwr += n; }
    // Store v at offset i from the current write position.
    void write (int i, uint32_t v) { _data [(_nwr + i) & _mask] = v; }

    // Number of elements that have been committed and not yet consumed.
    int read_avail (void) const { return (int)(_nwr - _nrd); }
    // Release n elements previously fetched with read().
    void read_commit (int n) { _nrd += n; }
    // Fetch the element at offset i from the current read position.
    uint32_t read (int i) { return _data [(_nrd + i) & _mask]; }

private:

    uint32_t *_data;
    int _size;
    int _mask;
    // The counters only ever grow, so they are unsigned: wraparound is
    // then well defined and, because the size is a power of two, both
    // the differences above and the masked indices stay correct.
    uint32_t _nwr;
    uint32_t _nrd;
};

// Allocate storage for size elements.  The assertion rejects zero,
// negative and non-power-of-two sizes, for which the index mask would
// be meaningless.
Lfq_u32::Lfq_u32 (int size) : _size (size), _mask (_size - 1), _nwr (0), _nrd (0)
{
    assert (_size > 0 && !(_size & _mask));
    _data = new uint32_t [_size];
}

// Release the element storage.
Lfq_u32::~Lfq_u32 (void)
{
    delete[] _data;
}

// Queue shared by the reader and writer threads; allocated in main().
static Lfq_u32 *Q;

static int fill_int_array (int *array, int start, int count)
{
    int i, j = start;
    for (i = 0; i < count; i++)
    {
        array[i] = j;
        j = (j + 1) % MAX_VALUE;
    }
    return j;
}

// Compare the first `count` elements of two int arrays.  Returns 1 when
// they all match; on the first mismatch prints both values and the
// offset, then returns 0.
static int cmp_array (int *array1, int *array2, int count)
{
    int pos = 0;

    while (pos < count)
    {
        int lhs = array1[pos];
        int rhs = array2[pos];

        if (lhs != rhs)
        {
            printf("%d != %d at offset %d\n", lhs, rhs, pos);
            return 0;
        }
        pos++;
    }
    return 1;
}

static void *reader_start (void * arg)
{
    int k, i = 0, a[ARRAY_SIZE], b[ARRAY_SIZE];

    unsigned long j = 0, nfailures = 0;
    printf("reader started on cpu %d\n", sched_getcpu());
    i = fill_int_array (a, i, ARRAY_SIZE);
    while (1)
    {
        if (Q->read_avail() >= ARRAY_SIZE)
        {
            for (k = 0; k < ARRAY_SIZE; k++) b [k] = Q->read (k);
            Q->read_commit (ARRAY_SIZE);
            if (!cmp_array (a, b, ARRAY_SIZE))
            {
                nfailures++;
                printf("failure in chunk %lu - probability: %lu/%lu = %.3f per million\n",
                       j, nfailures, j, (float) nfailures / (j + 1) * 1000000);
                i = (b[0] + ARRAY_SIZE) % MAX_VALUE;
            }
            i = fill_int_array (a, i, ARRAY_SIZE);
            j++;
        }
    }
    return NULL;
}

static void *writer_start (void * arg)
{
    int k, i = 0, a[ARRAY_SIZE];
    printf("writer started on cpu %d\n", sched_getcpu());

    i = fill_int_array (a, i, ARRAY_SIZE);

    while (1)
    {
        if (Q->write_avail() >= ARRAY_SIZE)
        {
            for (k = 0; k < ARRAY_SIZE; k++) Q->write (k, a [k]);
            Q->write_commit (ARRAY_SIZE);
              i = fill_int_array (a, i, ARRAY_SIZE);
        }
    }
    return NULL;
}

int main(int argc, char *argv[])
{
    int size;
    pthread_t reader_thread, writer_thread;

    if (argc < 2) return 1;
    sscanf(argv[1], "%d", &size);
    printf("starting ringbuffer stress test\n");
    printf("buffer size (bytes): %d\n", size);
    printf("array size (bytes): %d\n", sizeof(int) * ARRAY_SIZE);

    Q = new Lfq_u32 (size);

    pthread_create (&reader_thread, NULL, reader_start, NULL);
    pthread_create (&writer_thread, NULL, writer_start, NULL);
    while (1) sleep(1);

    return 0;
}

-- 
FA
Laboratorio di Acustica ed Elettroacustica
Parma, Italia
Lascia la spina, cogli la rosa.
_______________________________________________
Linux-audio-user mailing list
Linux-audio-user@email-addr-hidden
http://lists.linuxaudio.org/mailman/listinfo/linux-audio-user
Received on Fri Oct 17 20:15:02 2008

This archive was generated by hypermail 2.1.8 : Fri Oct 17 2008 - 20:15:02 EEST