// BlockingQueue.h
//
// Copyright (c) 2004, Ingo Weinhold (bonefish@cs.tu-berlin.de)
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//
// Except as contained in this notice, the name of a copyright holder shall
// not be used in advertising or otherwise to promote the sale, use or other
// dealings in this Software without prior written authorization of the
// copyright holder.
#ifndef BLOCKING_QUEUE_H #define BLOCKING_QUEUE_H #include #include #include "AutoLocker.h" using std::vector; typedef BLocker Locker; template class BlockingQueue : public Locker { public: BlockingQueue(const char* name = NULL); ~BlockingQueue(); status_t InitCheck() const; status_t Close(bool deleteElements, const vector** elements = NULL); status_t Push(Element* element); status_t Pop(Element** element, bigtime_t timeout = B_INFINITE_TIMEOUT); status_t Peek(Element** element); status_t Remove(Element* element); int32 Size(); private: vector fElements; sem_id fElementSemaphore; }; // constructor template BlockingQueue::BlockingQueue(const char* name) : fElements(), fElementSemaphore(-1) { fElementSemaphore = create_sem(0, (name ? name : "blocking queue")); } // destructor template BlockingQueue::~BlockingQueue() { if (fElementSemaphore >= 0) delete_sem(fElementSemaphore); } // InitCheck template status_t BlockingQueue::InitCheck() const { return (fElementSemaphore < 0 ? fElementSemaphore : B_OK); } // Close template status_t BlockingQueue::Close(bool deleteElements, const vector** elements) { AutoLocker _(this); status_t error = delete_sem(fElementSemaphore); if (error != B_OK) return error; fElementSemaphore = -1; if (elements) *elements = &fElements; if (deleteElements) { int32 count = fElements.size(); for (int32 i = 0; i < count; i++) delete fElements[i]; } return error; } // Push template status_t BlockingQueue::Push(Element* element) { AutoLocker _(this); if (fElementSemaphore < 0) return B_NO_INIT; try { fElements.push_back(element); } catch (std::bad_alloc&) { return B_NO_MEMORY; } status_t error = release_sem(fElementSemaphore); if (error != B_OK) fElements.erase(fElements.begin() + fElements.size() - 1); return error; } // Pop template status_t BlockingQueue::Pop(Element** element, bigtime_t timeout) { status_t error = acquire_sem_etc(fElementSemaphore, 1, B_RELATIVE_TIMEOUT, timeout); if (error != B_OK) return error; AutoLocker _(this); if 
(fElementSemaphore < 0) return B_NO_INIT; int32 count = fElements.size(); if (count == 0) return B_ERROR; *element = fElements[0]; fElements.erase(fElements.begin()); return B_OK; } // Peek template status_t BlockingQueue::Peek(Element** element) { AutoLocker _(this); if (fElementSemaphore < 0) return B_NO_INIT; int32 count = fElements.size(); if (count == 0) return B_ENTRY_NOT_FOUND; *element = fElements[0]; return B_OK; } // Remove template status_t BlockingQueue::Remove(Element* element) { status_t error = acquire_sem_etc(fElementSemaphore, 1, B_RELATIVE_TIMEOUT, 0); if (error != B_OK) return error; AutoLocker _(this); if (fElementSemaphore < 0) return B_NO_INIT; int32 count = 0; for (int32 i = fElements.size() - 1; i >= 0; i--) { if (fElements[i] == element) { fElements.erase(fElements.begin() + i); count++; } } if (count == 0) { release_sem(fElementSemaphore); return B_ENTRY_NOT_FOUND; } #if 0 if (count > 1) { ERROR(("ERROR: BlockingQueue::Remove(): Removed %ld elements!\n", count)); } #endif return error; } // Size template int32 BlockingQueue::Size() { AutoLocker _(this); return (fElements.size()); } #endif // BLOCKING_QUEUE_H