/* * Copyright 2008, Axel Dörfler, axeld@pinc-software.de. * Distributed under the terms of the MIT License. */ #include "AdaptiveBuffering.h" #include //#define TRACE(x...) printf(x) #define TRACE(x...) ; AdaptiveBuffering::AdaptiveBuffering(size_t initialBufferSize, size_t maxBufferSize, uint32 count) : fWriterThread(-1), fBuffers(NULL), fReadBytes(NULL), fBufferCount(count), fReadIndex(0), fWriteIndex(0), fReadCount(0), fWriteCount(0), fMaxBufferSize(maxBufferSize), fCurrentBufferSize(initialBufferSize), fReadSem(-1), fWriteSem(-1), fFinishedSem(-1), fWriteStatus(B_OK), fWriteTime(0), fFinished(false), fQuit(false) { } AdaptiveBuffering::~AdaptiveBuffering() { _QuitWriter(); delete_sem(fReadSem); delete_sem(fWriteSem); if (fBuffers != NULL) { for (uint32 i = 0; i < fBufferCount; i++) { if (fBuffers[i] == NULL) break; free(fBuffers[i]); } free(fBuffers); } free(fReadBytes); } status_t AdaptiveBuffering::Init() { fReadBytes = (size_t*)malloc(fBufferCount * sizeof(size_t)); if (fReadBytes == NULL) return B_NO_MEMORY; fBuffers = (uint8**)malloc(fBufferCount * sizeof(uint8*)); if (fBuffers == NULL) return B_NO_MEMORY; for (uint32 i = 0; i < fBufferCount; i++) { fBuffers[i] = (uint8*)malloc(fMaxBufferSize); if (fBuffers[i] == NULL) return B_NO_MEMORY; } fReadSem = create_sem(0, "reader"); if (fReadSem < B_OK) return fReadSem; fWriteSem = create_sem(fBufferCount - 1, "writer"); if (fWriteSem < B_OK) return fWriteSem; fFinishedSem = create_sem(0, "finished"); if (fFinishedSem < B_OK) return fFinishedSem; fWriterThread = spawn_thread(&_Writer, "buffer reader", B_LOW_PRIORITY, this); if (fWriterThread < B_OK) return fWriterThread; return resume_thread(fWriterThread); } status_t AdaptiveBuffering::Read(uint8* /*buffer*/, size_t* _length) { *_length = 0; return B_OK; } status_t AdaptiveBuffering::Write(uint8* /*buffer*/, size_t /*length*/) { return B_OK; } status_t AdaptiveBuffering::Run() { fReadIndex = 0; fWriteIndex = 0; fReadCount = 0; fWriteCount = 0; fWriteStatus = B_OK; fWriteTime = 0; while (fWriteStatus >= B_OK) { bigtime_t start = system_time(); int32 index = fReadIndex; TRACE("%ld. read index %lu, buffer size %lu\n", fReadCount, index, fCurrentBufferSize); fReadBytes[index] = fCurrentBufferSize; status_t status = Read(fBuffers[index], &fReadBytes[index]); if (status < B_OK) return status; TRACE("%ld. read -> %lu bytes\n", fReadCount, fReadBytes[index]); fReadCount++; fReadIndex = (index + 1) % fBufferCount; if (fReadBytes[index] == 0) fFinished = true; release_sem(fReadSem); while (acquire_sem(fWriteSem) == B_INTERRUPTED) ; if (fFinished) break; bigtime_t readTime = system_time() - start; uint32 writeTime = fWriteTime; if (writeTime) { if (writeTime > readTime) { fCurrentBufferSize = fCurrentBufferSize * 8/9; fCurrentBufferSize &= ~65535; } else { fCurrentBufferSize = fCurrentBufferSize * 9/8; fCurrentBufferSize = (fCurrentBufferSize + 65535) & ~65535; if (fCurrentBufferSize > fMaxBufferSize) fCurrentBufferSize = fMaxBufferSize; } } } while (acquire_sem(fFinishedSem) == B_INTERRUPTED) ; return fWriteStatus; } void AdaptiveBuffering::_QuitWriter() { if (fWriterThread >= B_OK) { fQuit = true; release_sem(fReadSem); status_t status; wait_for_thread(fWriterThread, &status); fWriterThread = -1; } } status_t AdaptiveBuffering::_Writer() { while (true) { while (acquire_sem(fReadSem) == B_INTERRUPTED) ; if (fQuit) break; bigtime_t start = system_time(); TRACE("%ld. write index %lu, %p, bytes %lu\n", fWriteCount, fWriteIndex, fBuffers[fWriteIndex], fReadBytes[fWriteIndex]); fWriteStatus = Write(fBuffers[fWriteIndex], fReadBytes[fWriteIndex]); TRACE("%ld. write done\n", fWriteCount); fWriteIndex = (fWriteIndex + 1) % fBufferCount; fWriteTime = uint32(system_time() - start); fWriteCount++; release_sem(fWriteSem); if (fWriteStatus < B_OK) return fWriteStatus; if (fFinished && fWriteCount == fReadCount) release_sem(fFinishedSem); } return B_OK; } /*static*/ status_t AdaptiveBuffering::_Writer(void* self) { return ((AdaptiveBuffering*)self)->_Writer(); }