You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

35 lines
992 B
Python

import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class RingBuffer:
"""Implements a basic Ring Buffer type."""
def __init__(self, size):
self._data = bytearray(size)
self.size = size
self.put_index = 0
self.get_index = 0
def put(self, value):
_next_index = (self.put_index + 1) % self.size
# check for overflow
if self.get_index != _next_index:
self._data[self.put_index] = value
self.put_index = _next_index
logger.debug(f'get_index:{self.get_index}, put_index:{self.put_index}')
logger.debug(str(self._data))
return value
else:
return None
def get(self):
if self.get_index == self.put_index:
return None ## buffer empty
else:
value = self._data[self.get_index]
self.get_index = (self.get_index + 1) % self.size
return value