xref: /haiku/src/servers/registrar/MessagingService.cpp (revision 25a7b01d15612846f332751841da3579db313082)
1 /*
2  * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 #include <map>
7 #include <new>
8 
9 #include <string.h>
10 
11 #include <Autolock.h>
12 
13 #include <syscalls.h>
14 
15 #include "Debug.h"
16 #include "MessageDeliverer.h"
17 #include "MessagingService.h"
18 
19 using std::map;
20 using std::nothrow;
21 
22 // sService -- the singleton instance
23 MessagingService *MessagingService::sService = NULL;
24 
25 /*!	\class MessagingArea
26 	\brief Represents an area of the messaging service shared between kernel
27 		   and registrar.
28 
29 	The main purpose of the class is to retrieve (and remove) commands from
30 	the area.
31 */
32 
33 // constructor
MessagingArea()34 MessagingArea::MessagingArea()
35 {
36 }
37 
38 // destructor
~MessagingArea()39 MessagingArea::~MessagingArea()
40 {
41 	if (fID >= 0)
42 		delete_area(fID);
43 }
44 
45 // Create
46 status_t
Create(area_id kernelAreaID,sem_id lockSem,sem_id counterSem,MessagingArea * & _area)47 MessagingArea::Create(area_id kernelAreaID, sem_id lockSem, sem_id counterSem,
48 	MessagingArea *&_area)
49 {
50 	// allocate the object on the heap
51 	MessagingArea *area = new(nothrow) MessagingArea;
52 	if (!area)
53 		return B_NO_MEMORY;
54 
55 	// clone the kernel area
56 	area_id areaID = clone_area("messaging", (void**)&area->fHeader,
57 		B_ANY_ADDRESS, B_READ_AREA | B_WRITE_AREA, kernelAreaID);
58 	if (areaID < 0) {
59 		delete area;
60 		return areaID;
61 	}
62 
63 	// finish the initialization of the object
64 	area->fID = areaID;
65 	area->fSize = area->fHeader->size;
66 	area->fLockSem = lockSem;
67 	area->fCounterSem = counterSem;
68 	area->fNextArea = NULL;
69 
70 	_area = area;
71 	return B_OK;
72 }
73 
74 // Lock
75 bool
Lock()76 MessagingArea::Lock()
77 {
78 	// benaphore-like locking
79 	if (atomic_add(&fHeader->lock_counter, 1) == 0)
80 		return true;
81 
82 	return (acquire_sem(fLockSem) == B_OK);
83 }
84 
85 // Unlock
86 void
Unlock()87 MessagingArea::Unlock()
88 {
89 	if (atomic_add(&fHeader->lock_counter, -1) > 1)
90 		release_sem(fLockSem);
91 }
92 
93 // ID
94 area_id
ID() const95 MessagingArea::ID() const
96 {
97 	return fID;
98 }
99 
100 // Size
101 int32
Size() const102 MessagingArea::Size() const
103 {
104 	return fSize;
105 }
106 
107 // CountCommands
108 int32
CountCommands() const109 MessagingArea::CountCommands() const
110 {
111 	return fHeader->command_count;
112 }
113 
114 // PopCommand
115 const messaging_command *
PopCommand()116 MessagingArea::PopCommand()
117 {
118 	if (fHeader->command_count == 0)
119 		return NULL;
120 
121 	// get the command
122 	messaging_command *command
123 		= (messaging_command*)((char*)fHeader + fHeader->first_command);
124 
125 	// remove it from the area
126 	// (as long as the area is still locked, noone will overwrite the contents)
127 	if (--fHeader->command_count == 0)
128 		fHeader->first_command = fHeader->last_command = 0;
129 	else
130 		fHeader->first_command = command->next_command;
131 
132 	return command;
133 }
134 
135 // Discard
136 void
Discard()137 MessagingArea::Discard()
138 {
139 	fHeader->size = 0;
140 }
141 
142 // NextKernelAreaID
143 area_id
NextKernelAreaID() const144 MessagingArea::NextKernelAreaID() const
145 {
146 	return fHeader->next_kernel_area;
147 }
148 
149 // SetNextArea
150 void
SetNextArea(MessagingArea * area)151 MessagingArea::SetNextArea(MessagingArea *area)
152 {
153 	fNextArea = area;
154 }
155 
156 // NextArea
157 MessagingArea *
NextArea() const158 MessagingArea::NextArea() const
159 {
160 	return fNextArea;
161 }
162 
163 
164 // #pragma mark -
165 
166 // constructor
MessagingCommandHandler()167 MessagingCommandHandler::MessagingCommandHandler()
168 {
169 }
170 
171 // destructor
~MessagingCommandHandler()172 MessagingCommandHandler::~MessagingCommandHandler()
173 {
174 }
175 
176 
177 // #pragma mark -
178 
179 // DefaultSendCommandHandler
180 class MessagingService::DefaultSendCommandHandler
181 	: public MessagingCommandHandler {
182 
HandleMessagingCommand(uint32 _command,const void * data,int32 dataSize)183 	virtual void HandleMessagingCommand(uint32 _command, const void *data,
184 		int32 dataSize)
185 	{
186 		const messaging_command_send_message *sendData
187 			= (const messaging_command_send_message*)data;
188 		const void *messageData = (uint8*)data
189 			+ sizeof(messaging_command_send_message)
190 			+ sizeof(messaging_target) * sendData->target_count;
191 
192 		DefaultMessagingTargetSet set(sendData->targets,
193 			sendData->target_count);
194 		MessageDeliverer::Default()->DeliverMessage(messageData,
195 			sendData->message_size, set);
196 	}
197 };
198 
199 // CommandHandlerMap
200 struct MessagingService::CommandHandlerMap
201 	: map<uint32, MessagingCommandHandler*> {
202 };
203 
204 
205 /*! \class MessagingService
206 	\brief Userland implementation of the kernel -> userland messaging service.
207 
208 	This service provides a way for the kernel to send BMessages (usually
209 	notification (e.g. node monitoring) messages) to userland applications.
210 
211 	The kernel could write the messages directly to the respective target ports,
212 	but this has the disadvantage, that a message needs to be dropped, if the
213 	port is full at the moment of sending. By transferring the message to the
214 	registrar, it is possible to use the MessageDeliverer which retries sending
215 	messages on full ports.
216 
217 	The message transfer is implemented via areas shared between kernel
218 	and registrar. By default one area is used as a ring buffer. The kernel
219 	adds messages to it, the registrar removes them. If the area is full, the
220 	kernel creates a new one and adds it to the area list.
221 
222 	While the service is called `messaging service' and we were speaking of
223 	`messages' being passed through the areas, the service is actually more
224 	general. In fact `commands' are passed through the areas. Currently the
225 	only implemented command type is to send a message, but it is very easy
226 	to add further command types (e.g. one for alerting the user in case of
227 	errors).
228 
229 	The MessagingService maintains a mapping of command types to command
230 	handlers (MessagingCommandHandler, which perform the actual processing
231 	of the commands), that can be altered via
232 	MessagingService::SetCommandHandler().
233 */
234 
235 // constructor
MessagingService()236 MessagingService::MessagingService()
237 	: fLock("messaging service"),
238 	  fLockSem(-1),
239 	  fCounterSem(-1),
240 	  fFirstArea(NULL),
241 	  fCommandHandlers(NULL),
242 	  fCommandProcessor(-1),
243 	  fTerminating(false)
244 {
245 }
246 
247 // destructor
~MessagingService()248 MessagingService::~MessagingService()
249 {
250 	fTerminating = true;
251 
252 	if (fLockSem >= 0)
253 		delete_sem(fLockSem);
254 	if (fCounterSem >= 0)
255 		delete_sem(fCounterSem);
256 
257 	if (fCommandProcessor >= 0) {
258 		int32 result;
259 		wait_for_thread(fCommandProcessor, &result);
260 	}
261 
262 	delete fCommandHandlers;
263 
264 	delete fFirstArea;
265 }
266 
267 // Init
268 status_t
Init()269 MessagingService::Init()
270 {
271 	// create the semaphores
272 	fLockSem = create_sem(0, "messaging lock");
273 	if (fLockSem < 0)
274 		return fLockSem;
275 
276 	fCounterSem = create_sem(0, "messaging counter");
277 	if (fCounterSem < 0)
278 		return fCounterSem;
279 
280 	// create the command handler map
281 	fCommandHandlers = new(nothrow) CommandHandlerMap;
282 	if (!fCommandHandlers)
283 		return B_NO_MEMORY;
284 
285 	// spawn the command processor
286 	fCommandProcessor = spawn_thread(MessagingService::_CommandProcessorEntry,
287 		"messaging command processor", B_DISPLAY_PRIORITY, this);
288 	if (fCommandProcessor < 0)
289 		return fCommandProcessor;
290 
291 	// register with the kernel
292 	area_id areaID = _kern_register_messaging_service(fLockSem, fCounterSem);
293 	if (areaID < 0)
294 		return areaID;
295 
296 	// create the area
297 	status_t error = MessagingArea::Create(areaID, fLockSem, fCounterSem,
298 		fFirstArea);
299 	if (error != B_OK) {
300 		_kern_unregister_messaging_service();
301 		return error;
302 	}
303 
304 	// resume the command processor
305 	resume_thread(fCommandProcessor);
306 
307 	// install the default send message command handler
308 	MessagingCommandHandler *handler = new(nothrow) DefaultSendCommandHandler;
309 	if (!handler)
310 		return B_NO_MEMORY;
311 	SetCommandHandler(MESSAGING_COMMAND_SEND_MESSAGE, handler);
312 
313 	return B_OK;
314 }
315 
316 // CreateDefault
317 status_t
CreateDefault()318 MessagingService::CreateDefault()
319 {
320 	if (sService)
321 		return B_OK;
322 
323 	// create the service
324 	MessagingService *service = new(nothrow) MessagingService;
325 	if (!service)
326 		return B_NO_MEMORY;
327 
328 	// init it
329 	status_t error = service->Init();
330 	if (error != B_OK) {
331 		delete service;
332 		return error;
333 	}
334 
335 	sService = service;
336 	return B_OK;
337 }
338 
339 // DeleteDefault
340 void
DeleteDefault()341 MessagingService::DeleteDefault()
342 {
343 	if (sService) {
344 		delete sService;
345 		sService = NULL;
346 	}
347 }
348 
349 // Default
350 MessagingService *
Default()351 MessagingService::Default()
352 {
353 	return sService;
354 }
355 
356 // SetCommandHandler
357 void
SetCommandHandler(uint32 command,MessagingCommandHandler * handler)358 MessagingService::SetCommandHandler(uint32 command,
359 	MessagingCommandHandler *handler)
360 {
361 	BAutolock _(fLock);
362 
363 	if (handler) {
364 		(*fCommandHandlers)[command] = handler;
365 	} else {
366 		// no handler: remove and existing entry
367 		CommandHandlerMap::iterator it = fCommandHandlers->find(command);
368 		if (it != fCommandHandlers->end())
369 			fCommandHandlers->erase(it);
370 	}
371 }
372 
373 // _GetCommandHandler
374 MessagingCommandHandler *
_GetCommandHandler(uint32 command) const375 MessagingService::_GetCommandHandler(uint32 command) const
376 {
377 	BAutolock _(fLock);
378 
379 	CommandHandlerMap::iterator it = fCommandHandlers->find(command);
380 	return (it != fCommandHandlers->end() ? it->second : NULL);
381 }
382 
383 // _CommandProcessorEntry
384 int32
_CommandProcessorEntry(void * data)385 MessagingService::_CommandProcessorEntry(void *data)
386 {
387 	return ((MessagingService*)data)->_CommandProcessor();
388 }
389 
390 // _CommandProcessor
391 int32
_CommandProcessor()392 MessagingService::_CommandProcessor()
393 {
394 	bool commandWaiting = false;
395 	while (!fTerminating) {
396 		// wait for the next command
397 		if (!commandWaiting) {
398 			status_t error = acquire_sem(fCounterSem);
399 			if (error != B_OK)
400 				continue;
401 		} else
402 			commandWaiting = false;
403 
404 		// get it from the first area
405 		MessagingArea *area = fFirstArea;
406 		area->Lock();
407 		while (area->CountCommands() > 0) {
408 			const messaging_command *command = area->PopCommand();
409 			if (!command) {
410 				// something's seriously wrong
411 				ERROR("MessagingService::_CommandProcessor(): area %p (%"
412 					B_PRId32 ") has command count %" B_PRId32 ", but doesn't "
413 					"return any more commands.", area, area->ID(),
414 					area->CountCommands());
415 				break;
416 			}
417 PRINT("MessagingService::_CommandProcessor(): got command %" B_PRIu32 "\n",
418 command->command);
419 
420 			// dispatch the command
421 			MessagingCommandHandler *handler
422 				= _GetCommandHandler(command->command);
423 			if (handler) {
424 				handler->HandleMessagingCommand(command->command, command->data,
425 					command->size - sizeof(messaging_command));
426 			} else {
427 				WARNING("MessagingService::_CommandProcessor(): No handler "
428 					"found for command %" B_PRIu32 "\n", command->command);
429 			}
430 		}
431 
432 		// there is a new area we don't know yet
433 		if (!area->NextArea() && area->NextKernelAreaID() >= 0) {
434 			// create it
435 			MessagingArea *nextArea;
436 			status_t error = MessagingArea::Create(area->NextKernelAreaID(),
437 				fLockSem, fCounterSem, nextArea);
438 			if (error == B_OK) {
439 				area->SetNextArea(nextArea);
440 				commandWaiting = true;
441 			} else {
442 				// Bad, but what can we do?
443 				ERROR("MessagingService::_CommandProcessor(): Failed to clone "
444 					"kernel area %" B_PRId32 ": %s\n", area->NextKernelAreaID(),
445 					strerror(error));
446 			}
447 
448 		}
449 
450 		// if the current area is empty and there is a next one, we discard the
451 		// current one
452 		if (area->NextArea() && area->CountCommands() == 0) {
453 			fFirstArea = area->NextArea();
454 			area->Discard();
455 			area->Unlock();
456 			delete area;
457 		} else {
458 			area->Unlock();
459 		}
460 	}
461 
462 	return 0;
463 }
464 
465