/* * Copyright 2004, Ingo Weinhold, ingo_weinhold@gmx.de. * All rights reserved. Distributed under the terms of the MIT license. */ #ifndef BLOCKING_QUEUE_H #define BLOCKING_QUEUE_H #include #include #include "AutoLocker.h" using std::vector; typedef BLocker Locker; template class BlockingQueue : public Locker { public: BlockingQueue(const char* name = NULL); ~BlockingQueue(); status_t InitCheck() const; status_t Close(bool deleteElements, const vector** elements = NULL); status_t Push(Element* element); status_t Pop(Element** element, bigtime_t timeout = B_INFINITE_TIMEOUT); status_t Peek(Element** element); status_t Remove(Element* element); int32 Size(); private: vector fElements; sem_id fElementSemaphore; }; // constructor template BlockingQueue::BlockingQueue(const char* name) : fElements(), fElementSemaphore(-1) { fElementSemaphore = create_sem(0, (name ? name : "blocking queue")); } // destructor template BlockingQueue::~BlockingQueue() { if (fElementSemaphore >= 0) delete_sem(fElementSemaphore); } // InitCheck template status_t BlockingQueue::InitCheck() const { return (fElementSemaphore < 0 ? fElementSemaphore : B_OK); } // Close template status_t BlockingQueue::Close(bool deleteElements, const vector** elements) { AutoLocker _(this); status_t error = delete_sem(fElementSemaphore); if (error != B_OK) return error; fElementSemaphore = -1; if (elements) *elements = &fElements; if (deleteElements) { int32 count = fElements.size(); for (int32 i = 0; i < count; i++) delete fElements[i]; } return error; } // Push template status_t BlockingQueue::Push(Element* element) { AutoLocker _(this); if (fElementSemaphore < 0) return B_NO_INIT; try { fElements.push_back(element); } catch (std::bad_alloc&) { return B_NO_MEMORY; } status_t error = release_sem(fElementSemaphore); if (error != B_OK) fElements.erase(fElements.begin() + fElements.size() - 1); return error; } // Pop template status_t BlockingQueue::Pop(Element** element, bigtime_t timeout) { status_t error = acquire_sem_etc(fElementSemaphore, 1, B_RELATIVE_TIMEOUT, timeout); if (error != B_OK) return error; AutoLocker _(this); if (fElementSemaphore < 0) return B_NO_INIT; int32 count = fElements.size(); if (count == 0) return B_ERROR; *element = fElements[0]; fElements.erase(fElements.begin()); return B_OK; } // Peek template status_t BlockingQueue::Peek(Element** element) { AutoLocker _(this); if (fElementSemaphore < 0) return B_NO_INIT; int32 count = fElements.size(); if (count == 0) return B_ENTRY_NOT_FOUND; *element = fElements[0]; return B_OK; } // Remove template status_t BlockingQueue::Remove(Element* element) { status_t error = acquire_sem_etc(fElementSemaphore, 1, B_RELATIVE_TIMEOUT, 0); if (error != B_OK) return error; AutoLocker _(this); if (fElementSemaphore < 0) return B_NO_INIT; int32 count = 0; for (int32 i = fElements.size() - 1; i >= 0; i--) { if (fElements[i] == element) { fElements.erase(fElements.begin() + i); count++; } } if (count == 0) { release_sem(fElementSemaphore); return B_ENTRY_NOT_FOUND; } #if 0 if (count > 1) { ERROR(("ERROR: BlockingQueue::Remove(): Removed %ld elements!\n", count)); } #endif return error; } // Size template int32 BlockingQueue::Size() { AutoLocker _(this); return (fElements.size()); } #endif // BLOCKING_QUEUE_H