1 /* 2 * Copyright (c) 2002, 2003 Marcus Overhagen <Marcus@Overhagen.de> 3 * 4 * Permission is hereby granted, free of charge, to any person obtaining 5 * a copy of this software and associated documentation files or portions 6 * thereof (the "Software"), to deal in the Software without restriction, 7 * including without limitation the rights to use, copy, modify, merge, 8 * publish, distribute, sublicense, and/or sell copies of the Software, 9 * and to permit persons to whom the Software is furnished to do so, subject 10 * to the following conditions: 11 * 12 * * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * * Redistributions in binary form must reproduce the above copyright notice 16 * in the binary, as well as this list of conditions and the following 17 * disclaimer in the documentation and/or other materials provided with 18 * the distribution. 19 * 20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 21 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 26 * THE SOFTWARE. 27 * 28 */ 29 30 #include <OS.h> 31 #include <Locker.h> 32 #include <Autolock.h> 33 #include <Message.h> 34 #include <Messenger.h> 35 #include <MediaNode.h> 36 #include "debug.h" 37 #include "NodeManager.h" 38 #include "DataExchange.h" 39 #include "Notifications.h" 40 #include "NotificationManager.h" 41 #include "media_server.h" 42 #include "Queue.h" 43 44 45 #define NOTIFICATION_THREAD_PRIORITY 19 46 #define TIMEOUT 100000 47 48 NotificationManager::NotificationManager() 49 : fNotificationQueue(new Queue), 50 fNotificationThreadId(-1), 51 fLocker(new BLocker("notification manager locker")), 52 fNotificationList(new List<Notification>) 53 { 54 fNotificationThreadId = spawn_thread(NotificationManager::worker_thread, "notification broadcast", NOTIFICATION_THREAD_PRIORITY, this); 55 resume_thread(fNotificationThreadId); 56 } 57 58 NotificationManager::~NotificationManager() 59 { 60 // properly terminate the queue and wait until the worker thread has finished 61 status_t dummy; 62 fNotificationQueue->Terminate(); 63 wait_for_thread(fNotificationThreadId, &dummy); 64 delete fNotificationQueue; 65 delete fLocker; 66 delete fNotificationList; 67 } 68 69 void 70 NotificationManager::EnqueueMessage(BMessage *msg) 71 { 72 // queue a copy of the message to be processed later 73 fNotificationQueue->AddItem(new BMessage(*msg)); 74 } 75 76 void 77 NotificationManager::RequestNotifications(BMessage *msg) 78 { 79 BMessenger messenger; 80 const media_node *node; 81 ssize_t nodesize; 82 team_id team; 83 int32 what; 84 85 msg->FindMessenger(NOTIFICATION_PARAM_MESSENGER, &messenger); 86 msg->FindInt32(NOTIFICATION_PARAM_TEAM, &team); 87 msg->FindInt32(NOTIFICATION_PARAM_WHAT, &what); 88 msg->FindData("node", B_RAW_TYPE, reinterpret_cast<const void **>(&node), &nodesize); 89 ASSERT(nodesize == sizeof(media_node)); 90 91 Notification n; 92 n.messenger = messenger; 93 n.node = *node; 94 n.what = what; 95 n.team = team; 96 97 TRACE("NotificationManager::RequestNotifications node %ld, team %ld, what %#lx\n",node->node, team, what); 98 99 fLocker->Lock(); 100 fNotificationList->Insert(n); 101 fLocker->Unlock(); 102 103 // send the initial B_MEDIA_NODE_CREATED containing all existing live nodes 104 BMessage initmsg(B_MEDIA_NODE_CREATED); 105 if (B_OK == gNodeManager->GetLiveNodes(&initmsg)) { 106 messenger.SendMessage(&initmsg, static_cast<BHandler *>(NULL), TIMEOUT); 107 } 108 } 109 110 void 111 NotificationManager::CancelNotifications(BMessage *msg) 112 { 113 BMessenger messenger; 114 const media_node *node; 115 ssize_t nodesize; 116 team_id team; 117 int32 what; 118 119 msg->FindMessenger(NOTIFICATION_PARAM_MESSENGER, &messenger); 120 msg->FindInt32(NOTIFICATION_PARAM_TEAM, &team); 121 msg->FindInt32(NOTIFICATION_PARAM_WHAT, &what); 122 msg->FindData("node", B_RAW_TYPE, reinterpret_cast<const void **>(&node), &nodesize); 123 ASSERT(nodesize == sizeof(media_node)); 124 125 TRACE("NotificationManager::CancelNotifications node %ld, team %ld, what %#lx\n",node->node, team, what); 126 127 /* if what == B_MEDIA_WILDCARD && node == media_node::null 128 * => delete all notifications for the matching team & messenger 129 * else if what != B_MEDIA_WILDCARD && node == media_node::null 130 * => delete all notifications for the matching what & team & messenger 131 * else if what == B_MEDIA_WILDCARD && node != media_node::null 132 * => delete all notifications for the matching team & messenger & node 133 * else if what != B_MEDIA_WILDCARD && node != media_node::null 134 * => delete all notifications for the matching what & team & messenger & node 135 */ 136 137 fLocker->Lock(); 138 139 Notification *n; 140 for (fNotificationList->Rewind(); fNotificationList->GetNext(&n); ) { 141 bool remove; 142 if (what == B_MEDIA_WILDCARD && *node == media_node::null && team == n->team && messenger == n->messenger) 143 remove = true; 144 else if (what != B_MEDIA_WILDCARD && *node == media_node::null && what == n->what && team == n->team && messenger == n->messenger) 145 remove = true; 146 else if (what == B_MEDIA_WILDCARD && *node != media_node::null && team == n->team && messenger == n->messenger && n->node == *node) 147 remove = true; 148 else if (what != B_MEDIA_WILDCARD && *node != media_node::null && what == n->what && team == n->team && messenger == n->messenger && n->node == *node) 149 remove = true; 150 else 151 remove = false; 152 if (remove) { 153 if (fNotificationList->RemoveCurrent()) { 154 } else { 155 ASSERT(false); 156 } 157 } 158 } 159 160 fLocker->Unlock(); 161 } 162 163 void 164 NotificationManager::SendNotifications(BMessage *msg) 165 { 166 const media_source *source; 167 const media_destination *destination; 168 const media_node *node; 169 ssize_t size; 170 int32 what; 171 172 msg->FindInt32(NOTIFICATION_PARAM_WHAT, &what); 173 msg->RemoveName(NOTIFICATION_PARAM_WHAT); 174 msg->what = what; 175 176 TRACE("NotificationManager::SendNotifications what %#lx\n", what); 177 178 fLocker->Lock(); 179 180 Notification *n; 181 for (fNotificationList->Rewind(); fNotificationList->GetNext(&n); ) { 182 if (n->what != B_MEDIA_WILDCARD && n->what != what) 183 continue; 184 185 switch (what) { 186 case B_MEDIA_NODE_CREATED: 187 case B_MEDIA_NODE_DELETED: 188 case B_MEDIA_CONNECTION_MADE: 189 case B_MEDIA_CONNECTION_BROKEN: 190 case B_MEDIA_BUFFER_CREATED: 191 case B_MEDIA_BUFFER_DELETED: 192 case B_MEDIA_TRANSPORT_STATE: 193 case B_MEDIA_DEFAULT_CHANGED: 194 case B_MEDIA_FLAVORS_CHANGED: 195 if (n->node != media_node::null) 196 continue; 197 break; 198 199 case B_MEDIA_NEW_PARAMETER_VALUE: 200 case B_MEDIA_PARAMETER_CHANGED: 201 case B_MEDIA_NODE_STOPPED: 202 case B_MEDIA_WEB_CHANGED: 203 msg->FindData("node", B_RAW_TYPE, reinterpret_cast<const void **>(&node), &size); 204 ASSERT(size == sizeof(media_node)); 205 if (n->node != *node) 206 continue; 207 break; 208 209 case B_MEDIA_FORMAT_CHANGED: 210 msg->FindData("source", B_RAW_TYPE, reinterpret_cast<const void **>(&source), &size); 211 ASSERT(size == sizeof(media_source)); 212 msg->FindData("destination", B_RAW_TYPE, reinterpret_cast<const void **>(&destination), &size); 213 ASSERT(size == sizeof(media_destination)); 214 if (n->node.port != source->port && n->node.port != destination->port) 215 continue; 216 break; 217 } 218 219 TRACE("NotificationManager::SendNotifications sending\n"); 220 n->messenger.SendMessage(msg, static_cast<BHandler *>(NULL), TIMEOUT); 221 } 222 223 fLocker->Unlock(); 224 } 225 226 void 227 NotificationManager::CleanupTeam(team_id team) 228 { 229 TRACE("NotificationManager::CleanupTeam team %ld\n", team); 230 fLocker->Lock(); 231 232 int debugcount = 0; 233 Notification *n; 234 for (fNotificationList->Rewind(); fNotificationList->GetNext(&n); ) { 235 if (n->team == team) { 236 if (fNotificationList->RemoveCurrent()) { 237 debugcount++; 238 } else { 239 ASSERT(false); 240 } 241 } 242 } 243 244 if (debugcount != 0) 245 ERROR("NotificationManager::CleanupTeam: removed %d notifications for team %ld\n", debugcount, team); 246 247 fLocker->Unlock(); 248 } 249 250 void 251 NotificationManager::WorkerThread() 252 { 253 BMessage *msg; 254 while (NULL != (msg = static_cast<BMessage *>(fNotificationQueue->RemoveItem()))) { 255 switch (msg->what) { 256 case MEDIA_SERVER_REQUEST_NOTIFICATIONS: 257 RequestNotifications(msg); 258 break; 259 case MEDIA_SERVER_CANCEL_NOTIFICATIONS: 260 CancelNotifications(msg); 261 break; 262 case MEDIA_SERVER_SEND_NOTIFICATIONS: 263 SendNotifications(msg); 264 break; 265 default: 266 debugger("bad notification message\n"); 267 } 268 delete msg; 269 } 270 } 271 272 int32 273 NotificationManager::worker_thread(void *arg) 274 { 275 static_cast<NotificationManager *>(arg)->WorkerThread(); 276 return 0; 277 } 278 279 void 280 NotificationManager::Dump() 281 { 282 BAutolock lock(fLocker); 283 printf("\n"); 284 printf("NotificationManager: list of subscribers follows:\n"); 285 Notification *n; 286 for (fNotificationList->Rewind(); fNotificationList->GetNext(&n); ) { 287 printf(" team %ld, what %#08lx, node-id %ld, node-port %ld, messenger %svalid\n", 288 n->team, n->what, n->node.node, n->node.port, n->messenger.IsValid() ? "" : "NOT "); 289 } 290 printf("NotificationManager: list end\n"); 291 } 292