环形缓冲区(ring buffer)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""1.3.39 Ring ring_buffer
A ring ring_buffer, or circular queue, is a FIFO data structure of a fixed size N.
It is useful for transferring data between asynchronous processes or for
storing log files.
When the ring_buffer is empty, the consumer waits until data is deposited;
when the ring_buffer is full, the producer waits to deposit data.
Develop an API for a Ringring_buffer and an implementation that uses an array
representation (with circular wrap-around).
"""
import unittest
import collections


class RingBufferIterator(collections.Iterator):

def __init__(self, first, buffer):
self.__first = first
self.__count = 0
self.__ring_buffer = buffer

def __iter__(self):
return self

def __next__(self):
if not self.has_next(): raise StopIteration
item = self.__ring_buffer[self.__first]
if self.__first == len(self.__ring_buffer) - 1:
self.__first = 0 # wrap around
else:
self.__first += 1
self.__count += 1
return item

def has_next(self):
return self.__count < len(self.__ring_buffer)


class RingBuffer:

def __init__(self, size):
self.__max_size = size # 缓冲区最大空间
self.__ring_buffer = [None] * size # 缓冲区
self.__first = -1 # 读指针
self.__last = -1 # 写指针
self.__size = 0 # 当前缓冲大小
self.__producer_aux_ring_buffer = [] # 候补缓冲区队列
self.data_count_to_be_consumed = 0 # 消费者等待数量

def __iter__(self):
return RingBufferIterator(self.__first, self.__ring_buffer)

def size(self):
# 获取缓冲区大小
return self.__size

def is_empty(self):
# 缓冲区是否为空
return self.__size == 0

def is_full(self):
# 缓冲区是否满了
return self.__size == self.__max_size

def produce(self, item):
# 生产
if self.data_count_to_be_consumed > 0:
# 消费者等待数据数量
self.consume_data(item)
self.data_count_to_be_consumed -= 1
else:
if self.is_empty():
# 缓冲去为空
self.__ring_buffer[self.__size] = item
self.__first = 0
self.__last = 0
self.__size += 1
else:
if not self.is_full():
# 缓冲区未满
if self.__last == len(self.__ring_buffer) - 1:
# 写指针到达环形缓冲区结尾元素
self.__last = 0 # 形成环,从头开始写入数据
else:
self.__last += 1
self.__ring_buffer[self.__last] = item # 将数据写入缓冲区中
self.__size += 1
else:
# 缓冲区满时,将元素放入到后部缓冲区中
self.__producer_aux_ring_buffer.append(item)

def consume_data(self, item):
print("Data consumed: ", item)

def consume(self):
if self.is_empty():
# 消费者进行数据消费时,缓冲区为空,消费者处于等待中,这时统计要消费者等待数量
self.data_count_to_be_consumed += 1
return
item = self.__ring_buffer[self.__first] # 从读指针获取数据
self.__ring_buffer[self.__first] = None # 将当前指针指向的元素设置为None,保持缓冲区长度,avoid loitering

if self.__first == len(self.__ring_buffer) - 1:
# 读指针到达缓存末尾元素
self.__first = 0 # 形成环,从头开始读
else:
# 移动读指针到下一个元素
self.__first += 1
self.__size -= 1

if len(self.__producer_aux_ring_buffer):
# 将等待候补缓冲区队列中的元素出队并添加到缓冲区中
self.produce(self.__producer_aux_ring_buffer.pop(0))
return item


class TestRingBuffer(unittest.TestCase):

@staticmethod
def create_ring_buffer(size):
return RingBuffer(size=size)

def test_ring_buffer(self):
ring_buffer = self.create_ring_buffer(4)
ring_buffer.produce(0)
ring_buffer.produce(1)
ring_buffer.produce(2)
ring_buffer.produce(3)
ring_buffer.produce(4)
ring_buffer.produce(5)


item1 = ring_buffer.consume()
self.assertEqual(0, item1)

item2 = ring_buffer.consume()
self.assertEqual(1, item2)

ring_buffer.produce(6)
ring_buffer.produce(7)

result = [2, 3, 4, 5]
ans = list(ring_buffer)
self.assertEqual(result, ans)


if __name__ == "__main__":
unittest.main()