1 /* 2 * Copyright 2008, Axel Dörfler, axeld@pinc-software.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 7 #include "AdaptiveBuffering.h" 8 9 #include <stdlib.h> 10 11 12 //#define TRACE(x...) printf(x) 13 #define TRACE(x...) ; 14 15 16 AdaptiveBuffering::AdaptiveBuffering(size_t initialBufferSize, 17 size_t maxBufferSize, uint32 count) 18 : 19 fWriterThread(-1), 20 fBuffers(NULL), 21 fReadBytes(NULL), 22 fBufferCount(count), 23 fReadIndex(0), 24 fWriteIndex(0), 25 fReadCount(0), 26 fWriteCount(0), 27 fMaxBufferSize(maxBufferSize), 28 fCurrentBufferSize(initialBufferSize), 29 fReadSem(-1), 30 fWriteSem(-1), 31 fFinishedSem(-1), 32 fWriteStatus(B_OK), 33 fWriteTime(0), 34 fFinished(false), 35 fQuit(false) 36 { 37 } 38 39 40 AdaptiveBuffering::~AdaptiveBuffering() 41 { 42 _QuitWriter(); 43 44 delete_sem(fReadSem); 45 delete_sem(fWriteSem); 46 47 if (fBuffers != NULL) { 48 for (uint32 i = 0; i < fBufferCount; i++) { 49 if (fBuffers[i] == NULL) 50 break; 51 52 free(fBuffers[i]); 53 } 54 55 free(fBuffers); 56 } 57 58 free(fReadBytes); 59 } 60 61 62 status_t 63 AdaptiveBuffering::Init() 64 { 65 fReadBytes = (size_t*)malloc(fBufferCount * sizeof(size_t)); 66 if (fReadBytes == NULL) 67 return B_NO_MEMORY; 68 69 fBuffers = (uint8**)malloc(fBufferCount * sizeof(uint8*)); 70 if (fBuffers == NULL) 71 return B_NO_MEMORY; 72 73 for (uint32 i = 0; i < fBufferCount; i++) { 74 fBuffers[i] = (uint8*)malloc(fMaxBufferSize); 75 if (fBuffers[i] == NULL) 76 return B_NO_MEMORY; 77 } 78 79 fReadSem = create_sem(0, "reader"); 80 if (fReadSem < B_OK) 81 return fReadSem; 82 83 fWriteSem = create_sem(fBufferCount - 1, "writer"); 84 if (fWriteSem < B_OK) 85 return fWriteSem; 86 87 fFinishedSem = create_sem(0, "finished"); 88 if (fFinishedSem < B_OK) 89 return fFinishedSem; 90 91 fWriterThread = spawn_thread(&_Writer, "buffer reader", B_LOW_PRIORITY, 92 this); 93 if (fWriterThread < B_OK) 94 return fWriterThread; 95 96 return resume_thread(fWriterThread); 97 } 98 99 100 status_t 101 AdaptiveBuffering::Read(uint8* /*buffer*/, size_t* _length) 102 { 103 *_length = 0; 104 return B_OK; 105 } 106 107 108 status_t 109 AdaptiveBuffering::Write(uint8* /*buffer*/, size_t /*length*/) 110 { 111 return B_OK; 112 } 113 114 115 status_t 116 AdaptiveBuffering::Run() 117 { 118 fReadIndex = 0; 119 fWriteIndex = 0; 120 fReadCount = 0; 121 fWriteCount = 0; 122 fWriteStatus = B_OK; 123 fWriteTime = 0; 124 125 while (fWriteStatus >= B_OK) { 126 bigtime_t start = system_time(); 127 int32 index = fReadIndex; 128 129 TRACE("%ld. read index %lu, buffer size %lu\n", fReadCount, index, 130 fCurrentBufferSize); 131 132 fReadBytes[index] = fCurrentBufferSize; 133 status_t status = Read(fBuffers[index], &fReadBytes[index]); 134 if (status < B_OK) 135 return status; 136 137 TRACE("%ld. read -> %lu bytes\n", fReadCount, fReadBytes[index]); 138 139 fReadCount++; 140 fReadIndex = (index + 1) % fBufferCount; 141 if (fReadBytes[index] == 0) 142 fFinished = true; 143 release_sem(fReadSem); 144 145 while (acquire_sem(fWriteSem) == B_INTERRUPTED) 146 ; 147 148 if (fFinished) 149 break; 150 151 bigtime_t readTime = system_time() - start; 152 uint32 writeTime = fWriteTime; 153 if (writeTime) { 154 if (writeTime > readTime) { 155 fCurrentBufferSize = fCurrentBufferSize * 8/9; 156 fCurrentBufferSize &= ~65535; 157 } else { 158 fCurrentBufferSize = fCurrentBufferSize * 9/8; 159 fCurrentBufferSize = (fCurrentBufferSize + 65535) & ~65535; 160 161 if (fCurrentBufferSize > fMaxBufferSize) 162 fCurrentBufferSize = fMaxBufferSize; 163 } 164 } 165 } 166 167 while (acquire_sem(fFinishedSem) == B_INTERRUPTED) 168 ; 169 170 return fWriteStatus; 171 } 172 173 174 void 175 AdaptiveBuffering::_QuitWriter() 176 { 177 if (fWriterThread >= B_OK) { 178 fQuit = true; 179 release_sem(fReadSem); 180 181 status_t status; 182 wait_for_thread(fWriterThread, &status); 183 184 fWriterThread = -1; 185 } 186 } 187 188 189 status_t 190 AdaptiveBuffering::_Writer() 191 { 192 while (true) { 193 while (acquire_sem(fReadSem) == B_INTERRUPTED) 194 ; 195 if (fQuit) 196 break; 197 198 bigtime_t start = system_time(); 199 200 TRACE("%ld. write index %lu, %p, bytes %lu\n", fWriteCount, fWriteIndex, 201 fBuffers[fWriteIndex], fReadBytes[fWriteIndex]); 202 203 fWriteStatus = Write(fBuffers[fWriteIndex], fReadBytes[fWriteIndex]); 204 205 TRACE("%ld. write done\n", fWriteCount); 206 207 fWriteIndex = (fWriteIndex + 1) % fBufferCount; 208 fWriteTime = uint32(system_time() - start); 209 fWriteCount++; 210 211 release_sem(fWriteSem); 212 213 if (fWriteStatus < B_OK) 214 return fWriteStatus; 215 if (fFinished && fWriteCount == fReadCount) 216 release_sem(fFinishedSem); 217 } 218 219 return B_OK; 220 } 221 222 223 /*static*/ status_t 224 AdaptiveBuffering::_Writer(void* self) 225 { 226 return ((AdaptiveBuffering*)self)->_Writer(); 227 } 228 229