1 /****************************************************************************** 2 3 File: BufferStream.h 4 5 Copyright 1995-97, Be Incorporated 6 7 ******************************************************************************/ 8 #ifndef _BUFFER_STREAM_H 9 #define _BUFFER_STREAM_H 10 11 #include <stdlib.h> 12 #include <OS.h> 13 #include <SupportDefs.h> 14 #include <Locker.h> 15 #include <Messenger.h> 16 17 18 class BSubscriber; 19 20 21 /* ================ 22 Per-subscriber information. 23 ================ */ 24 25 struct _sbuf_info; 26 27 typedef struct _sub_info { 28 _sub_info *fNext; /* next subscriber in the stream*/ 29 _sub_info *fPrev; /* previous subscriber in the stream */ 30 _sbuf_info *fRel; /* next buf to be released */ 31 _sbuf_info *fAcq; /* next buf to be acquired */ 32 sem_id fSem; /* semaphore used for blocking */ 33 bigtime_t fTotalTime; /* accumulated time between acq/rel */ 34 int32 fHeld; /* # of buffers acq'd but not yet rel'd */ 35 sem_id fBlockedOn; /* the semaphore being waited on */ 36 /* or B_BAD_SEM_ID if not blocked */ 37 } *subscriber_id; 38 39 40 /* ================ 41 Per-buffer information 42 ================ */ 43 44 typedef struct _sbuf_info { 45 _sbuf_info *fNext; /* next "newer" buffer in the chain */ 46 subscriber_id fAvailTo; /* next subscriber to acquire this buffer */ 47 subscriber_id fHeldBy; /* subscriber that's acquired this buffer */ 48 bigtime_t fAcqTime; /* time at which this buffer was acquired */ 49 area_id fAreaID; /* for system memory allocation calls */ 50 char *fAddress; 51 int32 fSize; /* usable portion can be smaller than ... */ 52 int32 fAreaSize; /* ... the size of the area. */ 53 bool fIsFinal; /* TRUE => stream is stopping */ 54 } *buffer_id; 55 56 57 /* ================ 58 Interface definition for BBufferStream class 59 ================ */ 60 61 /* We've chosen B_MAX_SUBSCRIBER_COUNT and B_MAX_BUFFER_COUNT to be small 62 * enough so that a BBufferStream structure fits in one 4096 byte page. 63 */ 64 #define B_MAX_SUBSCRIBER_COUNT 52 65 #define B_MAX_BUFFER_COUNT 32 66 67 class BBufferStream; 68 class BBufferStreamManager; 69 70 typedef BBufferStream* stream_id; // for now 71 72 73 class BAbstractBufferStream 74 { 75 public: 76 #if __GNUC__ > 3 77 virtual ~BAbstractBufferStream(); 78 #endif 79 80 virtual status_t GetStreamParameters(size_t *bufferSize, 81 int32 *bufferCount, 82 bool *isRunning, 83 int32 *subscriberCount) const; 84 85 virtual status_t SetStreamBuffers(size_t bufferSize, 86 int32 bufferCount); 87 88 virtual status_t StartStreaming(); 89 virtual status_t StopStreaming(); 90 91 protected: 92 93 virtual void _ReservedAbstractBufferStream1(); 94 virtual void _ReservedAbstractBufferStream2(); 95 virtual void _ReservedAbstractBufferStream3(); 96 virtual void _ReservedAbstractBufferStream4(); 97 98 friend class BSubscriber; 99 friend class BBufferStreamManager; 100 101 virtual stream_id StreamID() const; 102 /* stream identifier for direct access */ 103 104 /* Create or delete a subscriber id for subsequent operations */ 105 virtual status_t Subscribe(char *name, 106 subscriber_id *subID, 107 sem_id semID); 108 virtual status_t Unsubscribe(subscriber_id subID); 109 110 /* Enter into or quit the stream */ 111 virtual status_t EnterStream(subscriber_id subID, 112 subscriber_id neighbor, 113 bool before); 114 115 virtual status_t ExitStream(subscriber_id subID); 116 117 virtual BMessenger* Server() const; /* message pipe to server */ 118 status_t SendRPC(BMessage* msg, BMessage* reply = NULL) const; 119 }; 120 121 122 class BBufferStream : public BAbstractBufferStream 123 { 124 public: 125 126 BBufferStream(size_t headerSize, 127 BBufferStreamManager* controller, 128 BSubscriber* headFeeder, 129 BSubscriber* tailFeeder); 130 virtual ~BBufferStream(); 131 132 /* BBufferStreams are allocated on shared memory pages */ 133 void *operator new(size_t size); 134 void operator delete(void *stream, size_t size); 135 136 /* Return header size */ 137 size_t HeaderSize() const; 138 139 /* These four functions are delegated to the stream controller */ 140 status_t GetStreamParameters(size_t *bufferSize, 141 int32 *bufferCount, 142 bool *isRunning, 143 int32 *subscriberCount) const; 144 145 status_t SetStreamBuffers(size_t bufferSize, 146 int32 bufferCount); 147 148 status_t StartStreaming(); 149 status_t StopStreaming(); 150 151 /* Get the controller for delegation */ 152 BBufferStreamManager *StreamManager() const; 153 154 /* number of buffers in stream */ 155 int32 CountBuffers() const; 156 157 /* Create or delete a subscriber id for subsequent operations */ 158 status_t Subscribe(char *name, 159 subscriber_id *subID, 160 sem_id semID); 161 162 status_t Unsubscribe(subscriber_id subID); 163 164 /* Enter into or quit the stream */ 165 status_t EnterStream(subscriber_id subID, 166 subscriber_id neighbor, 167 bool before); 168 169 status_t ExitStream(subscriber_id subID); 170 171 /* queries about a subscriber */ 172 bool IsSubscribed(subscriber_id subID); 173 bool IsEntered(subscriber_id subID); 174 175 status_t SubscriberInfo(subscriber_id subID, 176 char** name, 177 stream_id* streamID, 178 int32* position); 179 180 /* Force an error return of a subscriber if it's blocked */ 181 status_t UnblockSubscriber(subscriber_id subID); 182 183 /* Acquire and release a buffer */ 184 status_t AcquireBuffer(subscriber_id subID, 185 buffer_id *bufID, 186 bigtime_t timeout); 187 status_t ReleaseBuffer(subscriber_id subID); 188 189 /* Get the attributes of a particular buffer */ 190 size_t BufferSize(buffer_id bufID) const; 191 char *BufferData(buffer_id bufID) const; 192 bool IsFinalBuffer(buffer_id bufID) const; 193 194 /* Get attributes of a particular subscriber */ 195 int32 CountBuffersHeld(subscriber_id subID); 196 197 /* Queries for the BBufferStream */ 198 int32 CountSubscribers() const; 199 int32 CountEnteredSubscribers() const; 200 201 subscriber_id FirstSubscriber() const; 202 subscriber_id LastSubscriber() const; 203 subscriber_id NextSubscriber(subscriber_id subID); 204 subscriber_id PrevSubscriber(subscriber_id subID); 205 206 /* debugging aids */ 207 void PrintStream(); 208 void PrintBuffers(); 209 void PrintSubscribers(); 210 211 /* gaining exclusive access to the BBufferStream */ 212 bool Lock(); 213 void Unlock(); 214 215 /* introduce a new buffer into the "newest" end of the chain */ 216 status_t AddBuffer(buffer_id bufID); 217 218 /* remove a buffer from the "oldest" end of the chain */ 219 buffer_id RemoveBuffer(bool force); 220 221 /* allocate a buffer from shared memory and create a bufID for it. */ 222 buffer_id CreateBuffer(size_t size, bool isFinal); 223 224 /* deallocate a buffer and returns its bufID to the freelist */ 225 void DestroyBuffer(buffer_id bufID); 226 227 /* remove and destroy any "newest" buffers from the head of the chain 228 * that have not yet been claimed by any subscribers. If there are 229 * no subscribers, this clears the entire chain. 230 */ 231 void RescindBuffers(); 232 233 /* ================ 234 Private member functions that assume locking already has been done. 235 ================ */ 236 237 private: 238 239 virtual void _ReservedBufferStream1(); 240 virtual void _ReservedBufferStream2(); 241 virtual void _ReservedBufferStream3(); 242 virtual void _ReservedBufferStream4(); 243 244 /* initialize the free list of subscribers */ 245 void InitSubscribers(); 246 247 /* return TRUE if subID appears valid */ 248 bool IsSubscribedSafe(subscriber_id subID) const; 249 250 /* return TRUE if subID is entered into the stream */ 251 bool IsEnteredSafe(subscriber_id subID) const; 252 253 /* initialize the free list of buffer IDs */ 254 void InitBuffers(); 255 256 /* Wake a blocked subscriber */ 257 status_t WakeSubscriber(subscriber_id subID); 258 259 /* Give subID all the buffers it can get */ 260 void InheritBuffers(subscriber_id subID); 261 262 /* Relinquish any buffers held by subID */ 263 void BequeathBuffers(subscriber_id subID); 264 265 /* Fast version of ReleaseBuffer() */ 266 status_t ReleaseBufferSafe(subscriber_id subID); 267 268 /* Release a buffer to a subscriber */ 269 status_t ReleaseBufferTo(buffer_id bufID, subscriber_id subID); 270 271 /* deallocate all buffers */ 272 void FreeAllBuffers(); 273 274 /* deallocate all subscribers */ 275 void FreeAllSubscribers(); 276 277 /* ================ 278 Private data members 279 ================ */ 280 281 BLocker fLock; 282 area_id fAreaID; /* area id for this BBufferStream */ 283 BBufferStreamManager *fStreamManager; 284 BSubscriber *fHeadFeeder; 285 BSubscriber *fTailFeeder; 286 size_t fHeaderSize; 287 288 /* ================ 289 subscribers 290 ================ */ 291 292 _sub_info *fFreeSubs; /* free list of subscribers */ 293 _sub_info *fFirstSub; /* first entered in itinierary */ 294 _sub_info *fLastSub; /* last entered in itinerary */ 295 296 sem_id fFirstSem; /* semaphore used by fFirstSub */ 297 int32 fSubCount; 298 int32 fEnteredSubCount; 299 300 _sub_info fSubscribers[B_MAX_SUBSCRIBER_COUNT]; 301 302 /* ================ 303 buffers 304 ================ */ 305 306 _sbuf_info *fFreeBuffers; 307 _sbuf_info *fOldestBuffer; /* first in line */ 308 _sbuf_info *fNewestBuffer; /* fNewest->fNext = NULL */ 309 int32 fCountBuffers; 310 311 _sbuf_info fBuffers[B_MAX_BUFFER_COUNT]; 312 313 uint32 _reserved[4]; 314 }; 315 316 #endif // #ifdef _BUFFER_STREAM_H 317