1 /*
2 * Copyright (c) 2004-2007 Marcus Overhagen <marcus@overhagen.de>
3 *
4 * Permission is hereby granted, free of charge, to any person
5 * obtaining a copy of this software and associated documentation
6 * files (the "Software"), to deal in the Software without restriction,
7 * including without limitation the rights to use, copy, modify,
8 * merge, publish, distribute, sublicense, and/or sell copies of
9 * the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be
13 * included in all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
17 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
19 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
20 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
22 * OTHER DEALINGS IN THE SOFTWARE.
23 */
24
25 #include <stdio.h>
26 #include <OS.h>
27
28 #include "Packet.h"
29 #include "PacketQueue.h"
30
// Tracing support: while TRACE_PACKET_QUEUE is defined, TRACE() forwards its
// arguments to printf(); without it, TRACE() calls expand to nothing.
#define TRACE_PACKET_QUEUE
#if defined(TRACE_PACKET_QUEUE)
#	define TRACE printf
#else
#	define TRACE(a...)
#endif
37
38
PacketQueue(int max_packets)39 PacketQueue::PacketQueue(int max_packets)
40 : fQueue(new Packet* [max_packets])
41 , fSem(create_sem(0, "packet queue sem"))
42 , fTerminate(false)
43 , fWriteIndex(0)
44 , fReadIndex(0)
45 , fMaxPackets(max_packets)
46 , fPacketCount(0)
47 {
48 }
49
50
~PacketQueue()51 PacketQueue::~PacketQueue()
52 {
53 Empty();
54 delete_sem(fSem);
55 delete [] fQueue;
56 }
57
58
59 void
Empty()60 PacketQueue::Empty()
61 {
62 while (fPacketCount--) {
63 delete fQueue[fReadIndex];
64 fReadIndex = (fReadIndex + 1) % fMaxPackets;
65 }
66 }
67
68
69 status_t
Insert(Packet * packet)70 PacketQueue::Insert(Packet *packet)
71 {
72 if (fTerminate) {
73 return B_NOT_ALLOWED;
74 }
75 if (atomic_add(&fPacketCount, 1) == fMaxPackets) {
76 atomic_add(&fPacketCount, -1);
77 return B_WOULD_BLOCK;
78 }
79 fQueue[fWriteIndex] = packet;
80 fWriteIndex = (fWriteIndex + 1) % fMaxPackets;
81 release_sem(fSem);
82 return B_OK;
83 }
84
85
86 status_t
Remove(Packet ** packet)87 PacketQueue::Remove(Packet **packet)
88 {
89 if (fTerminate) {
90 return B_NOT_ALLOWED;
91 }
92 if (acquire_sem(fSem) != B_OK)
93 return B_ERROR;
94 if (fTerminate) {
95 return B_NOT_ALLOWED;
96 }
97 *packet = fQueue[fReadIndex];
98 atomic_add(&fPacketCount, -1);
99 fReadIndex = (fReadIndex + 1) % fMaxPackets;
100 return B_OK;
101 }
102
103
104 void
Flush(bigtime_t timeout)105 PacketQueue::Flush(bigtime_t timeout)
106 {
107 if (fTerminate) {
108 return;
109 }
110
111 timeout += system_time();
112
113 while (acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT, timeout) == B_OK) {
114 if (fTerminate) {
115 return;
116 }
117 Packet *packet = fQueue[fReadIndex];
118 atomic_add(&fPacketCount, -1);
119 fReadIndex = (fReadIndex + 1) % fMaxPackets;
120 delete packet;
121 }
122 }
123
124
125 // peeks into queue and delivers a copy
126 status_t
Peek(Packet ** packet)127 PacketQueue::Peek(Packet **packet)
128 {
129 if (fTerminate) {
130 return B_NOT_ALLOWED;
131 }
132 if (acquire_sem(fSem) != B_OK)
133 return B_ERROR;
134 if (fTerminate) {
135 return B_NOT_ALLOWED;
136 }
137 *packet = new Packet(*fQueue[fReadIndex]);
138 release_sem(fSem);
139 return B_OK;
140 }
141
142
143 void
Terminate()144 PacketQueue::Terminate()
145 {
146 fTerminate = true;
147 release_sem(fSem);
148 }
149
150
151 void
Restart()152 PacketQueue::Restart()
153 {
154 Empty();
155
156 delete_sem(fSem);
157 fSem = create_sem(0, "packet queue sem");
158 fTerminate = false;
159 fWriteIndex = 0;
160 fReadIndex = 0;
161 fPacketCount = 0;
162 }
163