A faster ringbuffer implementation

Rick Kimball
Posts: 1038
Joined: Tue Apr 28, 2015 1:26 am
Location: Eastern NC, US

Post by Rick Kimball » Thu Jul 14, 2016 5:21 pm

A while back I created a template-based ring buffer (circular buffer) in C++ for the msp430 MCU. It was faster than the C-based one used in Arduino/Energia. I gave it a try on stm32duino and compared its speed to the libmaple/ring_buffer.h version. The ringbuffer_t below seems to take about 36% less time.

For me, the time to insert an item matters most, as I was using it to buffer incoming serial bytes, so I wanted it to be as fast as possible. The maple ring_buffer insert takes about 500 nanoseconds. The insert function in the code below (push_back) takes only about 320 nanoseconds.

(BTW: this also highlights how kick-ass it is to have a 72MHz ARM CPU; on the msp430 an insert takes about 1680 ns.)
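
For a sense of scale, the timings quoted above can be turned into CPU cycles and a relative saving. This is only a back-of-the-envelope sketch: the helper names (`ns_to_cycles`, `time_saved`, `check_post_figures`) are made up for illustration, and the scope readings also include the two GPIO writes around the call.

```cpp
#include <cassert>
#include <cmath>

// Convert a duration in nanoseconds to CPU cycles at a given clock in MHz.
// cycles = ns * 1e-9 * MHz * 1e6 = ns * MHz / 1000
constexpr double ns_to_cycles(double ns, double mhz) {
  return ns * mhz / 1000.0;
}

// Fraction of time saved by 'fast_ns' relative to 'slow_ns'.
constexpr double time_saved(double slow_ns, double fast_ns) {
  return (slow_ns - fast_ns) / slow_ns;
}

// Check the figures quoted in the post (72 MHz clock, 500 ns vs 320 ns).
inline void check_post_figures() {
  assert(std::fabs(ns_to_cycles(500.0, 72.0) - 36.0) < 1e-9);   // libmaple insert: 36 cycles
  assert(std::fabs(ns_to_cycles(320.0, 72.0) - 23.04) < 1e-9);  // push_back: about 23 cycles
  assert(std::fabs(time_saved(500.0, 320.0) - 0.36) < 1e-9);    // the "36% less time"
}
```

So the 36% figure is (500 - 320) / 500, and at 72 MHz the difference is roughly 13 cycles per insert.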

ringbuffer.h

Code: Select all

/*
   ringbuffer.h - yet another version of ringbuffer_t c++ template

   Desc: This template class creates a ringbuffer with arbitrary types. Provides push
         and pop methods for adding and removing items.  Access function for checking
         the number of items in the buffer, capacity and full or empty methods
         provide safe access to the info.

         This version has been optimized for the msp430-gcc and stm32. It doesn't disable
         or enable any interrupts. It is safe nonetheless for use when there is a single
         writer and single reader. This is common when using an interrupt and the main
         line thread working together to move data in and out of peripherals on demand.

   Version: 1.0.3
   Created: Jul-24-2012
    Author: rick@kimballsoftware.com
      Date: 02-28-2013

   =========================================================================
    Copyright © 2012-2016 Rick Kimball

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.

    Jul-14-2016: rick@kimballsoftware.com removed extra stuff not needed for stm32
                 formatted using auto format in arduino ide

*/

#ifndef RINGBUFFER_T_H_
#define RINGBUFFER_T_H_
#include <stddef.h> // size_t
#include <stdint.h>

/**
   is_power_of_two - power of 2 checker
     enforce at compile time that SIZE is power of 2 and >= 2
*/

template<unsigned SIZE>
struct is_power_of_two {
  enum {val = (SIZE >= 2) & !(SIZE & (SIZE - 1))};
  static const unsigned badSIZE[(val == 1) ? 1 : -1]; // SIZE is not a power of 2 if you get an error here.
};

/**
   uint16x2_t - a union containing 16 bit head and tail offsets into the ring buffer. The union
                allows the c code to grab both values with one assembler instruction access.

*/
union uint16x2_t {
  // access as 32 bit
  unsigned long both;
  // -- or as 2 16 bit values --
  struct {
    unsigned head: 16;
    unsigned tail: 16;
  };
};

/**
   ringbuffer_t - provide a circular_buffer without disabling interrupts
      expects a power of 2 container, and only one reader and one writer
      container capacity SIZE-1
*/

template <
  typename T,                     /* works with any type */
  uint16_t SIZE,                  /* number of slots, must be a power of 2; capacity is SIZE-1 */
  typename POP_T = int16_t,       /* return type of pop_front */
  POP_T EMPTY_ELEM = (POP_T) - 1  /* default return value when empty */
  >
struct ringbuffer_t {

  // --- private structure data ---
  // although variables are accessible because this is a struct
  
  volatile uint16x2_t offsets; // comes first so we can use 0 offset to variables
                               // for fastest access
  T elements[SIZE];         

  enum { CAPACITY = SIZE - 1 }; // leave one slot open

  is_power_of_two<SIZE> check_buffer_size; // your SIZE is not a power of 2, if you get an error here

  // --- public methods ---

  // write access zeros out head and tail
  inline void clear(void ) {
    offsets.both = 0;
  }

  // return the count of used slots
  size_t available() {
    register uint16x2_t temp = { offsets.both };

    temp.both = (temp.head - temp.tail) & CAPACITY;

    return temp.both;
  }

  // return maximum number of slots available
  size_t capacity() {
    return CAPACITY;
  }

  // returns true when there are no used slots
  bool inline empty() {
    return !available();
  }

  // returns true when all slots used
  bool inline full() {
    return available() == capacity();
  }

  /*
      push_back() - adds an element to the end of the queue

      Note: affects head, reads tail, element ignored if overflow ~300 ns @72MHz
  */
  void push_back(const T element) {
    register uint16_t temp_head = offsets.head;

    elements[temp_head++] = element;
    temp_head &= CAPACITY;
    if ( !(temp_head == offsets.tail) ) { // !full
      offsets.head = temp_head;
    }

    return;
  }

  // no bounds check version, affects head ~250 ns @72MHz
  void push_back_nc(const T element) {
    register uint16_t temp_head = offsets.head;

    elements[temp_head++] = element;
    offsets.head = temp_head & CAPACITY;
    return;
  }


  // affects tail, reads head
  POP_T pop_front(void) {
    register uint16x2_t temp = { offsets.both };

    if ( (temp.head - temp.tail) & CAPACITY ) { // !empty
      POP_T elem = elements[temp.tail++];
      offsets.tail = temp.tail & CAPACITY;
      return elem;
    }

    return EMPTY_ELEM; // on underflow return default element
  }

  // no bounds check, affects tail
  POP_T pop_front_nc(void) {
    register uint16_t temp_tail = offsets.tail;

    POP_T elem = elements[temp_tail++];
    offsets.tail = temp_tail & CAPACITY;
    return elem;
  }

};

#endif /* RINGBUFFER_T_H_ */
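
The header above insists on a power-of-two SIZE because wrapping an index then reduces to a single AND with SIZE - 1 instead of a modulo or a compare-and-reset. The following is a minimal stand-alone sketch of that idea, using C++11 `static_assert` in place of the negative-array-size trick. The names `is_pow2_ge2`, `next_index`, `used_slots` and `demo_wrap` are hypothetical and not part of ringbuffer.h.

```cpp
#include <cassert>
#include <stdint.h>

// True when n is a power of two and at least 2 (the condition the header enforces).
constexpr bool is_pow2_ge2(unsigned n) {
  return n >= 2 && (n & (n - 1)) == 0;
}

// Advance an index by one slot, wrapping with a mask instead of '%'.
template <unsigned SIZE>
constexpr uint16_t next_index(uint16_t i) {
  static_assert(is_pow2_ge2(SIZE), "SIZE must be a power of 2 and >= 2");
  return static_cast<uint16_t>((i + 1u) & (SIZE - 1u));
}

// Number of used slots; the mask also handles head having wrapped past tail.
template <unsigned SIZE>
constexpr uint16_t used_slots(uint16_t head, uint16_t tail) {
  static_assert(is_pow2_ge2(SIZE), "SIZE must be a power of 2 and >= 2");
  return static_cast<uint16_t>((head - tail) & (SIZE - 1u));
}

inline void demo_wrap() {
  assert(next_index<16>(3) == 4);      // ordinary step
  assert(next_index<16>(15) == 0);     // wraps to the start
  assert(used_slots<16>(2, 14) == 4);  // head wrapped: slots 14, 15, 0, 1 are in use
}
```

With a SIZE such as 24, `next_index<24>` would fail to compile, which is the same guarantee `is_power_of_two` gives in the posted header.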
Example code:

Code: Select all

#include "ringbuffer.h"
#include "streaming.h"

typedef ringbuffer_t<uint8_t, 32> ringbuffer_med_t;

static ringbuffer_med_t message_buffer;

void queue_message(const char *str) {
  while (*str) {
    GPIOC_BASE->BRR = (1 << 13);  // time the function using a gpio toggle
    message_buffer.push_back(*str++);
    GPIOC_BASE->BSRR = (1 << 13); 
    // put a scope on PC13 to see how much time push_back() actually takes
    // I measured ~320 nanoseconds
  }
}

void setup() {
  Serial.begin(115200);
  pinMode(PC13, OUTPUT); // toggle pin for timing function
  queue_message("Hello World\r\n");
}

void loop() {
  if ( message_buffer.available() ) {
    Serial << "message size=" << message_buffer.available()
           <<     " head=" << message_buffer.offsets.head
           <<     " tail=" << message_buffer.offsets.tail
           << "\r\n";

    Serial << "[";

    do {
      int c = message_buffer.pop_front_nc();
      Serial.write(c);
    } while (message_buffer.available());

    Serial << "]\r\n";

  }

  delay(1000);

  queue_message("Hello Again\r\n");
}
Note: since this is template-based, you can change the element type from uint8_t to something completely different: a ring buffer of a custom structure, a ring buffer of int16_t, or even a ring buffer of function pointers.
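
To make the function-pointer case concrete, here is a self-contained sketch. It deliberately does not use ringbuffer.h: `mini_ring_t` is a hypothetical, condensed stand-in with separate head and tail members, and its `pop_front` uses an out-parameter instead of the POP_T/EMPTY_ELEM pair. As far as I can tell, that shape is also the easier one for struct elements, because before C++20 a non-type template parameter such as EMPTY_ELEM cannot have a class type.

```cpp
#include <cassert>
#include <stdint.h>

// Hypothetical stand-in for ringbuffer_t (single reader, single writer).
template <typename T, uint16_t SIZE>
struct mini_ring_t {
  static_assert(SIZE >= 2 && (SIZE & (SIZE - 1)) == 0, "SIZE must be a power of 2");

  volatile uint16_t head;
  volatile uint16_t tail;
  T elements[SIZE];

  mini_ring_t() : head(0), tail(0) {}

  // Returns false (element dropped) when the buffer is full.
  bool push_back(const T &element) {
    const uint16_t next = static_cast<uint16_t>((head + 1) & (SIZE - 1));
    if (next == tail) return false;
    elements[head] = element;
    head = next;
    return true;
  }

  // Copies the oldest element into 'out'; returns false when empty.
  bool pop_front(T &out) {
    if (head == tail) return false;
    out = elements[tail];
    tail = static_cast<uint16_t>((tail + 1) & (SIZE - 1));
    return true;
  }
};

typedef void (*handler_t)(int &);

inline void add_one(int &v) { v += 1; }
inline void times_two(int &v) { v *= 2; }

// Queue two handlers, then run them in FIFO order against 'value'.
inline int run_queued_handlers(int value) {
  mini_ring_t<handler_t, 8> queue;
  queue.push_back(add_one);
  queue.push_back(times_two);

  handler_t h;
  while (queue.pop_front(h)) {
    h(value);
  }
  return value;
}
```

Calling `run_queued_handlers(3)` applies add_one and then times_two, in the order they were queued.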

[Here is the example above ringbuffer_example.ino]
https://gist.github.com/RickKimball/a04 ... faa6bbfc7f

[Here is the libmaple version I compared against maple_example.ino from gist]
https://gist.github.com/RickKimball/f30 ... fc62915acd

[Here is an example using the ringbuffer from edogaldo]
https://gist.github.com/RickKimball/ebf ... b9db479254

Enjoy!
-rick
Last edited by Rick Kimball on Thu Jul 14, 2016 10:10 pm, edited 4 times in total.

Slammer
Posts: 251
Joined: Tue Mar 01, 2016 10:35 pm
Location: Athens, Greece

Re: A faster ringbuffer implementation

Post by Slammer » Thu Jul 14, 2016 8:45 pm

Thanks Rick! Very Nice!

edogaldo
Posts: 280
Joined: Fri Jun 03, 2016 8:19 am

Re: A faster ringbuffer implementation

Post by edogaldo » Thu Jul 14, 2016 8:52 pm

Hi Rick, interesting solution.
One request: would it be possible for you to also benchmark my version of ring_buffer?

Anyway, one question: I didn't understand whether you are saying your solution is safe from race conditions even without disabling interrupts and, if so, why. Could you please explain?


Thanks, E.

Rick Kimball
Posts: 1038
Joined: Tue Apr 28, 2015 1:26 am
Location: Eastern NC, US

Re: A faster ringbuffer implementation

Post by Rick Kimball » Thu Jul 14, 2016 9:11 pm

edogaldo wrote: Anyway, one question: I didn't understand whether you are saying your solution is safe from race conditions even without disabling interrupts and, if so, why. Could you please explain?
When I access the head and tail variables, I do it by loading a single 32-bit register. Take the available function:

Code: Select all

  size_t available() {
    register uint16x2_t temp = { offsets.both };

    temp.both = (temp.head - temp.tail) & CAPACITY;

    return temp.both;
  }
The generated assembly shows that both values are loaded with a single instruction, which can't be interrupted partway through:

Code: Select all

  // return the count of used slots
  size_t available() {
    register uint16x2_t temp = { offsets.both };
 800015a:       6803            ldr     r3, [r0, #0]

    temp.both = (temp.head - temp.tail) & CAPACITY;
 800015c:       b29a            uxth    r2, r3
 800015e:       eba2 4013       sub.w   r0, r2, r3, lsr #16

    return temp.both;
  }
 8000162:       f000 000f       and.w   r0, r0, #15
 8000166:       4770            bx      lr
In the case above, the instruction 'ldr r3, [r0, #0]' loads head and tail together, and all the math is then done against that snapshot in time.
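
The same snapshot idea can be written with explicit shifts instead of the union. This is a sketch under a few assumptions: head sits in the low 16 bits and tail in the high 16 bits (which is what the `lsr #16` in the listing suggests for this compiler), and the 32-bit word is naturally aligned so the read compiles to one load. The function names are hypothetical. Using shifts avoids relying on bit-field layout, which is implementation-defined, and on the anonymous struct, which is a compiler extension in C++.

```cpp
#include <cassert>
#include <stdint.h>

// Pack head (low 16 bits) and tail (high 16 bits) into one 32-bit word.
inline uint32_t pack_offsets(uint16_t head, uint16_t tail) {
  return static_cast<uint32_t>(head) | (static_cast<uint32_t>(tail) << 16);
}

// Compute the used-slot count from ONE read of the shared word.
// 'mask' is SIZE - 1 for a power-of-two SIZE.
inline uint16_t used_from_snapshot(const volatile uint32_t *both, uint16_t mask) {
  const uint32_t snap = *both;  // the only access to shared state
  const uint16_t head = static_cast<uint16_t>(snap & 0xFFFFu);
  const uint16_t tail = static_cast<uint16_t>(snap >> 16);
  return static_cast<uint16_t>((head - tail) & mask);
}

inline void demo_snapshot() {
  volatile uint32_t shared = pack_offsets(2, 14);
  assert(used_from_snapshot(&shared, 15) == 4);  // slots 14, 15, 0, 1
  shared = pack_offsets(5, 5);
  assert(used_from_snapshot(&shared, 15) == 0);  // head == tail means empty
}
```

Everything after the first line of `used_from_snapshot` works on the local copy, so an interrupt that changes head or tail in between cannot produce a mixed old/new pair.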

-rick
Last edited by Rick Kimball on Thu Jul 14, 2016 9:24 pm, edited 1 time in total.

Rick Kimball
Posts: 1038
Joined: Tue Apr 28, 2015 1:26 am
Location: Eastern NC, US

Re: A faster ringbuffer implementation

Post by Rick Kimball » Thu Jul 14, 2016 9:21 pm

edogaldo wrote: One request: would it be possible for you to also benchmark my version of ring_buffer?
Your code seems to run slightly faster than the libmaple version but isn't as consistent; it bounces between 450 ns and 480 ns.

-rick

Code: Select all

#include "ringbuffer_e.h"
#include "streaming.h"

uint8_t buffer[32];

ring_buffer message_buffer = { buffer, 0, 0, sizeof(buffer)  };

void queue_message(const char *str) {
  while (*str) {
    GPIOC_BASE->BRR = (1 << 13);  // time function using gpio toggle
    rb_safe_insert(&message_buffer, *str++);
    GPIOC_BASE->BSRR = (1 << 13); 
    // put a scope on PC13 to see how much time rb_safe_insert() actually takes
    // I measured between ~450 and ~480 nanoseconds
  }
}

void setup() {
  Serial.begin(115200);
  pinMode(PC13, OUTPUT);
  queue_message("Hello World\r\n");
}

void loop() {
  if ( !rb_is_empty(&message_buffer)) {

    Serial << "message size=" << rb_full_count(&message_buffer)
           <<     " head=" << message_buffer.head
           <<     " used=" << message_buffer.used
           << "\r\n";

    Serial << "[";

    do {
      int c = rb_remove(&message_buffer);
      Serial.write(c);
    } while (!rb_is_empty(&message_buffer));

    Serial << "]\r\n";

  }

  delay(1000);

  queue_message("Hello Again\r\n");
}
-rick

Rick Kimball
Posts: 1038
Joined: Tue Apr 28, 2015 1:26 am
Location: Eastern NC, US

Re: A faster ringbuffer implementation

Post by Rick Kimball » Sun Mar 12, 2017 7:27 pm

I've been moving over to arm-none-eabi-gcc 6.2.1. I was looking at my asm output and noticed the compiler is getting even smarter.
In the original code I posted, the available() function used three registers: r0, r2 and r3. In the code generated by 6.2.1, it is done with one register and one fewer instruction:

arm-none-eabi-gcc 4.8.3

Code: Select all

  // return the count of used slots
  size_t available() {
    register uint16x2_t temp = { offsets.both };
 800015a:       6803            ldr     r3, [r0, #0]

    temp.both = (temp.head - temp.tail) & CAPACITY;
 800015c:       b29a            uxth    r2, r3
 800015e:       eba2 4013       sub.w   r0, r2, r3, lsr #16

    return temp.both;
  }
 8000162:       f000 000f       and.w   r0, r0, #15
 8000166:       4770            bx      lr
arm-none-eabi-gcc 6.2.1 generated code:

Code: Select all

08000760 <ringbuffer_t<(unsigned short)16, unsigned char, int, -1>::available()>:
    register uint16x2_t temp = { offsets.both };
 8000760:       6800            ldr     r0, [r0, #0]
    temp.both = (temp.head - temp.tail) & CAPACITY; // compute elements available 
 8000762:       eba0 4010       sub.w   r0, r0, r0, lsr #16
  }
 8000766:       f000 000f       and.w   r0, r0, #15
 800076a:       4770            bx      lr
The uxth instruction is gone, and replaced by the single sub.w:
sub.w r0, r0, r0, lsr #16

The nice thing about living in this ARM universe is that the open source stuff keeps moving forward, making things better while we sleep : )
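
Why the compiler can drop the uxth: the tail contributes only to bits 16 and up of the 32-bit word, and the final `and #15` throws those bits away, so zero-extending head first makes no difference to the result. Here is a small sketch that checks this exhaustively for the 16-slot buffer in the listing. The function names are made up for illustration, and the head-in-low-half layout is taken from the disassembly.

```cpp
#include <cassert>
#include <stdint.h>

// gcc 4.8.3 shape: zero-extend head (uxth), subtract tail, then mask.
inline uint32_t used_with_uxth(uint32_t both) {
  const uint32_t head = both & 0xFFFFu;  // uxth
  return (head - (both >> 16)) & 15u;    // sub.w ..., lsr #16 ; and.w #15
}

// gcc 6.2.1 shape: subtract tail from the whole word, then mask.
inline uint32_t used_without_uxth(uint32_t both) {
  return (both - (both >> 16)) & 15u;
}

// Compare both forms for every head/tail pair of a 16-slot buffer.
inline bool forms_agree() {
  for (uint32_t head = 0; head < 16; ++head) {
    for (uint32_t tail = 0; tail < 16; ++tail) {
      const uint32_t both = head | (tail << 16);
      if (used_with_uxth(both) != used_without_uxth(both)) return false;
    }
  }
  return true;
}
```

The extra `tail << 16` term is a multiple of 65536, hence a multiple of 16, so it vanishes under the mask.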
-rick

C_D
Posts: 62
Joined: Mon May 11, 2015 3:27 am
Location: New Zealand

Re: A faster ringbuffer implementation

Post by C_D » Wed Jul 05, 2017 9:20 pm

I'm really interested in using something like this in one of my projects. I currently have a number of ring buffers storing sensor values and calculating moving averages. A template class like this is going to be great for tidying up my sketch.

I have a quick question though, what happens if you try to use it with an array? From doing some reading I see that template classes do work with arrays as the datatype, but I assume I would have to write special functions for copying in and returning data?

EDIT:
In hindsight, passing whole arrays in and out of a ringbuffer is kind of an odd thing to want to do. My solution in the end was to add a discard_front() method which could remove invalid messages from the front of the queue.
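
The discard_front() code itself is not shown in the thread, so the following is only a guess at what such a method could look like, written against a bare head/tail pair rather than the real class. `offsets_t`, `MASK` and the signature are all hypothetical. The idea is to advance tail without copying any elements out.

```cpp
#include <cassert>
#include <stdint.h>

// Hypothetical head/tail pair for a 16-slot buffer.
struct offsets_t {
  uint16_t head;
  uint16_t tail;
};

const uint16_t MASK = 15;  // SIZE - 1 for SIZE = 16

inline uint16_t used(const offsets_t &o) {
  return static_cast<uint16_t>((o.head - o.tail) & MASK);
}

// Drop up to 'count' of the oldest elements without reading them.
// Returns how many were actually discarded.
inline uint16_t discard_front(offsets_t &o, uint16_t count) {
  const uint16_t avail = used(o);
  if (count > avail) count = avail;  // never move tail past head
  o.tail = static_cast<uint16_t>((o.tail + count) & MASK);
  return count;
}

inline void demo_discard() {
  offsets_t o = {3, 14};             // 5 elements queued: slots 14, 15, 0, 1, 2
  assert(used(o) == 5);
  assert(discard_front(o, 2) == 2);  // tail wraps from 14 to 0
  assert(o.tail == 0 && used(o) == 3);
  assert(discard_front(o, 10) == 3); // clamped to what is available
  assert(used(o) == 0);
}
```

Since only tail is modified, this would belong on the reader side, like pop_front.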

C_D
Posts: 62
Joined: Mon May 11, 2015 3:27 am
Location: New Zealand

Re: A faster ringbuffer implementation

Post by C_D » Wed Oct 04, 2017 2:26 am

Just came back to my project using these neat ringbuffers, and found there's a bit of a bug in the posted code.

Code: Select all

typename POP_T = int16_t,       /* return type of pop_front */

...

  // affects tail, reads head
  POP_T pop_front(void) {
    register uint16x2_t temp = { offsets.both };

    if ( (temp.head - temp.tail) & CAPACITY ) { // !empty
      POP_T elem = elements[temp.tail++];
      offsets.tail = temp.tail & CAPACITY;
      return elem;
    }

    return EMPTY_ELEM; // on underflow return default element
  }
Despite being a template class which stores any type you like, the elements are always returned as an int16_t. This produced some wonderfully bizarre results when I was trying to store uint32_t data.
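
A tiny illustration of what happens to a uint32_t on its way out through the default POP_T. `pop_as_int16` is a hypothetical helper that mimics the `POP_T elem = elements[...]` conversion. Only the low 16 bits survive; gcc does this modularly, and C++20 guarantees it.

```cpp
#include <cassert>
#include <stdint.h>

// Mimics 'POP_T elem = elements[...]' with T = uint32_t and POP_T = int16_t.
inline int16_t pop_as_int16(uint32_t stored) {
  return static_cast<int16_t>(stored);  // keeps only the low 16 bits
}

inline void demo_truncation() {
  assert(pop_as_int16(1234u) == 1234);   // small values look fine
  assert(pop_as_int16(70000u) == 4464);  // 70000 - 65536
  assert(pop_as_int16(65535u) == -1);    // also collides with the default EMPTY_ELEM
}
```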

My corrected code looks something like this:

Code: Select all

  // affects tail, reads head
  T pop_front(void) {
    register uint16x2_t temp = { offsets.both };
    T elem = T();  // value-initialise so an empty queue returns a defined value (0 for built-in types)

    // if the queue isn't empty pop an element
    if ( (temp.head - temp.tail) & CAPACITY ) {
      elem = elements[temp.tail++];
      offsets.tail = temp.tail & CAPACITY;
    }
    
    return elem;
  }
Also thought I might share my Moving Mean template class, which makes use of Rick's ringbuffer.h

Code: Select all

// MovingMean Class - template declaration
template <typename T, uint16_t buffer_size>
class MovingMean
{
  public:
    MovingMean(void);
    void addSample(T data);
    T getMean(void);

  private:
    uint16_t num_elements;
    T running_sum;
    ringbuffer_t<T, buffer_size> values;
};

// MovingMean Class - template definition
template <typename T, uint16_t buffer_size>
MovingMean<T, buffer_size>::MovingMean(void)
: num_elements(0), running_sum(0)  // same order as the member declarations
{
    
}

template <typename T, uint16_t buffer_size>
void MovingMean<T, buffer_size>::addSample(T data)
{
  // the ringbuffer can only hold size - 1 values
  if (num_elements == (buffer_size - 1))
  {
    // drop an element to make space, no change to num_elements
    running_sum -= values.pop_front();
  }
  else 
  {
    // num_elements increases by 1
    num_elements++;
  }

  // add the new element to the list and the sum
  running_sum += data;
  values.push_back(data);
}

template <typename T, uint16_t buffer_size>
T MovingMean<T, buffer_size>::getMean(void)
{ 
  if (num_elements > 0)
  {
    return running_sum / num_elements;
  }
  else
  {
    return 0;
  }
}

Rick Kimball
Posts: 1038
Joined: Tue Apr 28, 2015 1:26 am
Location: Eastern NC, US

Re: A faster ringbuffer implementation

Post by Rick Kimball » Wed Oct 04, 2017 3:54 am

You just need to provide the POP_T data type and make it the same as the T data type in your template definition.

ringbuffer_t<T, buffer_size, T, -1> values; /* <<<< note this is different than yours */

For the POP_T, you can use the same type as used for the element T or a different one.

I used this "feature" to my advantage, using uint8_t as the element data type but an int for the POP_T, with an empty value of -1.

This allows me to loop on the return value:

Code: Select all

   ringbuffer_t<uint8_t, 64, int, -1> buffer;
   ...
   int c;
   while((c=buffer.pop_front()) != -1) {
    Serial.print((char)c);
   }
Using this approach, I can still handle 0xFF characters and also know when the buffer is empty. If I returned the same data type, I couldn't use -1 as the empty-buffer indicator, because -1 converted to uint8_t is 0xFF, which is indistinguishable from a real 0xFF byte.
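
The collision described above can be checked in a few lines. The helper names are made up for illustration; the conversions themselves are standard C++.

```cpp
#include <cassert>
#include <stdint.h>

// With a uint8_t return type, the "empty" marker -1 wraps to 0xFF
// and cannot be told apart from a real data byte.
inline bool sentinel_collides_as_uint8() {
  const uint8_t empty_marker = static_cast<uint8_t>(-1);  // becomes 0xFF
  const uint8_t data_byte = 0xFF;
  return empty_marker == data_byte;
}

// With an int return type, a stored 0xFF comes back as 255, never -1.
inline bool sentinel_collides_as_int() {
  const uint8_t stored = 0xFF;
  const int popped = stored;  // what 'POP_T elem = elements[...]' does when POP_T is int
  return popped == -1;
}
```

This is the same convention as `Serial.read()` and C's `getchar()`, which return an int so that -1 can signal "nothing available".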

BTW: You should probably call values.clear() in your constructor so it will work as a local variable on the stack.

Code: Select all

/* tested on linux */

#include <cstdio>
#include <iostream>
#include <stdint.h>
#include <unistd.h>
#include "ringbuffer.h"
using namespace std;

// MovingMean Class - template declaration
template <typename T, uint16_t buffer_size>
class MovingMean
{
  public:
    MovingMean(void);
    void addSample(T data);
    T getMean(void);

  private:
    T running_sum;
    uint16_t num_elements;
    ringbuffer_t<T, buffer_size, T, -1> values; /* <<<< note this is different than yours */
};

// MovingMean Class - template definition
template <typename T, uint16_t buffer_size>
MovingMean<T, buffer_size>::MovingMean(void)
: running_sum(0), num_elements(0)
{
  values.clear();
}

template <typename T, uint16_t buffer_size>
void MovingMean<T, buffer_size>::addSample(T data)
{
  // the ringbuffer can only hold size - 1 values
  if (num_elements == (buffer_size - 1))
  {
    // drop an element to make space, no change to num_elements
    running_sum -= values.pop_front();
  }
  else 
  {
    // num_elements increases by 1
    num_elements++;
  }

  // add the new element to the list and the sum
  running_sum += data;
  values.push_back(data);
}

template <typename T, uint16_t buffer_size>
T MovingMean<T, buffer_size>::getMean(void)
{ 
  if (num_elements > 0)
  {
    return running_sum / num_elements;
  }
  else
  {
    return 0;
  }
}

int main() {
  MovingMean<int32_t, 16> movingmean;

  while(1) {
    movingmean.addSample(69999UL);
    cout << "movingmean.getMean()=" << movingmean.getMean() << endl;
    movingmean.addSample(70000UL);
    cout << "movingmean.getMean()=" << movingmean.getMean() << endl;
    movingmean.addSample(70001UL);
    cout << "movingmean.getMean()=" << movingmean.getMean() << endl;
    sleep(2);
  }
}

-rick

mrburnette
Posts: 1829
Joined: Mon Apr 27, 2015 12:50 pm
Location: Greater Atlanta

Re: A faster ringbuffer implementation

Post by mrburnette » Wed Oct 04, 2017 1:44 pm

Rick Kimball wrote:
Thu Jul 14, 2016 5:21 pm
...
Note: as this stuff is template based, you could change the element buffer from an array of uint8_t data to something completely different. It could be an ring buffer of custom structure, a ring buffer of int16_t, or even a ring buffer of function pointers.
...
Rick,
Wow ... seriously cool. We mortals are unworthy, but we will use it anyway!

Ray
