1 /* 2 * Copyright (c) 2002, 2003 Marcus Overhagen <Marcus@Overhagen.de> 3 * 4 * Permission is hereby granted, free of charge, to any person obtaining 5 * a copy of this software and associated documentation files or portions 6 * thereof (the "Software"), to deal in the Software without restriction, 7 * including without limitation the rights to use, copy, modify, merge, 8 * publish, distribute, sublicense, and/or sell copies of the Software, 9 * and to permit persons to whom the Software is furnished to do so, subject 10 * to the following conditions: 11 * 12 * * Redistributions of source code must retain the above copyright notice, 13 * this list of conditions and the following disclaimer. 14 * 15 * * Redistributions in binary form must reproduce the above copyright notice 16 * in the binary, as well as this list of conditions and the following 17 * disclaimer in the documentation and/or other materials provided with 18 * the distribution. 19 * 20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 21 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 26 * THE SOFTWARE. 27 * 28 */ 29 30 #include "NotificationManager.h" 31 32 #include <Autolock.h> 33 #include <Locker.h> 34 #include <Message.h> 35 #include <OS.h> 36 37 #include "DataExchange.h" 38 #include "debug.h" 39 #include "media_server.h" 40 #include "NodeManager.h" 41 #include "Notifications.h" 42 43 44 #define NOTIFICATION_THREAD_PRIORITY 19 45 #define TIMEOUT 100000 46 47 48 NotificationManager::NotificationManager() 49 : 50 fNotificationThreadId(-1), 51 fLocker("notification manager locker") 52 { 53 fNotificationThreadId = spawn_thread(NotificationManager::worker_thread, 54 "notification broadcast", NOTIFICATION_THREAD_PRIORITY, this); 55 resume_thread(fNotificationThreadId); 56 } 57 58 59 NotificationManager::~NotificationManager() 60 { 61 // properly terminate the queue and wait until the worker thread has finished 62 fNotificationQueue.Terminate(); 63 64 status_t dummy; 65 wait_for_thread(fNotificationThreadId, &dummy); 66 } 67 68 69 void 70 NotificationManager::EnqueueMessage(BMessage *msg) 71 { 72 // queue a copy of the message to be processed later 73 fNotificationQueue.AddItem(new BMessage(*msg)); 74 } 75 76 77 void 78 NotificationManager::RequestNotifications(BMessage *msg) 79 { 80 BMessenger messenger; 81 const media_node *node; 82 ssize_t nodeSize; 83 team_id team; 84 int32 what; 85 86 msg->FindMessenger(NOTIFICATION_PARAM_MESSENGER, &messenger); 87 msg->FindInt32(NOTIFICATION_PARAM_TEAM, &team); 88 msg->FindInt32(NOTIFICATION_PARAM_WHAT, &what); 89 msg->FindData("node", B_RAW_TYPE, reinterpret_cast<const void **>(&node), 90 &nodeSize); 91 ASSERT(nodeSize == sizeof(media_node)); 92 93 Notification n; 94 n.messenger = messenger; 95 n.node = *node; 96 n.what = what; 97 n.team = team; 98 99 TRACE("NotificationManager::RequestNotifications node %ld, team %ld, " 100 "what %#lx\n",node->node, team, what); 101 102 fLocker.Lock(); 103 fNotificationList.Insert(n); 104 fLocker.Unlock(); 105 106 // send the initial B_MEDIA_NODE_CREATED containing all existing live nodes 107 BMessage initmsg(B_MEDIA_NODE_CREATED); 108 if (gNodeManager->GetLiveNodes(&initmsg) == B_OK) 109 messenger.SendMessage(&initmsg, static_cast<BHandler *>(NULL), TIMEOUT); 110 } 111 112 113 void 114 NotificationManager::CancelNotifications(BMessage *msg) 115 { 116 BMessenger messenger; 117 const media_node *node; 118 ssize_t nodeSize; 119 team_id team; 120 int32 what; 121 122 msg->FindMessenger(NOTIFICATION_PARAM_MESSENGER, &messenger); 123 msg->FindInt32(NOTIFICATION_PARAM_TEAM, &team); 124 msg->FindInt32(NOTIFICATION_PARAM_WHAT, &what); 125 msg->FindData("node", B_RAW_TYPE, reinterpret_cast<const void **>(&node), 126 &nodeSize); 127 ASSERT(nodeSize == sizeof(media_node)); 128 129 TRACE("NotificationManager::CancelNotifications node %ld, team %ld, what " 130 "%#lx\n", node->node, team, what); 131 132 /* if what == B_MEDIA_WILDCARD && node == media_node::null 133 * => delete all notifications for the matching team & messenger 134 * else if what != B_MEDIA_WILDCARD && node == media_node::null 135 * => delete all notifications for the matching what & team & messenger 136 * else if what == B_MEDIA_WILDCARD && node != media_node::null 137 * => delete all notifications for the matching team & messenger & node 138 * else if what != B_MEDIA_WILDCARD && node != media_node::null 139 * => delete all notifications for the matching what & team & messenger 140 * & node 141 */ 142 143 BAutolock _(fLocker); 144 145 Notification *n; 146 for (fNotificationList.Rewind(); fNotificationList.GetNext(&n); ) { 147 bool remove; 148 if (what == B_MEDIA_WILDCARD && *node == media_node::null 149 && team == n->team && messenger == n->messenger) 150 remove = true; 151 else if (what != B_MEDIA_WILDCARD && *node == media_node::null 152 && what == n->what && team == n->team && messenger == n->messenger) 153 remove = true; 154 else if (what == B_MEDIA_WILDCARD && *node != media_node::null 155 && team == n->team && messenger == n->messenger && n->node == *node) 156 remove = true; 157 else if (what != B_MEDIA_WILDCARD && *node != media_node::null 158 && what == n->what && team == n->team && messenger == n->messenger 159 && n->node == *node) 160 remove = true; 161 else 162 remove = false; 163 164 if (remove) { 165 if (!fNotificationList.RemoveCurrent()) { 166 ASSERT(false); 167 } 168 } 169 } 170 } 171 172 173 void 174 NotificationManager::SendNotifications(BMessage *msg) 175 { 176 const media_source *source; 177 const media_destination *destination; 178 const media_node *node; 179 ssize_t size; 180 int32 what; 181 182 msg->FindInt32(NOTIFICATION_PARAM_WHAT, &what); 183 msg->RemoveName(NOTIFICATION_PARAM_WHAT); 184 msg->what = what; 185 186 TRACE("NotificationManager::SendNotifications what %#lx\n", what); 187 188 BAutolock _(fLocker); 189 190 Notification *n; 191 for (fNotificationList.Rewind(); fNotificationList.GetNext(&n); ) { 192 if (n->what != B_MEDIA_WILDCARD && n->what != what) 193 continue; 194 195 switch (what) { 196 case B_MEDIA_NODE_CREATED: 197 case B_MEDIA_NODE_DELETED: 198 case B_MEDIA_CONNECTION_MADE: 199 case B_MEDIA_CONNECTION_BROKEN: 200 case B_MEDIA_BUFFER_CREATED: 201 case B_MEDIA_BUFFER_DELETED: 202 case B_MEDIA_TRANSPORT_STATE: 203 case B_MEDIA_DEFAULT_CHANGED: 204 case B_MEDIA_FLAVORS_CHANGED: 205 if (n->node != media_node::null) 206 continue; 207 break; 208 209 case B_MEDIA_NEW_PARAMETER_VALUE: 210 case B_MEDIA_PARAMETER_CHANGED: 211 case B_MEDIA_NODE_STOPPED: 212 case B_MEDIA_WEB_CHANGED: 213 msg->FindData("node", B_RAW_TYPE, 214 reinterpret_cast<const void **>(&node), &size); 215 ASSERT(size == sizeof(media_node)); 216 if (n->node != *node) 217 continue; 218 break; 219 220 case B_MEDIA_FORMAT_CHANGED: 221 msg->FindData("source", B_RAW_TYPE, 222 reinterpret_cast<const void **>(&source), &size); 223 ASSERT(size == sizeof(media_source)); 224 msg->FindData("destination", B_RAW_TYPE, 225 reinterpret_cast<const void **>(&destination), &size); 226 ASSERT(size == sizeof(media_destination)); 227 228 if (n->node.port != source->port 229 && n->node.port != destination->port) 230 continue; 231 break; 232 } 233 234 TRACE("NotificationManager::SendNotifications sending\n"); 235 n->messenger.SendMessage(msg, static_cast<BHandler *>(NULL), TIMEOUT); 236 } 237 } 238 239 240 void 241 NotificationManager::CleanupTeam(team_id team) 242 { 243 TRACE("NotificationManager::CleanupTeam team %ld\n", team); 244 BAutolock _(fLocker); 245 246 int debugCount = 0; 247 Notification *n; 248 for (fNotificationList.Rewind(); fNotificationList.GetNext(&n); ) { 249 if (n->team == team) { 250 if (fNotificationList.RemoveCurrent()) { 251 debugCount++; 252 } else { 253 ASSERT(false); 254 } 255 } 256 } 257 258 if (debugCount != 0) { 259 ERROR("NotificationManager::CleanupTeam: removed %d notifications for " 260 "team %" B_PRId32 "\n", debugCount, team); 261 } 262 } 263 264 265 void 266 NotificationManager::WorkerThread() 267 { 268 while (BMessage *msg 269 = static_cast<BMessage *>(fNotificationQueue.RemoveItem())) { 270 switch (msg->what) { 271 case MEDIA_SERVER_REQUEST_NOTIFICATIONS: 272 RequestNotifications(msg); 273 break; 274 case MEDIA_SERVER_CANCEL_NOTIFICATIONS: 275 CancelNotifications(msg); 276 break; 277 case MEDIA_SERVER_SEND_NOTIFICATIONS: 278 SendNotifications(msg); 279 break; 280 default: 281 debugger("bad notification message\n"); 282 } 283 delete msg; 284 } 285 } 286 287 288 int32 289 NotificationManager::worker_thread(void *arg) 290 { 291 static_cast<NotificationManager *>(arg)->WorkerThread(); 292 return 0; 293 } 294 295 296 void 297 NotificationManager::Dump() 298 { 299 BAutolock lock(fLocker); 300 301 printf("\n"); 302 printf("NotificationManager: list of subscribers follows:\n"); 303 Notification *n; 304 for (fNotificationList.Rewind(); fNotificationList.GetNext(&n); ) { 305 printf(" team %" B_PRId32 ", what %#08" B_PRIx32 ", node-id %" B_PRId32 306 ", node-port %" B_PRId32 ", messenger %svalid\n", n->team, n->what, 307 n->node.node, n->node.port, n->messenger.IsValid() ? "" : "NOT "); 308 } 309 printf("NotificationManager: list end\n"); 310 } 311