xref: /haiku/src/add-ons/media/media-add-ons/dvb/PacketQueue.cpp (revision 93a78ecaa45114d68952d08c4778f073515102f2)
1 /*
2  * Copyright (c) 2004-2007 Marcus Overhagen <marcus@overhagen.de>
3  *
4  * Permission is hereby granted, free of charge, to any person
5  * obtaining a copy of this software and associated documentation
6  * files (the "Software"), to deal in the Software without restriction,
7  * including without limitation the rights to use, copy, modify,
8  * merge, publish, distribute, sublicense, and/or sell copies of
9  * the Software, and to permit persons to whom the Software is
10  * furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be
13  * included in all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
17  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
19  * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
20  * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
22  * OTHER DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <stdio.h>
26 #include <OS.h>
27 
28 #include "Packet.h"
29 #include "PacketQueue.h"
30 
// Debug tracing switch: while TRACE_PACKET_QUEUE is defined, TRACE is an
// alias for printf; without it, TRACE(...) expands to nothing.
#define TRACE_PACKET_QUEUE
#if defined(TRACE_PACKET_QUEUE)
#	define TRACE printf
#else
#	define TRACE(a...)
#endif
37 
38 
39 PacketQueue::PacketQueue(int max_packets)
40  :	fQueue(new Packet* [max_packets])
41  ,	fSem(create_sem(0, "packet queue sem"))
42  ,	fTerminate(false)
43  ,	fWriteIndex(0)
44  ,	fReadIndex(0)
45  ,	fMaxPackets(max_packets)
46  ,	fPacketCount(0)
47 {
48 }
49 
50 
51 PacketQueue::~PacketQueue()
52 {
53 	Empty();
54 	delete_sem(fSem);
55 	delete [] fQueue;
56 }
57 
58 
59 void
60 PacketQueue::Empty()
61 {
62 	while (fPacketCount--) {
63 		delete fQueue[fReadIndex];
64 		fReadIndex = (fReadIndex + 1) % fMaxPackets;
65 	}
66 }
67 
68 
69 status_t
70 PacketQueue::Insert(Packet *packet)
71 {
72 	if (fTerminate) {
73 		return B_NOT_ALLOWED;
74 	}
75 	if (atomic_add(&fPacketCount, 1) == fMaxPackets) {
76 		atomic_add(&fPacketCount, -1);
77 		return B_WOULD_BLOCK;
78 	}
79 	fQueue[fWriteIndex] = packet;
80 	fWriteIndex = (fWriteIndex + 1) % fMaxPackets;
81 	release_sem(fSem);
82 	return B_OK;
83 }
84 
85 
86 status_t
87 PacketQueue::Remove(Packet **packet)
88 {
89 	if (fTerminate) {
90 		return B_NOT_ALLOWED;
91 	}
92 	if (acquire_sem(fSem) != B_OK)
93 		return B_ERROR;
94 	if (fTerminate) {
95 		return B_NOT_ALLOWED;
96 	}
97 	*packet = fQueue[fReadIndex];
98 	atomic_add(&fPacketCount, -1);
99 	fReadIndex = (fReadIndex + 1) % fMaxPackets;
100 	return B_OK;
101 }
102 
103 
104 void
105 PacketQueue::Flush(bigtime_t timeout)
106 {
107 	if (fTerminate) {
108 		return;
109 	}
110 
111 	timeout += system_time();
112 
113 	while (acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT, timeout) == B_OK) {
114 		if (fTerminate) {
115 			return;
116 		}
117 		Packet *packet = fQueue[fReadIndex];
118 		atomic_add(&fPacketCount, -1);
119 		fReadIndex = (fReadIndex + 1) % fMaxPackets;
120 		delete packet;
121 	}
122 }
123 
124 
125 // peeks into queue and delivers a copy
126 status_t
127 PacketQueue::Peek(Packet **packet)
128 {
129 	if (fTerminate) {
130 		return B_NOT_ALLOWED;
131 	}
132 	if (acquire_sem(fSem) != B_OK)
133 		return B_ERROR;
134 	if (fTerminate) {
135 		return B_NOT_ALLOWED;
136 	}
137 	*packet = new Packet(*fQueue[fReadIndex]);
138 	release_sem(fSem);
139 	return B_OK;
140 }
141 
142 
143 void
144 PacketQueue::Terminate()
145 {
146 	fTerminate = true;
147 	release_sem(fSem);
148 }
149 
150 
151 void
152 PacketQueue::Restart()
153 {
154 	Empty();
155 
156 	delete_sem(fSem);
157 	fSem = create_sem(0, "packet queue sem");
158 	fTerminate = false;
159 	fWriteIndex = 0;
160 	fReadIndex = 0;
161 	fPacketCount = 0;
162 }
163