xref: /haiku/src/tests/add-ons/kernel/file_systems/shared/consistency_check/AdaptiveBuffering.cpp (revision 13581b3d2a71545960b98fefebc5225b5bf29072)
1 /*
2  * Copyright 2008, Axel Dörfler, axeld@pinc-software.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 
7 #include "AdaptiveBuffering.h"
8 
9 #include <stdlib.h>
10 
11 
12 //#define TRACE(x...) printf(x)
13 #define TRACE(x...) ;
14 
15 
16 AdaptiveBuffering::AdaptiveBuffering(size_t initialBufferSize,
17 		size_t maxBufferSize, uint32 count)
18 	:
19 	fWriterThread(-1),
20 	fBuffers(NULL),
21 	fReadBytes(NULL),
22 	fBufferCount(count),
23 	fReadIndex(0),
24 	fWriteIndex(0),
25 	fReadCount(0),
26 	fWriteCount(0),
27 	fMaxBufferSize(maxBufferSize),
28 	fCurrentBufferSize(initialBufferSize),
29 	fReadSem(-1),
30 	fWriteSem(-1),
31 	fFinishedSem(-1),
32 	fWriteStatus(B_OK),
33 	fWriteTime(0),
34 	fFinished(false),
35 	fQuit(false)
36 {
37 }
38 
39 
40 AdaptiveBuffering::~AdaptiveBuffering()
41 {
42 	_QuitWriter();
43 
44 	delete_sem(fReadSem);
45 	delete_sem(fWriteSem);
46 
47 	if (fBuffers != NULL) {
48 		for (uint32 i = 0; i < fBufferCount; i++) {
49 			if (fBuffers[i] == NULL)
50 				break;
51 
52 			free(fBuffers[i]);
53 		}
54 
55 		free(fBuffers);
56 	}
57 
58 	free(fReadBytes);
59 }
60 
61 
62 status_t
63 AdaptiveBuffering::Init()
64 {
65 	fReadBytes = (size_t*)malloc(fBufferCount * sizeof(size_t));
66 	if (fReadBytes == NULL)
67 		return B_NO_MEMORY;
68 
69 	fBuffers = (uint8**)malloc(fBufferCount * sizeof(uint8*));
70 	if (fBuffers == NULL)
71 		return B_NO_MEMORY;
72 
73 	for (uint32 i = 0; i < fBufferCount; i++) {
74 		fBuffers[i] = (uint8*)malloc(fMaxBufferSize);
75 		if (fBuffers[i] == NULL)
76 			return B_NO_MEMORY;
77 	}
78 
79 	fReadSem = create_sem(0, "reader");
80 	if (fReadSem < B_OK)
81 		return fReadSem;
82 
83 	fWriteSem = create_sem(fBufferCount - 1, "writer");
84 	if (fWriteSem < B_OK)
85 		return fWriteSem;
86 
87 	fFinishedSem = create_sem(0, "finished");
88 	if (fFinishedSem < B_OK)
89 		return fFinishedSem;
90 
91 	fWriterThread = spawn_thread(&_Writer, "buffer reader", B_LOW_PRIORITY,
92 		this);
93 	if (fWriterThread < B_OK)
94 		return fWriterThread;
95 
96 	return resume_thread(fWriterThread);
97 }
98 
99 
100 status_t
101 AdaptiveBuffering::Read(uint8* /*buffer*/, size_t* _length)
102 {
103 	*_length = 0;
104 	return B_OK;
105 }
106 
107 
108 status_t
109 AdaptiveBuffering::Write(uint8* /*buffer*/, size_t /*length*/)
110 {
111 	return B_OK;
112 }
113 
114 
115 status_t
116 AdaptiveBuffering::Run()
117 {
118 	fReadIndex = 0;
119 	fWriteIndex = 0;
120 	fReadCount = 0;
121 	fWriteCount = 0;
122 	fWriteStatus = B_OK;
123 	fWriteTime = 0;
124 
125 	while (fWriteStatus >= B_OK) {
126 		bigtime_t start = system_time();
127 		int32 index = fReadIndex;
128 
129 		TRACE("%ld. read index %lu, buffer size %lu\n", fReadCount, index,
130 			fCurrentBufferSize);
131 
132 		fReadBytes[index] = fCurrentBufferSize;
133 		status_t status = Read(fBuffers[index], &fReadBytes[index]);
134 		if (status < B_OK)
135 			return status;
136 
137 		TRACE("%ld. read -> %lu bytes\n", fReadCount, fReadBytes[index]);
138 
139 		fReadCount++;
140 		fReadIndex = (index + 1) % fBufferCount;
141 		if (fReadBytes[index] == 0)
142 			fFinished = true;
143 		release_sem(fReadSem);
144 
145 		while (acquire_sem(fWriteSem) == B_INTERRUPTED)
146 			;
147 
148 		if (fFinished)
149 			break;
150 
151 		bigtime_t readTime = system_time() - start;
152 		uint32 writeTime = fWriteTime;
153 		if (writeTime) {
154 			if (writeTime > readTime) {
155 				fCurrentBufferSize = fCurrentBufferSize * 8/9;
156 				fCurrentBufferSize &= ~65535;
157 			} else {
158 				fCurrentBufferSize = fCurrentBufferSize * 9/8;
159 				fCurrentBufferSize = (fCurrentBufferSize + 65535) & ~65535;
160 
161 				if (fCurrentBufferSize > fMaxBufferSize)
162 					fCurrentBufferSize = fMaxBufferSize;
163 			}
164 		}
165 	}
166 
167 	while (acquire_sem(fFinishedSem) == B_INTERRUPTED)
168 		;
169 
170 	return fWriteStatus;
171 }
172 
173 
174 void
175 AdaptiveBuffering::_QuitWriter()
176 {
177 	if (fWriterThread >= B_OK) {
178 		fQuit = true;
179 		release_sem(fReadSem);
180 
181 		status_t status;
182 		wait_for_thread(fWriterThread, &status);
183 
184 		fWriterThread = -1;
185 	}
186 }
187 
188 
189 status_t
190 AdaptiveBuffering::_Writer()
191 {
192 	while (true) {
193 		while (acquire_sem(fReadSem) == B_INTERRUPTED)
194 			;
195 		if (fQuit)
196 			break;
197 
198 		bigtime_t start = system_time();
199 
200 		TRACE("%ld. write index %lu, %p, bytes %lu\n", fWriteCount, fWriteIndex,
201 			fBuffers[fWriteIndex], fReadBytes[fWriteIndex]);
202 
203 		fWriteStatus = Write(fBuffers[fWriteIndex], fReadBytes[fWriteIndex]);
204 
205 		TRACE("%ld. write done\n", fWriteCount);
206 
207 		fWriteIndex = (fWriteIndex + 1) % fBufferCount;
208 		fWriteTime = uint32(system_time() - start);
209 		fWriteCount++;
210 
211 		release_sem(fWriteSem);
212 
213 		if (fWriteStatus < B_OK)
214 			return fWriteStatus;
215 		if (fFinished && fWriteCount == fReadCount)
216 			release_sem(fFinishedSem);
217 	}
218 
219 	return B_OK;
220 }
221 
222 
223 /*static*/ status_t
224 AdaptiveBuffering::_Writer(void* self)
225 {
226 	return ((AdaptiveBuffering*)self)->_Writer();
227 }
228 
229