1 /*
2 * Copyright 2008, Axel Dörfler, axeld@pinc-software.de.
3 * Distributed under the terms of the MIT License.
4 */
5
6
7 #include "AdaptiveBuffering.h"
8
9 #include <stdlib.h>
10
11
// Debug tracing: swap the two definitions below to get printf() output.
// The disabled form expands to a single empty statement, so that
// "if (x) TRACE(...); else ..." keeps compiling.
//#define TRACE(x...) printf(x)
#define TRACE(x...) do { } while (0)
14
15
AdaptiveBuffering(size_t initialBufferSize,size_t maxBufferSize,uint32 count)16 AdaptiveBuffering::AdaptiveBuffering(size_t initialBufferSize,
17 size_t maxBufferSize, uint32 count)
18 :
19 fWriterThread(-1),
20 fBuffers(NULL),
21 fReadBytes(NULL),
22 fBufferCount(count),
23 fReadIndex(0),
24 fWriteIndex(0),
25 fReadCount(0),
26 fWriteCount(0),
27 fMaxBufferSize(maxBufferSize),
28 fCurrentBufferSize(initialBufferSize),
29 fReadSem(-1),
30 fWriteSem(-1),
31 fFinishedSem(-1),
32 fWriteStatus(B_OK),
33 fWriteTime(0),
34 fFinished(false),
35 fQuit(false)
36 {
37 }
38
39
~AdaptiveBuffering()40 AdaptiveBuffering::~AdaptiveBuffering()
41 {
42 _QuitWriter();
43
44 delete_sem(fReadSem);
45 delete_sem(fWriteSem);
46
47 if (fBuffers != NULL) {
48 for (uint32 i = 0; i < fBufferCount; i++) {
49 if (fBuffers[i] == NULL)
50 break;
51
52 free(fBuffers[i]);
53 }
54
55 free(fBuffers);
56 }
57
58 free(fReadBytes);
59 }
60
61
62 status_t
Init()63 AdaptiveBuffering::Init()
64 {
65 fReadBytes = (size_t*)malloc(fBufferCount * sizeof(size_t));
66 if (fReadBytes == NULL)
67 return B_NO_MEMORY;
68
69 fBuffers = (uint8**)malloc(fBufferCount * sizeof(uint8*));
70 if (fBuffers == NULL)
71 return B_NO_MEMORY;
72
73 for (uint32 i = 0; i < fBufferCount; i++) {
74 fBuffers[i] = (uint8*)malloc(fMaxBufferSize);
75 if (fBuffers[i] == NULL)
76 return B_NO_MEMORY;
77 }
78
79 fReadSem = create_sem(0, "reader");
80 if (fReadSem < B_OK)
81 return fReadSem;
82
83 fWriteSem = create_sem(fBufferCount - 1, "writer");
84 if (fWriteSem < B_OK)
85 return fWriteSem;
86
87 fFinishedSem = create_sem(0, "finished");
88 if (fFinishedSem < B_OK)
89 return fFinishedSem;
90
91 fWriterThread = spawn_thread(&_Writer, "buffer reader", B_LOW_PRIORITY,
92 this);
93 if (fWriterThread < B_OK)
94 return fWriterThread;
95
96 return resume_thread(fWriterThread);
97 }
98
99
100 status_t
Read(uint8 *,size_t * _length)101 AdaptiveBuffering::Read(uint8* /*buffer*/, size_t* _length)
102 {
103 *_length = 0;
104 return B_OK;
105 }
106
107
108 status_t
Write(uint8 *,size_t)109 AdaptiveBuffering::Write(uint8* /*buffer*/, size_t /*length*/)
110 {
111 return B_OK;
112 }
113
114
115 status_t
Run()116 AdaptiveBuffering::Run()
117 {
118 fReadIndex = 0;
119 fWriteIndex = 0;
120 fReadCount = 0;
121 fWriteCount = 0;
122 fWriteStatus = B_OK;
123 fWriteTime = 0;
124
125 while (fWriteStatus >= B_OK) {
126 bigtime_t start = system_time();
127 int32 index = fReadIndex;
128
129 TRACE("%ld. read index %lu, buffer size %lu\n", fReadCount, index,
130 fCurrentBufferSize);
131
132 fReadBytes[index] = fCurrentBufferSize;
133 status_t status = Read(fBuffers[index], &fReadBytes[index]);
134 if (status < B_OK)
135 return status;
136
137 TRACE("%ld. read -> %lu bytes\n", fReadCount, fReadBytes[index]);
138
139 fReadCount++;
140 fReadIndex = (index + 1) % fBufferCount;
141 if (fReadBytes[index] == 0)
142 fFinished = true;
143 release_sem(fReadSem);
144
145 while (acquire_sem(fWriteSem) == B_INTERRUPTED)
146 ;
147
148 if (fFinished)
149 break;
150
151 bigtime_t readTime = system_time() - start;
152 uint32 writeTime = fWriteTime;
153 if (writeTime) {
154 if (writeTime > readTime) {
155 fCurrentBufferSize = fCurrentBufferSize * 8/9;
156 fCurrentBufferSize &= ~65535;
157 } else {
158 fCurrentBufferSize = fCurrentBufferSize * 9/8;
159 fCurrentBufferSize = (fCurrentBufferSize + 65535) & ~65535;
160
161 if (fCurrentBufferSize > fMaxBufferSize)
162 fCurrentBufferSize = fMaxBufferSize;
163 }
164 }
165 }
166
167 while (acquire_sem(fFinishedSem) == B_INTERRUPTED)
168 ;
169
170 return fWriteStatus;
171 }
172
173
174 void
_QuitWriter()175 AdaptiveBuffering::_QuitWriter()
176 {
177 if (fWriterThread >= B_OK) {
178 fQuit = true;
179 release_sem(fReadSem);
180
181 status_t status;
182 wait_for_thread(fWriterThread, &status);
183
184 fWriterThread = -1;
185 }
186 }
187
188
189 status_t
_Writer()190 AdaptiveBuffering::_Writer()
191 {
192 while (true) {
193 while (acquire_sem(fReadSem) == B_INTERRUPTED)
194 ;
195 if (fQuit)
196 break;
197
198 bigtime_t start = system_time();
199
200 TRACE("%ld. write index %lu, %p, bytes %lu\n", fWriteCount, fWriteIndex,
201 fBuffers[fWriteIndex], fReadBytes[fWriteIndex]);
202
203 fWriteStatus = Write(fBuffers[fWriteIndex], fReadBytes[fWriteIndex]);
204
205 TRACE("%ld. write done\n", fWriteCount);
206
207 fWriteIndex = (fWriteIndex + 1) % fBufferCount;
208 fWriteTime = uint32(system_time() - start);
209 fWriteCount++;
210
211 release_sem(fWriteSem);
212
213 if (fWriteStatus < B_OK)
214 return fWriteStatus;
215 if (fFinished && fWriteCount == fReadCount)
216 release_sem(fFinishedSem);
217 }
218
219 return B_OK;
220 }
221
222
223 /*static*/ status_t
_Writer(void * self)224 AdaptiveBuffering::_Writer(void* self)
225 {
226 return ((AdaptiveBuffering*)self)->_Writer();
227 }
228
229