xref: /haiku/src/kits/media/legacy/OldBufferStream.h (revision e81a954787e50e56a7f06f72705b7859b6ab06d1)
1 /******************************************************************************
2 
3 	File:	BufferStream.h
4 
5 	Copyright 1995-97, Be Incorporated
6 
7 ******************************************************************************/
8 #ifndef _BUFFER_STREAM_H
9 #define _BUFFER_STREAM_H
10 
11 #include <stdlib.h>
12 #include <OS.h>
13 #include <SupportDefs.h>
14 #include <Locker.h>
15 #include <Messenger.h>
16 
17 
/* Forward declaration: this header only refers to BSubscriber through
   pointers and friend declarations, so the full class definition is not
   required here. */
18 class BSubscriber;
19 
20 
21 /* ================
22    Per-subscriber information.
23    ================ */
24 
/* Forward declaration so that _sub_info can hold _sbuf_info pointers before
   _sbuf_info itself is defined later in this header. */
25 struct _sbuf_info;
26 
/* Bookkeeping record for a single subscriber.
   Note that the typedef names the POINTER type: a subscriber_id is a
   _sub_info*, not a _sub_info. */
27 typedef struct _sub_info {
28   _sub_info			*fNext;		/* next subscriber in the stream */
29   _sub_info			*fPrev;		/* previous subscriber in the stream */
30   _sbuf_info		*fRel;		/* next buffer to be released */
31   _sbuf_info		*fAcq;		/* next buffer to be acquired */
32   sem_id			fSem;		/* semaphore used for blocking */
33   bigtime_t			fTotalTime;	/* accumulated time between acquire and release */
34   int32				fHeld;		/* number of buffers acquired but not yet released */
35   sem_id			fBlockedOn;	/* the semaphore being waited on, */
36 								/* or B_BAD_SEM_ID if not blocked */
37 } *subscriber_id;
38 
39 
40 /* ================
41    Per-buffer information
42    ================ */
43 
/* Bookkeeping record for a single buffer in the chain.
   Note that the typedef names the POINTER type: a buffer_id is a
   _sbuf_info*, not a _sbuf_info. */
44 typedef struct _sbuf_info {
45   _sbuf_info		*fNext;		/* next "newer" buffer in the chain */
46   subscriber_id		fAvailTo;	/* next subscriber to acquire this buffer */
47   subscriber_id		fHeldBy;	/* subscriber that has acquired this buffer */
48   bigtime_t			fAcqTime;	/* time at which this buffer was acquired */
49   area_id			fAreaID;	/* for system memory allocation calls */
50   char				*fAddress;	/* presumably the start of the buffer data -- TODO confirm */
51   int32				fSize;     /* usable portion; can be smaller than ... */
52   int32				fAreaSize; /* ... the size of the area. */
53   bool				fIsFinal;	/* TRUE => stream is stopping */
54 } *buffer_id;
55 
56 
57 /* ================
58    Interface definition for BBufferStream class
59    ================ */
60 
61 /* B_MAX_SUBSCRIBER_COUNT and B_MAX_BUFFER_COUNT were chosen to be small
62  * enough that a BBufferStream structure fits in one 4096 byte page.
63  * They are the dimensions of the fSubscribers[] and fBuffers[] arrays
 * embedded in BBufferStream.
 * NOTE(review): whether the structure still fits in one page depends on
 * pointer size and padding of the target ABI -- verify before relying on it.
 */
64 #define	B_MAX_SUBSCRIBER_COUNT	52
65 #define	B_MAX_BUFFER_COUNT		32
66 
/* Forward declarations; both classes are only used by pointer or as a friend
   until BBufferStream is defined further down. */
67 class BBufferStream;
68 class BBufferStreamManager;
69 
/* A stream_id is currently just a pointer to the BBufferStream object. */
70 typedef BBufferStream* stream_id;  // for now
71 
72 
/* Common interface for buffer streams.
   Every operation below is declared virtual, but none is pure virtual, so
   this declaration alone does not make the class abstract.  No function
   bodies appear in this header. */
73 class BAbstractBufferStream
74 {
75 public:
/* The virtual destructor is only declared when compiling with GCC newer
   than version 3.
   NOTE(review): presumably omitted for older compilers to keep the legacy
   vtable layout unchanged -- confirm before touching. */
76 #if __GNUC__ > 3
77 	virtual				~BAbstractBufferStream();
78 #endif
79 
/* Report the stream configuration through the four out-parameters. */
80 	virtual	status_t	GetStreamParameters(size_t *bufferSize,
81 											int32 *bufferCount,
82 											bool *isRunning,
83 											int32 *subscriberCount) const;
84 
/* Set the size and number of buffers used by the stream. */
85 	virtual	status_t	SetStreamBuffers(size_t bufferSize,
86 										 int32 bufferCount);
87 
/* Start or stop the stream. */
88 	virtual	status_t	StartStreaming();
89 	virtual	status_t	StopStreaming();
90 
91 protected:
92 
/* Placeholder virtual functions.
   NOTE(review): presumably reserved vtable slots for future expansion;
   their order should then not be changed -- confirm. */
93 virtual	void		_ReservedAbstractBufferStream1();
94 virtual	void		_ReservedAbstractBufferStream2();
95 virtual	void		_ReservedAbstractBufferStream3();
96 virtual	void		_ReservedAbstractBufferStream4();
97 
/* These two classes may call the protected operations below. */
98 	friend class BSubscriber;
99 	friend class BBufferStreamManager;
100 
101 	virtual stream_id	StreamID() const;
102 	/* stream identifier for direct access */
103 
104 	/* Create or delete a subscriber id for subsequent operations.
	   Subscribe() hands the new id back through subID. */
105 	virtual	status_t	Subscribe(char *name,
106 								  subscriber_id *subID,
107 								  sem_id semID);
108 	virtual	status_t	Unsubscribe(subscriber_id subID);
109 
110 /* Enter into or quit the stream.  EnterStream() takes a neighboring
   subscriber and a flag selecting the side of that neighbor. */
111 	virtual	status_t	EnterStream(subscriber_id subID,
112 									subscriber_id neighbor,
113 									bool before);
114 
115 	virtual	status_t	ExitStream(subscriber_id subID);
116 
117 	virtual BMessenger*	Server() const;	/* message pipe to server */
	/* Non-virtual helper taking a message and an optional reply message. */
118 	status_t 		SendRPC(BMessage* msg, BMessage* reply = NULL) const;
119 };
120 
121 
/* Buffer stream object derived from BAbstractBufferStream.
   It embeds fixed-size tables of subscriber records (fSubscribers) and
   buffer records (fBuffers) together with a BLocker.  The member functions
   in the private section are documented as assuming that locking has
   already been done. */
122 class BBufferStream : public BAbstractBufferStream
123 {
124 public:
125 
126 						BBufferStream(size_t headerSize,
127 									  BBufferStreamManager* controller,
128 									  BSubscriber* headFeeder,
129 									  BSubscriber* tailFeeder);
130 		virtual			~BBufferStream();
131 
132 /* BBufferStreams are allocated on shared memory pages, hence the
   class-specific operator new and operator delete. */
133 		void			*operator new(size_t size);
134 		void			operator delete(void *stream, size_t size);
135 
136 /* Return header size */
137 		size_t			HeaderSize() const;
138 
139 /* These four functions are delegated to the stream controller */
140 		status_t		GetStreamParameters(size_t *bufferSize,
141 											int32 *bufferCount,
142 											bool *isRunning,
143 											int32 *subscriberCount) const;
144 
145 		status_t		SetStreamBuffers(size_t bufferSize,
146 										 int32 bufferCount);
147 
148 		status_t		StartStreaming();
149 		status_t		StopStreaming();
150 
151 /* Get the controller for delegation */
152 		BBufferStreamManager *StreamManager() const;
153 
154 /* number of buffers in stream */
155 		int32			CountBuffers() const;
156 
157 /* Create or delete a subscriber id for subsequent operations.
   Subscribe() hands the new id back through subID. */
158 		status_t		Subscribe(char *name,
159 								  subscriber_id *subID,
160 								  sem_id semID);
161 
162 		status_t		Unsubscribe(subscriber_id subID);
163 
164 /* Enter into or quit the stream */
165 		status_t		EnterStream(subscriber_id subID,
166 									subscriber_id neighbor,
167 									bool before);
168 
169 		status_t		ExitStream(subscriber_id subID);
170 
171 /* queries about a subscriber */
172 		bool			IsSubscribed(subscriber_id subID);
173 		bool			IsEntered(subscriber_id subID);
174 
/* Results are handed back through the name, streamID and position
   out-parameters. */
175 		status_t		SubscriberInfo(subscriber_id subID,
176 									   char** name,
177 									   stream_id* streamID,
178 									   int32* position);
179 
180 /* Force an error return of a subscriber if it's blocked */
181 		status_t		UnblockSubscriber(subscriber_id subID);
182 
183 /* Acquire and release a buffer.  AcquireBuffer() hands the buffer back
   through bufID and takes a timeout. */
184 		status_t		AcquireBuffer(subscriber_id subID,
185 									  buffer_id *bufID,
186 									  bigtime_t timeout);
187 		status_t		ReleaseBuffer(subscriber_id subID);
188 
189 /* Get the attributes of a particular buffer */
190 		size_t			BufferSize(buffer_id bufID) const;
191 		char			*BufferData(buffer_id bufID) const;
192 		bool			IsFinalBuffer(buffer_id bufID) const;
193 
194 /* Get attributes of a particular subscriber */
195 		int32			CountBuffersHeld(subscriber_id subID);
196 
197 /* Queries for the BBufferStream */
198 		int32			CountSubscribers() const;
199 		int32			CountEnteredSubscribers() const;
200 
201 		subscriber_id	FirstSubscriber() const;
202 		subscriber_id	LastSubscriber() const;
203 		subscriber_id	NextSubscriber(subscriber_id subID);
204 		subscriber_id	PrevSubscriber(subscriber_id subID);
205 
206 /* debugging aids */
207 		void			PrintStream();
208 		void			PrintBuffers();
209 		void			PrintSubscribers();
210 
211 /* gaining exclusive access to the BBufferStream */
212 		bool 			Lock();
213 		void			Unlock();
214 
215 /* introduce a new buffer into the "newest" end of the chain */
216 		status_t		AddBuffer(buffer_id bufID);
217 
218 /* remove a buffer from the "oldest" end of the chain */
219 		buffer_id		RemoveBuffer(bool force);
220 
221 /* allocate a buffer from shared memory and create a bufID for it. */
222 		buffer_id		CreateBuffer(size_t size, bool isFinal);
223 
224 /* deallocate a buffer and return its bufID to the freelist */
225 		void			DestroyBuffer(buffer_id bufID);
226 
227 /* remove and destroy any "newest" buffers from the head of the chain
228  * that have not yet been claimed by any subscribers.  If there are
229  * no subscribers, this clears the entire chain.
230  */
231 		void			RescindBuffers();
232 
233 /* ================
234    Private member functions that assume locking already has been done.
235    ================ */
236 
237 private:
238 
/* Placeholder virtual functions.
   NOTE(review): presumably reserved vtable slots for future expansion;
   their order should then not be changed -- confirm. */
239 virtual	void		_ReservedBufferStream1();
240 virtual	void		_ReservedBufferStream2();
241 virtual	void		_ReservedBufferStream3();
242 virtual	void		_ReservedBufferStream4();
243 
244 /* initialize the free list of subscribers */
245 		void			InitSubscribers();
246 
247 /* return TRUE if subID appears valid */
248 		bool			IsSubscribedSafe(subscriber_id subID) const;
249 
250 /* return TRUE if subID is entered into the stream */
251 		bool			IsEnteredSafe(subscriber_id subID) const;
252 
253 /* initialize the free list of buffer IDs */
254 		void			InitBuffers();
255 
256 /* Wake a blocked subscriber */
257 		status_t		WakeSubscriber(subscriber_id subID);
258 
259 /* Give subID all the buffers it can get */
260 		void			InheritBuffers(subscriber_id subID);
261 
262 /* Relinquish any buffers held by subID */
263 		void			BequeathBuffers(subscriber_id subID);
264 
265 /* Fast version of ReleaseBuffer() */
266 		status_t		ReleaseBufferSafe(subscriber_id subID);
267 
268 /* Release a buffer to a subscriber */
269 		status_t		ReleaseBufferTo(buffer_id bufID, subscriber_id subID);
270 
271 /* deallocate all buffers */
272 		void			FreeAllBuffers();
273 
274 /* deallocate all subscribers */
275 		void			FreeAllSubscribers();
276 
277 /* ================
278    Private data members
279    ================ */
280 
281 		BLocker				fLock;			/* presumably what Lock()/Unlock() operate on -- confirm */
282 		area_id				fAreaID;		/* area id for this BBufferStream */
283 		BBufferStreamManager	*fStreamManager;
284 		BSubscriber			*fHeadFeeder;
285 		BSubscriber			*fTailFeeder;
286 		size_t				fHeaderSize;
287 
288 		/* ================
289 		   subscribers
290 		   ================ */
291 
292 		_sub_info			*fFreeSubs;		/* free list of subscribers */
293 		_sub_info			*fFirstSub;		/* first entered in itinerary */
294 		_sub_info			*fLastSub;		/* last entered in itinerary */
295 
296 		sem_id				fFirstSem;		/* semaphore used by fFirstSub */
297 		int32				fSubCount;
298 		int32				fEnteredSubCount;
299 
		/* fixed-size table of subscriber records, B_MAX_SUBSCRIBER_COUNT entries */
300 		_sub_info			fSubscribers[B_MAX_SUBSCRIBER_COUNT];
301 
302 		/* ================
303 		   buffers
304 		   ================ */
305 
306 		_sbuf_info			*fFreeBuffers;
307 		_sbuf_info			*fOldestBuffer;	/* first in line */
308 		_sbuf_info			*fNewestBuffer;	/* fNewestBuffer->fNext is NULL */
309 		int32				fCountBuffers;
310 
		/* fixed-size table of buffer records, B_MAX_BUFFER_COUNT entries */
311 		_sbuf_info			fBuffers[B_MAX_BUFFER_COUNT];
312 
313 		uint32				_reserved[4];
314 };
315 
315 #endif 	// _BUFFER_STREAM_H
317