Implementing a ring buffer / circular buffer for communication between PRU and ARM on BBB.

I’m sampling data from the ADC and doing some processing on the PRU. After that, I pass the processed data to a Linux app on the ARM processor using shared memory. Sometimes, when the ARM processor is busy, I miss some of the samples, so I’d like to use a ring buffer.
I was reading this which explains clearly how to implement a single reader, single writer circular buffer. I think I can translate that to C without problems. However, they state that read and write operations to the head and tail indexes MUST be atomic for the buffer to work correctly.

If I’m writing and reading from the PRU and the ARM to a position in shared ram, how do I make sure I’m doing it atomically?

Perhaps there is already a circular buffer implementation for ARM+PRU that I can take a look at for reference?

I also found this, which suggests read operations are atomic on the Linux side and write operations should be atomic on most platforms. Is that the case here?

Thanks!

Use 32-bit (or smaller) naturally aligned values in the PRU data ram and
accesses will be atomic on both the PRU and the ARM side.

You can also use other 32-bit memory regions (DDR, on-board SRAM), but
you have to deal with memory management and provide the PRU with
physical addresses (not the logical address you get from malloc). Plus
the memory access latency could stall the PRU.
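
To make the alignment advice concrete, here is a minimal sketch of one way to lay out the shared region so that every field is a naturally aligned 32-bit word. The struct name `shared_ring`, the size `RING_SLOTS` and the helpers `is_word_aligned` and `ring_attach` are hypothetical names chosen for illustration; nothing here comes from the PRU libraries.

```c
#include <assert.h>   /* assert */
#include <stddef.h>   /* offsetof */
#include <stdint.h>   /* uint32_t, uintptr_t */

#define RING_SLOTS 128u /* hypothetical size; must fit in the PRU data RAM */

/* Hypothetical layout: every field is a 32-bit word, so each one sits on a
 * 4-byte boundary as long as the base address itself is 4-byte aligned. */
struct shared_ring {
    uint32_t data[RING_SLOTS]; /* message slots */
    uint32_t start;            /* read index, written by the ARM only */
    uint32_t end;              /* write index, written by the PRU only */
};

/* Compile-time checks that the compiler did not insert padding. */
_Static_assert(offsetof(struct shared_ring, start) % 4 == 0, "start not word aligned");
_Static_assert(offsetof(struct shared_ring, end) % 4 == 0, "end not word aligned");
_Static_assert(sizeof(struct shared_ring) == (RING_SLOTS + 2u) * 4u, "unexpected padding");

/* Returns 1 if the address is naturally aligned for a 32-bit access. */
static int is_word_aligned(const volatile void *p)
{
    return ((uintptr_t)p % sizeof(uint32_t)) == 0;
}

/* Interprets a mapped base address as the shared ring, checking alignment. */
static volatile struct shared_ring *ring_attach(void *base)
{
    assert(is_word_aligned(base));
    return (volatile struct shared_ring *)base;
}
```

On the ARM side, `base` would be the pointer obtained from the mapping call; on the PRU side it would be the local address of the shared data RAM.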

So, something like this will do the trick (for atomic access)? on the PRU:

`
// Address 0x10000 is referred to as “shared data” in the PRU local data memory map.
volatile unsigned int* shared_ram = (volatile unsigned int *)0x10000;
shared_ram[100] = 0xFFFF;

`

And on the ARM:

`
void* p;
prussdrv_map_prumem(PRUSS0_SHARED_DATARAM, &p);
volatile unsigned int* shared_ram = (volatile unsigned int*)p;
unsigned int foobar = shared_ram[100];

`

Thanks for the quick reply :slight_smile:

Yes, something like that. I program the PRU in assembly, but the C code
looks correct on visual inspection. :slight_smile:

Note that even for a simple unidirectional ring buffer like you're
proposing (one reader, one writer), you can potentially have problems
with memory access ordering. This isn't an issue on the PRU side (no
cache and no out-of-order execution), but the ARM has a weak memory
model and just because you do something in your code like:

  read A
  read B
  write C

...doesn't mean the hardware won't actually do something like:

  read A
  write C
  read B

...which can be a problem if the "write C" updates your ring buffer
pointer and indicates that "data B" was read and consumed by the reader
thread. You can probably just ignore the problem and things will likely
work fine (unless you've got a really small ring buffer and are
particularly unlucky), but the proper solution to this is to use memory
barriers. You can see how this is done in the Kernel code, and you're
lucky enough that GCC now has intrinsics for handling this, so you no
longer need to write assembly code on the ARM side to make sure your
ring buffer will *ALWAYS* work properly. :slight_smile:
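
As a rough illustration of the read A / read B / write C ordering above, here is a minimal sketch of the reader step using GCC's `__atomic_thread_fence` builtin. The function name `consume_one`, its parameters and the size `SLOTS` are hypothetical. Whether a fence alone is sufficient for a particular mapping of the PRU memory is an assumption to verify on the target.

```c
#include <assert.h>
#include <stdint.h>

#define SLOTS 128u /* hypothetical ring size */

/* Reader step for a single-reader, single-writer ring.
 * Returns 1 and stores a message in *out, or returns 0 if the ring is empty. */
static int consume_one(const volatile uint32_t *data,
                       volatile uint32_t *read_idx,
                       const volatile uint32_t *write_idx,
                       uint32_t *out)
{
    uint32_t r = *read_idx;
    uint32_t w = *write_idx;          /* read A: the writer's index */

    assert(r < SLOTS);                /* indices are expected to stay in range */
    if (r == w)
        return 0;                     /* nothing to read */

    /* Keep the payload load from being performed before the index load. */
    __atomic_thread_fence(__ATOMIC_ACQUIRE);
    *out = data[r];                   /* read B: the payload */

    /* Keep the index store from becoming visible before the payload load. */
    __atomic_thread_fence(__ATOMIC_RELEASE);
    *read_idx = (r + 1u) % SLOTS;     /* write C: tell the writer the slot is free */
    return 1;
}
```

The older `__sync_synchronize()` builtin is a full barrier and could be used in place of both fences at some extra cost.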

Now to read up on memory barriers :slight_smile:

Here’s my implementation, in case someone needs something similar or in case someone wants to review and criticize.

On the ARM Side:

`
/////////////////////////////////////////////////////////////////////
// Ring buffer.
//

// Communication with PRU is through a ring buffer in the
// PRU shared memory area.
// shared_ram[0] to shared_ram[127] is the buffer data.
// shared_ram[128] is the start (read) pointer.
// shared_ram[129] is the end (write) pointer.
//
// Messages are 32 bit unsigned ints.
//
// Read these:
// * http://en.wikipedia.org/wiki/Circular_buffer#Use_a_Fill_Count
// * https://groups.google.com/forum/#!category-topic/beagleboard/F9JI8_vQ-mE

#include <stddef.h>
#include <prussdrv.h>

static volatile unsigned int* shared_ram = NULL;

unsigned int buffer_size;
volatile unsigned int *buffer_start;
volatile unsigned int *buffer_end;

// Assumes prussdrv_init() and prussdrv_open() have already been called.
void buffer_init(){
void* p;
prussdrv_map_prumem(PRUSS0_SHARED_DATARAM, &p);
shared_ram = (volatile unsigned int*)p;
buffer_size = 128;
buffer_start = &(shared_ram[128]); // value inited to 0 in pru
buffer_end = &(shared_ram[129]); // value inited to 0 in pru
}

static inline int buffer_is_empty(){
return (*buffer_start == *buffer_end);
}

static inline void buffer_read(unsigned int* message){
*message = shared_ram[*buffer_start & (buffer_size-1)];

// Don’t write buffer start before reading message (mem barrier)
// http://stackoverflow.com/questions/982129/what-does-sync-synchronize-do
// https://en.wikipedia.org/wiki/Memory_ordering#Compiler_memory_barrier
__sync_synchronize();

// Increment buffer start, wrap around 2*size
*buffer_start = (*buffer_start+1) & (2*buffer_size - 1);
}

int main(){
buffer_init();
unsigned int message;
while(!buffer_is_empty()){
buffer_read(&message);
}
}

`

On the PRU side:

`
/////////////////////////////////////////////////////////////////////
// RING BUFFER
//

// Communication with ARM processor is through a ring buffer in the
// PRU shared memory area.
// shared_ram[0] to shared_ram[127] is the buffer data.
// shared_ram[128] is the start (read) pointer.
// shared_ram[129] is the end (write) pointer.
//
// Messages are 32 bit unsigned ints.
//
// Read these:
// * http://en.wikipedia.org/wiki/Circular_buffer#Use_a_Fill_Count
// * https://groups.google.com/forum/#!category-topic/beagleboard/F9JI8_vQ-mE

volatile unsigned int* shared_ram = (volatile unsigned int *)0x10000;

unsigned int buffer_size;
volatile unsigned int *buffer_start;
volatile unsigned int *buffer_end;

void buffer_init(){
// data in shared_ram[0] to shared_ram[127]
buffer_size = 128;
buffer_start = &(shared_ram[128]);
buffer_end = &(shared_ram[129]);
*buffer_start = 0;
*buffer_end = 0;
}

inline void buffer_write(unsigned int message){
shared_ram[*buffer_end & (buffer_size-1)] = message;
unsigned int is_full = (*buffer_end == (*buffer_start^buffer_size)); // ^ is XOR
if(is_full){
// Increment buffer start, wrap around 2*size
*buffer_start = (*buffer_start+1) & (2*buffer_size - 1);
}
// Increment buffer end, wrap around 2*size
*buffer_end = (*buffer_end+1) & (2*buffer_size - 1);
}

void main(){
buffer_init();
buffer_write(0xff0000ff);
buffer_write(0x11335577);
//…
}

`

I think you will run into concurrency issues.
The operations to increment the buffer pointers need to be atomic, but they are not.
While you perform the read / add / modulus / write on the ARM, the PRU can add a new message and change the value.
The ring buffer can get corrupted.

I would set a flag in shared memory stating that the ARM has read a value from the ring buffer.
You then wait for the flag to clear before reading the next value.
The PRU would then change the ring buffer pointers and reset the flag.

The trick is for the PRU to be the only core manipulating the buffer pointers.
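
A minimal sketch of that flag handshake, assuming a hypothetical `mailbox` layout with a `consumed` flag. All names here are made up for illustration, and both sides are shown in one file only so the logic can be followed on a host machine.

```c
#include <stdint.h>

#define SLOTS 128u /* hypothetical ring size */

struct mailbox {
    uint32_t data[SLOTS];
    uint32_t start;    /* read position, changed by the PRU only */
    uint32_t end;      /* write position, changed by the PRU only */
    uint32_t consumed; /* set by the ARM after a read, cleared by the PRU */
};

/* PRU side: store a message, dropping it if the ring is full. */
static int pru_write(volatile struct mailbox *m, uint32_t msg)
{
    uint32_t next = (m->end + 1u) % SLOTS;
    if (next == m->start)
        return 0;                 /* full: message dropped */
    m->data[m->end] = msg;
    m->end = next;
    return 1;
}

/* PRU side: acknowledge a pending read by advancing start and clearing the flag. */
static void pru_service(volatile struct mailbox *m)
{
    if (m->consumed) {
        m->start = (m->start + 1u) % SLOTS;
        m->consumed = 0;
    }
}

/* ARM side: read the oldest message and raise the flag.
 * Never writes start or end. Returns 0 if empty or still waiting for the PRU. */
static int arm_read(volatile struct mailbox *m, uint32_t *out)
{
    if (m->consumed)
        return 0;                 /* previous read not acknowledged yet */
    if (m->start == m->end)
        return 0;                 /* empty */
    *out = m->data[m->start];
    __sync_synchronize();         /* finish the data read before raising the flag */
    m->consumed = 1;
    return 1;
}
```

The cost of this design is that the ARM can read at most one message per PRU service pass, since it has to wait for the flag to clear.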

Thanks for the input Peter, you made me realize I had a mistake:

The idea here is that ONLY the PRU changes the end pointer (write position pointer) and ONLY the ARM changes the start pointer (read position pointer). Also, the pointers are updated AFTER the data is read or written (thus the memory barrier on the ARM side).

When the ARM is reading/updating the start pointer, the PRU could write messages to fill the buffer, changing the start pointer and corrupting the buffer. To avoid this, I have changed the PRU code as follows. Note that if the buffer is full, new messages will be dropped. You can lower the chances of this happening by making the buffer larger.

`
inline void buffer_write(unsigned int message){
unsigned int is_full = (*buffer_end == (*buffer_start^buffer_size)); // ^ is XOR
if(!is_full){
shared_ram[*buffer_end & (buffer_size-1)] = message;
*buffer_end = (*buffer_end+1) & (2*buffer_size - 1);
}
}

`

Careful with the buffer size handling! It's hard to tell the difference
between empty and full, since in both cases the read and write pointers
are identical.

It looks like you're trying to use an extra bit of buffer_end to track
empty/full, but I don't think your logic is correct. I don't see how
the MSB (indicating buffer full) ever gets cleared once set.

I've found it's usually much less hassle to live with a maximum buffer
size of N-1 (ie: full = 127 elements, not 128) rather than try to
handle the complexity of correctly tracking empty/full. Note
that there are *LOTS* of subtle ways you can mess up the empty/full
logic (it's even harder to do properly than the basic pointer handling
for reads/writes), so if you do try to use all buffer elements, be
*VERY* careful with your code. I recommend reviewing the Linux kernel
code if you need a reference implementation.
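
For comparison, a minimal sketch of the N-1 scheme: one slot is always left unused, so "empty" and "full" never look the same. The names `ring`, `ring_put` and `ring_get` are hypothetical, and the sketch shows only the index logic. A real ARM/PRU version would still need `volatile` accesses to the shared memory and a barrier on the ARM side.

```c
#include <stdint.h>

#define RING_SLOTS 128u /* hypothetical; usable capacity is RING_SLOTS - 1 */

struct ring {
    uint32_t data[RING_SLOTS];
    uint32_t wr; /* next slot to write, changed only by the writer */
    uint32_t rd; /* next slot to read, changed only by the reader */
};

/* Empty: both indices point at the same slot. */
static int ring_is_empty(const struct ring *r)
{
    return r->rd == r->wr;
}

/* Full: advancing the write index would make it equal to the read index. */
static int ring_is_full(const struct ring *r)
{
    return (r->wr + 1u) % RING_SLOTS == r->rd;
}

/* Writer: returns 1 on success, 0 if the message was dropped. */
static int ring_put(struct ring *r, uint32_t msg)
{
    if (ring_is_full(r))
        return 0;
    r->data[r->wr] = msg;
    r->wr = (r->wr + 1u) % RING_SLOTS; /* publish only after the data is stored */
    return 1;
}

/* Reader: returns 1 and fills *msg, or 0 if the ring is empty. */
static int ring_get(struct ring *r, uint32_t *msg)
{
    if (ring_is_empty(r))
        return 0;
    *msg = r->data[r->rd];
    r->rd = (r->rd + 1u) % RING_SLOTS; /* release the slot only after reading it */
    return 1;
}
```

With 128 slots, the 128th consecutive `ring_put` without a read is rejected, which matches the "full = 127 elements" remark above.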

Rafael,

This is good advice from Charles! I've been running this n-1 scheme with my own
PRU code and BeBoPr software for over two years now and it has proven rock stable.
No concurrency or cache coherency issues, no locking needed and no data re-ordering
problems!

You can find the C-code in the following file:
https://github.com/modmaker/BeBoPr/blob/master/pruss_stepper.c
(start with 'pruss_command')
The PRUSS code is not open source, but I think the C-code shows enough detail.

Cheers,
-- Bas

I have not done this with the PRU, but I work with all sorts of other chips that have to use ring buffers. It is usually easiest to start with head = tail = 0 as the starting (buffer empty) case. Then drop bytes on the sender side if head++ would == tail. It gets really simple if the head and tail indexes (not pointers) are a data size that equals the size of your ring buffer, for example a 256 byte buffer with unsigned char indexes, because then you just increment the indexes right across the 255=>0 wrap point.
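
A minimal sketch of that idea, assuming a 256-byte buffer with `uint8_t` indexes so that incrementing wraps from 255 to 0 without any explicit modulus. The names `byte_ring`, `byte_ring_put` and `byte_ring_get` are hypothetical.

```c
#include <stdint.h>

struct byte_ring {
    uint8_t data[256];
    uint8_t head; /* next position to write (sender) */
    uint8_t tail; /* next position to read (receiver) */
};

/* Sender: drops the byte if advancing head would make it equal to tail. */
static int byte_ring_put(struct byte_ring *r, uint8_t b)
{
    uint8_t next = (uint8_t)(r->head + 1u); /* 255 + 1 wraps to 0 */
    if (next == r->tail)
        return 0;                           /* full: byte dropped */
    r->data[r->head] = b;
    r->head = next;
    return 1;
}

/* Receiver: head == tail means the buffer is empty. */
static int byte_ring_get(struct byte_ring *r, uint8_t *b)
{
    if (r->head == r->tail)
        return 0;
    *b = r->data[r->tail];
    r->tail = (uint8_t)(r->tail + 1u);      /* wraps the same way */
    return 1;
}
```

Starting from `head = tail = 0` gives the empty case described above, and the buffer holds at most 255 bytes.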

Thanks everyone for the feedback! This is indeed more tricky than I had thought.
I’ll post back later with a revised implementation
:slight_smile:

It might be interesting to compare this with the implementation of the ARM/PRU ring buffer in http://beaglelogic.net. Ideally, we’ll end up with a standard Linux vring implementation for efficiently communicating with the PRUs. Doing this communication efficiently is a big advantage for us in looking at moving to remote_proc rather than using uio_pruss, where userspace needs to get involved and might slow down the data movement.

Hello Again.

I’ve double (and triple) checked my logic and I’ll keep it as it is for my project. Unless someone points out a different mistake, that is… (atomicity, memory barriers…)

The trick I’m using to differentiate an empty buffer from a full one is explained here, and the specific (simpler) case where the buffer size is a power of two is further down in that page. Short version: you keep an extra bit for both the start and end pointers that represents the parity of how many times the pointer has circled the buffer. If both pointers point to the same slot and have circled the same number of times, the buffer is empty. If they point to the same slot but the end pointer has circled one more time (parity is different), then the buffer is full. Here are the checks:

`
unsigned int is_full = (*buffer_end == (*buffer_start^buffer_size));

`

`
int buffer_is_empty(){
return (*buffer_start == *buffer_end);
}

`

Note that when incrementing the pointers, they don’t go back to zero when they reach buffer_size, but only after they pass 2*buffer_size-1:

`
*buffer_start = (*buffer_start+1) & (2*buffer_size - 1);

`

`
*buffer_end = (*buffer_end+1) & (2*buffer_size - 1);

`
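
To make the parity-bit scheme easier to check, here is a minimal host-side sketch that puts the checks and increments above into one place. The names `mirror_ring`, `mr_write` and `mr_read` are hypothetical, and the sketch leaves out `volatile` and the memory barrier, so it only demonstrates the index arithmetic.

```c
#include <stdint.h>

#define RING_SIZE  128u                  /* must be a power of two */
#define INDEX_MASK (RING_SIZE - 1u)      /* selects the slot */
#define WRAP_MASK  (2u * RING_SIZE - 1u) /* slot bits plus one parity bit */

struct mirror_ring {
    uint32_t data[RING_SIZE];
    uint32_t start; /* read position plus parity bit, changed by the reader */
    uint32_t end;   /* write position plus parity bit, changed by the writer */
};

/* Same slot and same parity: nothing to read. */
static int mr_is_empty(const struct mirror_ring *r)
{
    return r->start == r->end;
}

/* Same slot but opposite parity: the writer is one full lap ahead. */
static int mr_is_full(const struct mirror_ring *r)
{
    return r->end == (r->start ^ RING_SIZE);
}

/* Writer: drops the message when full, never touches start. */
static int mr_write(struct mirror_ring *r, uint32_t msg)
{
    if (mr_is_full(r))
        return 0;
    r->data[r->end & INDEX_MASK] = msg;
    r->end = (r->end + 1u) & WRAP_MASK;
    return 1;
}

/* Reader: returns 0 when empty, never touches end. */
static int mr_read(struct mirror_ring *r, uint32_t *msg)
{
    if (mr_is_empty(r))
        return 0;
    *msg = r->data[r->start & INDEX_MASK];
    r->start = (r->start + 1u) & WRAP_MASK;
    return 1;
}
```

Stepping through it: after 128 writes `end` is 128 (parity bit set) and the ring is full; after 128 reads `start` is also 128 and the ring is empty; another 128 writes take `end` back to 0, so the parity bit does get cleared on the following lap.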

Thanks everyone for their input!! It rocks to have a solid and helpful community around the tools you use :slight_smile: