1 /* 2 * Copyright 2018-2022, Andrew Lindesay <apl@lindesay.co.nz>. 3 * All rights reserved. Distributed under the terms of the MIT License. 4 */ 5 6 7 #include "ProcessCoordinator.h" 8 9 #include <AutoLocker.h> 10 #include <Catalog.h> 11 #include <StringFormat.h> 12 #include <Uuid.h> 13 14 #include "Logger.h" 15 16 17 #undef B_TRANSLATION_CONTEXT 18 #define B_TRANSLATION_CONTEXT "ProcessCoordinator" 19 20 #define LOCK_TIMEOUT_MICROS (1000 * 1000) 21 22 // These are keys that are used to store the ProcessCoordinatorState data into 23 // a BMessage instance. 24 25 #define KEY_PROCESS_COORDINATOR_IDENTIFIER "processCoordinatorIdentifier" 26 #define KEY_PROGRESS "progress" 27 #define KEY_MESSAGE "message" 28 #define KEY_IS_RUNNING "isRunning" 29 #define KEY_ERROR_STATUS "errorStatus" 30 31 32 // #pragma mark - ProcessCoordinatorState implementation 33 34 35 ProcessCoordinatorState::ProcessCoordinatorState(BMessage* from) 36 { 37 if (from->FindString(KEY_PROCESS_COORDINATOR_IDENTIFIER, 38 &fProcessCoordinatorIdentifier) != B_OK) { 39 HDFATAL("unable to find the key [%s]", 40 KEY_PROCESS_COORDINATOR_IDENTIFIER); 41 } 42 43 if (from->FindFloat(KEY_PROGRESS, &fProgress) != B_OK) { 44 HDFATAL("unable to find the key [%s]", KEY_PROGRESS); 45 } 46 47 if (from->FindString(KEY_MESSAGE, &fMessage) != B_OK) { 48 HDFATAL("unable to find the key [%s]", KEY_MESSAGE); 49 } 50 51 if (from->FindBool(KEY_IS_RUNNING, &fIsRunning) != B_OK) { 52 HDFATAL("unable to find the key [%s]", KEY_IS_RUNNING); 53 } 54 55 int64 errorStatusNumeric; 56 if (from->FindInt64(KEY_ERROR_STATUS, &errorStatusNumeric) != B_OK) { 57 HDFATAL("unable to find the key [%s]", KEY_ERROR_STATUS); 58 } 59 fErrorStatus = static_cast<status_t>(errorStatusNumeric); 60 } 61 62 63 ProcessCoordinatorState::ProcessCoordinatorState( 64 const ProcessCoordinator* processCoordinator, float progress, 65 const BString& message, bool isRunning, status_t errorStatus) 66 : 67 fProcessCoordinatorIdentifier(processCoordinator->Identifier()), 68 fProgress(progress), 69 fMessage(message), 70 fIsRunning(isRunning), 71 fErrorStatus(errorStatus) 72 { 73 } 74 75 76 ProcessCoordinatorState::~ProcessCoordinatorState() 77 { 78 } 79 80 81 const BString 82 ProcessCoordinatorState::ProcessCoordinatorIdentifier() const 83 { 84 return fProcessCoordinatorIdentifier; 85 } 86 87 88 float 89 ProcessCoordinatorState::Progress() const 90 { 91 return fProgress; 92 } 93 94 95 BString 96 ProcessCoordinatorState::Message() const 97 { 98 return fMessage; 99 } 100 101 102 bool 103 ProcessCoordinatorState::IsRunning() const 104 { 105 return fIsRunning; 106 } 107 108 109 status_t 110 ProcessCoordinatorState::ErrorStatus() const 111 { 112 return fErrorStatus; 113 } 114 115 116 status_t 117 ProcessCoordinatorState::Archive(BMessage* into, bool deep) const 118 { 119 status_t result = B_OK; 120 if (result == B_OK) { 121 result = into->AddString(KEY_PROCESS_COORDINATOR_IDENTIFIER, 122 fProcessCoordinatorIdentifier); 123 } 124 if (result == B_OK) 125 result = into->AddFloat(KEY_PROGRESS, fProgress); 126 if (result == B_OK) 127 result = into->AddString(KEY_MESSAGE, fMessage); 128 if (result == B_OK) 129 result = into->AddBool(KEY_IS_RUNNING, fIsRunning); 130 if (result == B_OK) 131 result = into->AddInt64(KEY_ERROR_STATUS, static_cast<int64>(fErrorStatus)); 132 return result; 133 } 134 135 136 // #pragma mark - ProcessCoordinator implementation 137 138 139 ProcessCoordinator::ProcessCoordinator(const char* name, BMessage* message) 140 : 141 fName(name), 142 fLock(), 143 fCoordinateAndCallListenerRerun(false), 144 fCoordinateAndCallListenerRerunLock(), 145 fListener(NULL), 146 fMessage(message), 147 fWasStopped(false), 148 fIdentifier(BUuid().ToString()) 149 { 150 } 151 152 153 ProcessCoordinator::~ProcessCoordinator() 154 { 155 AutoLocker<BLocker> locker(&fLock); 156 for (int32 i = 0; i < fNodes.CountItems(); i++) { 157 AbstractProcessNode* node = fNodes.ItemAt(i); 158 node->Process()->SetListener(NULL); 159 delete node; 160 } 161 delete fMessage; 162 } 163 164 const BString& 165 ProcessCoordinator::Identifier() const 166 { 167 return fIdentifier; 168 } 169 170 171 void 172 ProcessCoordinator::SetListener(ProcessCoordinatorListener* listener) 173 { 174 fListener = listener; 175 } 176 177 178 void 179 ProcessCoordinator::AddNode(AbstractProcessNode* node) 180 { 181 AutoLocker<BLocker> locker(&fLock); 182 fNodes.AddItem(node); 183 node->SetListener(this); 184 node->Process()->SetListener(this); 185 } 186 187 188 void 189 ProcessCoordinator::ProcessChanged() 190 { 191 _CoordinateAndCallListener(); 192 } 193 194 195 bool 196 ProcessCoordinator::IsRunning() 197 { 198 AutoLocker<BLocker> locker(&fLock); 199 for (int32 i = 0; i < fNodes.CountItems(); i++) { 200 AbstractProcessNode* node = fNodes.ItemAt(i); 201 if (node->IsRunning()) 202 return true; 203 } 204 205 return false; 206 } 207 208 209 void 210 ProcessCoordinator::Start() 211 { 212 _CoordinateAndCallListener(); 213 } 214 215 216 void 217 ProcessCoordinator::RequestStop() 218 { 219 AutoLocker<BLocker> locker(&fLock); 220 if (!fWasStopped) { 221 fWasStopped = true; 222 HDINFO("[Coordinator] will stop process coordinator"); 223 for (int32 i = 0; i < fNodes.CountItems(); i++) { 224 AbstractProcessNode* node = fNodes.ItemAt(i); 225 HDINFO("[Coordinator] stopping process [%s]", 226 node->Process()->Name()); 227 node->RequestStop(); 228 } 229 } 230 } 231 232 233 status_t 234 ProcessCoordinator::ErrorStatus() 235 { 236 AutoLocker<BLocker> locker(&fLock); 237 for (int32 i = 0; i < fNodes.CountItems(); i++) { 238 status_t result = fNodes.ItemAt(i)->Process()->ErrorStatus(); 239 240 if (result != B_OK) 241 return result; 242 } 243 244 return B_OK; 245 } 246 247 248 float 249 ProcessCoordinator::Progress() 250 { 251 AutoLocker<BLocker> locker(&fLock); 252 float result = 0.0f; 253 254 if (!fWasStopped) { 255 int32 count = fNodes.CountItems(); 256 257 // if there is only one then return it's value directly because this 258 // allows for the indeterminate state of -1. 259 260 if (count == 1) 261 result = fNodes.ItemAt(0)->Process()->Progress(); 262 else { 263 float progressPerNode = 1.0f / ((float) count); 264 265 for (int32 i = count - 1; i >= 0; i--) { 266 AbstractProcess* process = fNodes.ItemAt(i)->Process(); 267 268 switch(process->ProcessState()) { 269 case PROCESS_INITIAL: 270 break; 271 case PROCESS_RUNNING: 272 result += (progressPerNode * fmaxf( 273 0.0f, fminf(1.0, process->Progress()))); 274 break; 275 case PROCESS_COMPLETE: 276 result += progressPerNode; 277 break; 278 } 279 } 280 } 281 } 282 return result; 283 } 284 285 286 const BString& 287 ProcessCoordinator::Name() const 288 { 289 return fName; 290 } 291 292 293 BMessage* 294 ProcessCoordinator::Message() const 295 { 296 return fMessage; 297 } 298 299 300 BString 301 ProcessCoordinator::_CreateStatusMessage() 302 { 303 // work through the nodes and take a description from the first one. If 304 // there are others present then use a 'plus X others' suffix. Go backwards 305 // through the processes so that the most recent activity is shown first. 306 307 BString firstProcessDescription; 308 uint32 additionalRunningProcesses = 0; 309 310 for (int32 i = fNodes.CountItems() - 1; i >= 0; i--) { 311 AbstractProcess* process = fNodes.ItemAt(i)->Process(); 312 if (process->ProcessState() == PROCESS_RUNNING) { 313 if (firstProcessDescription.IsEmpty()) { 314 if (strlen(process->Description()) != 0) 315 firstProcessDescription = process->Description(); 316 else 317 additionalRunningProcesses++; 318 } 319 else 320 additionalRunningProcesses++; 321 } 322 } 323 324 if (firstProcessDescription.IsEmpty()) 325 return "???"; 326 327 if (additionalRunningProcesses == 0) 328 return firstProcessDescription; 329 330 static BStringFormat format(B_TRANSLATE( 331 "%FIRST_PROCESS_DESCRIPTION% +" 332 "{0, plural, one{# process} other{# processes}}")); 333 BString result; 334 format.Format(result, additionalRunningProcesses); 335 result.ReplaceAll("%FIRST_PROCESS_DESCRIPTION%", firstProcessDescription); 336 337 return result; 338 } 339 340 341 /*! This method assumes that a lock is held on the coordinator. */ 342 343 ProcessCoordinatorState 344 ProcessCoordinator::_CreateStatus() 345 { 346 return ProcessCoordinatorState( 347 this, Progress(), _CreateStatusMessage(), IsRunning(), ErrorStatus()); 348 } 349 350 351 /*! This will try to obtain the lock and if it cannot obtain the lock then 352 it will flag that when the coordinator has finished its current 353 coordination, it should initiate another coordination. 354 */ 355 void 356 ProcessCoordinator::_CoordinateAndCallListener() 357 { 358 if (fLock.LockWithTimeout(LOCK_TIMEOUT_MICROS) != B_OK) { 359 HDDEBUG("[Coordinator] would coordinate nodes, but coordination is " 360 "in progress - will defer"); 361 AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock); 362 fCoordinateAndCallListenerRerun = true; 363 return; 364 } 365 366 ProcessCoordinatorState state = _Coordinate(); 367 368 if (fListener != NULL) 369 fListener->CoordinatorChanged(state); 370 371 fLock.Unlock(); 372 373 bool coordinateAndCallListenerRerun = false; 374 375 { 376 AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock); 377 coordinateAndCallListenerRerun = fCoordinateAndCallListenerRerun; 378 fCoordinateAndCallListenerRerun = false; 379 } 380 381 if (coordinateAndCallListenerRerun) { 382 HDDEBUG("[Coordinator] will run deferred coordination"); 383 _CoordinateAndCallListener(); 384 } 385 } 386 387 388 ProcessCoordinatorState 389 ProcessCoordinator::_Coordinate() 390 { 391 HDTRACE("[Coordinator] will coordinate nodes"); 392 AutoLocker<BLocker> locker(&fLock); 393 _StopSuccessorNodesToErroredOrStoppedNodes(); 394 395 // go through the nodes and find those that are still to be run and 396 // for which the preconditions are met to start. 397 for (int32 i = 0; i < fNodes.CountItems(); i++) { 398 AbstractProcessNode* node = fNodes.ItemAt(i); 399 400 if (node->Process()->ProcessState() == PROCESS_INITIAL) { 401 if (node->AllPredecessorsComplete()) 402 node->Start(); 403 else { 404 HDTRACE("[Coordinator] all predecessors not complete -> " 405 "[%s] not started", node->Process()->Name()); 406 } 407 } else { 408 HDTRACE("[Coordinator] process [%s] running or complete", 409 node->Process()->Name()); 410 } 411 } 412 413 return _CreateStatus(); 414 } 415 416 417 /*! This method assumes that a lock is held on the coordinator. */ 418 419 void 420 ProcessCoordinator::_StopSuccessorNodesToErroredOrStoppedNodes() 421 { 422 for (int32 i = 0; i < fNodes.CountItems(); i++) { 423 AbstractProcessNode* node = fNodes.ItemAt(i); 424 AbstractProcess* process = node->Process(); 425 426 if (process->WasStopped() || process->ErrorStatus() != B_OK) 427 _StopSuccessorNodes(node); 428 } 429 } 430 431 432 /*! This method assumes that a lock is held on the coordinator. */ 433 434 void 435 ProcessCoordinator::_StopSuccessorNodes(AbstractProcessNode* predecessorNode) 436 { 437 for (int32 i = 0; i < predecessorNode->CountSuccessors(); i++) { 438 AbstractProcessNode* node = predecessorNode->SuccessorAt(i); 439 AbstractProcess* process = node->Process(); 440 441 if (process->ProcessState() == PROCESS_INITIAL) { 442 HDDEBUG("[Coordinator] [%s] (failed) --> [%s] (stopping)", 443 predecessorNode->Process()->Name(), process->Name()); 444 node->RequestStop(); 445 _StopSuccessorNodes(node); 446 } 447 } 448 } 449 450 451 int32 452 ProcessCoordinator::_CountNodesCompleted() 453 { 454 int32 nodesCompleted = 0; 455 for (int32 i = 0; i < fNodes.CountItems(); i++) { 456 AbstractProcess *process = fNodes.ItemAt(i)->Process(); 457 if (process->ProcessState() == PROCESS_COMPLETE) 458 nodesCompleted++; 459 } 460 return nodesCompleted; 461 } 462 463 464 BString 465 ProcessCoordinator::LogReport() 466 { 467 BString result; 468 AutoLocker<BLocker> locker(&fLock); 469 470 for (int32 i = 0; i < fNodes.CountItems(); i++) { 471 if (0 != result.Length()) 472 result.Append("\n"); 473 AbstractProcessNode* node = fNodes.ItemAt(i); 474 result.Append(node->LogReport()); 475 } 476 477 return result; 478 } 479