1 /* 2 * Copyright (c) 2004-2007 Marcus Overhagen <marcus@overhagen.de> 3 * 4 * Permission is hereby granted, free of charge, to any person 5 * obtaining a copy of this software and associated documentation 6 * files (the "Software"), to deal in the Software without restriction, 7 * including without limitation the rights to use, copy, modify, 8 * merge, publish, distribute, sublicense, and/or sell copies of 9 * the Software, and to permit persons to whom the Software is 10 * furnished to do so, subject to the following conditions: 11 * 12 * The above copyright notice and this permission notice shall be 13 * included in all copies or substantial portions of the Software. 14 * 15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 17 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 18 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 19 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 20 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 22 * OTHER DEALINGS IN THE SOFTWARE. 23 */ 24 25 #include <stdio.h> 26 #include <OS.h> 27 28 #include "Packet.h" 29 #include "PacketQueue.h" 30 31 #define TRACE_PACKET_QUEUE 32 #ifdef TRACE_PACKET_QUEUE 33 #define TRACE printf 34 #else 35 #define TRACE(a...) 36 #endif 37 38 39 PacketQueue::PacketQueue(int max_packets) 40 : fQueue(new Packet* [max_packets]) 41 , fSem(create_sem(0, "packet queue sem")) 42 , fTerminate(false) 43 , fWriteIndex(0) 44 , fReadIndex(0) 45 , fMaxPackets(max_packets) 46 , fPacketCount(0) 47 { 48 } 49 50 51 PacketQueue::~PacketQueue() 52 { 53 Empty(); 54 delete_sem(fSem); 55 delete [] fQueue; 56 } 57 58 59 void 60 PacketQueue::Empty() 61 { 62 while (fPacketCount--) { 63 delete fQueue[fReadIndex]; 64 fReadIndex = (fReadIndex + 1) % fMaxPackets; 65 } 66 } 67 68 69 status_t 70 PacketQueue::Insert(Packet *packet) 71 { 72 if (fTerminate) { 73 return B_NOT_ALLOWED; 74 } 75 if (atomic_add(&fPacketCount, 1) == fMaxPackets) { 76 atomic_add(&fPacketCount, -1); 77 return B_WOULD_BLOCK; 78 } 79 fQueue[fWriteIndex] = packet; 80 fWriteIndex = (fWriteIndex + 1) % fMaxPackets; 81 release_sem(fSem); 82 return B_OK; 83 } 84 85 86 status_t 87 PacketQueue::Remove(Packet **packet) 88 { 89 if (fTerminate) { 90 return B_NOT_ALLOWED; 91 } 92 if (acquire_sem(fSem) != B_OK) 93 return B_ERROR; 94 if (fTerminate) { 95 return B_NOT_ALLOWED; 96 } 97 *packet = fQueue[fReadIndex]; 98 atomic_add(&fPacketCount, -1); 99 fReadIndex = (fReadIndex + 1) % fMaxPackets; 100 return B_OK; 101 } 102 103 104 void 105 PacketQueue::Flush(bigtime_t timeout) 106 { 107 if (fTerminate) { 108 return; 109 } 110 111 timeout += system_time(); 112 113 while (acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT, timeout) == B_OK) { 114 if (fTerminate) { 115 return; 116 } 117 Packet *packet = fQueue[fReadIndex]; 118 atomic_add(&fPacketCount, -1); 119 fReadIndex = (fReadIndex + 1) % fMaxPackets; 120 delete packet; 121 } 122 } 123 124 125 // peeks into queue and delivers a copy 126 status_t 127 PacketQueue::Peek(Packet **packet) 128 { 129 if (fTerminate) { 130 return B_NOT_ALLOWED; 131 } 132 if (acquire_sem(fSem) != B_OK) 133 return B_ERROR; 134 if (fTerminate) { 135 return B_NOT_ALLOWED; 136 } 137 *packet = new Packet(*fQueue[fReadIndex]); 138 release_sem(fSem); 139 return B_OK; 140 } 141 142 143 void 144 PacketQueue::Terminate() 145 { 146 fTerminate = true; 147 release_sem(fSem); 148 } 149 150 151 void 152 PacketQueue::Restart() 153 { 154 Empty(); 155 156 delete_sem(fSem); 157 fSem = create_sem(0, "packet queue sem"); 158 fTerminate = false; 159 fWriteIndex = 0; 160 fReadIndex = 0; 161 fPacketCount = 0; 162 } 163