1 /* 2 * Copyright 2018-2020, Andrew Lindesay <apl@lindesay.co.nz>. 3 * All rights reserved. Distributed under the terms of the MIT License. 4 */ 5 6 7 #include "ProcessCoordinator.h" 8 9 #include <AutoLocker.h> 10 #include <Catalog.h> 11 #include <StringFormat.h> 12 13 #include "Logger.h" 14 15 16 #undef B_TRANSLATION_CONTEXT 17 #define B_TRANSLATION_CONTEXT "ProcessCoordinator" 18 19 20 // #pragma mark - ProcessCoordinatorState implementation 21 22 23 ProcessCoordinatorState::ProcessCoordinatorState( 24 const ProcessCoordinator* processCoordinator, float progress, 25 const BString& message, bool isRunning, status_t errorStatus) 26 : 27 fProcessCoordinator(processCoordinator), 28 fProgress(progress), 29 fMessage(message), 30 fIsRunning(isRunning), 31 fErrorStatus(errorStatus) 32 { 33 } 34 35 36 ProcessCoordinatorState::~ProcessCoordinatorState() 37 { 38 } 39 40 41 const ProcessCoordinator* 42 ProcessCoordinatorState::Coordinator() const 43 { 44 return fProcessCoordinator; 45 } 46 47 48 float 49 ProcessCoordinatorState::Progress() const 50 { 51 return fProgress; 52 } 53 54 55 BString 56 ProcessCoordinatorState::Message() const 57 { 58 return fMessage; 59 } 60 61 62 bool 63 ProcessCoordinatorState::IsRunning() const 64 { 65 return fIsRunning; 66 } 67 68 69 status_t 70 ProcessCoordinatorState::ErrorStatus() const 71 { 72 return fErrorStatus; 73 } 74 75 76 // #pragma mark - ProcessCoordinator implementation 77 78 79 ProcessCoordinator::ProcessCoordinator(const char* name, BMessage* message) 80 : 81 fName(name), 82 fListener(NULL), 83 fMessage(message), 84 fWasStopped(false) 85 { 86 } 87 88 89 ProcessCoordinator::~ProcessCoordinator() 90 { 91 AutoLocker<BLocker> locker(&fLock); 92 for (int32 i = 0; i < fNodes.CountItems(); i++) { 93 AbstractProcessNode* node = fNodes.ItemAt(i); 94 node->Process()->SetListener(NULL); 95 delete node; 96 } 97 delete fMessage; 98 } 99 100 101 void 102 ProcessCoordinator::SetListener(ProcessCoordinatorListener *listener) 103 { 104 fListener = listener; 105 } 106 107 108 void 109 ProcessCoordinator::AddNode(AbstractProcessNode* node) 110 { 111 AutoLocker<BLocker> locker(&fLock); 112 fNodes.AddItem(node); 113 node->Process()->SetListener(this); 114 } 115 116 117 void 118 ProcessCoordinator::ProcessChanged() 119 { 120 _CoordinateAndCallListener(); 121 } 122 123 124 bool 125 ProcessCoordinator::IsRunning() 126 { 127 AutoLocker<BLocker> locker(&fLock); 128 for (int32 i = 0; i < fNodes.CountItems(); i++) { 129 if (_IsRunning(fNodes.ItemAt(i))) 130 return true; 131 } 132 133 return false; 134 } 135 136 137 void 138 ProcessCoordinator::Start() 139 { 140 _CoordinateAndCallListener(); 141 } 142 143 144 void 145 ProcessCoordinator::Stop() 146 { 147 AutoLocker<BLocker> locker(&fLock); 148 if (!fWasStopped) { 149 fWasStopped = true; 150 HDINFO("[Coordinator] will stop process coordinator"); 151 for (int32 i = 0; i < fNodes.CountItems(); i++) { 152 AbstractProcessNode* node = fNodes.ItemAt(i); 153 if (node->Process()->ErrorStatus() != B_OK) { 154 HDINFO("[Coordinator] stopping process [%s] (owing to error)", 155 node->Process()->Name()); 156 } else { 157 HDINFO("[Coordinator] stopping process [%s]", 158 node->Process()->Name()); 159 } 160 node->StopProcess(); 161 } 162 } 163 if (fListener != NULL) { 164 ProcessCoordinatorState state = _CreateStatus(); 165 fListener->CoordinatorChanged(state); 166 } 167 } 168 169 170 status_t 171 ProcessCoordinator::ErrorStatus() 172 { 173 AutoLocker<BLocker> locker(&fLock); 174 for (int32 i = 0; i < fNodes.CountItems(); i++) { 175 status_t result = fNodes.ItemAt(i)->Process()->ErrorStatus(); 176 177 if (result != B_OK) 178 return result; 179 } 180 181 return B_OK; 182 } 183 184 185 float 186 ProcessCoordinator::Progress() 187 { 188 AutoLocker<BLocker> locker(&fLock); 189 float result = 0.0f; 190 191 if (!fWasStopped) { 192 int32 count = fNodes.CountItems(); 193 194 // if there is only one then return it's value directly because this 195 // allows for the indeterminate state of -1. 196 197 if (count == 1) 198 result = fNodes.ItemAt(0)->Process()->Progress(); 199 else { 200 float progressPerNode = 1.0f / ((float) count); 201 202 for (int32 i = count - 1; i >= 0; i--) { 203 AbstractProcess* process = fNodes.ItemAt(i)->Process(); 204 205 switch(process->ProcessState()) { 206 case PROCESS_INITIAL: 207 break; 208 case PROCESS_RUNNING: 209 result += (progressPerNode * fmaxf( 210 0.0f, fminf(1.0, process->Progress()))); 211 break; 212 case PROCESS_COMPLETE: 213 result += progressPerNode; 214 break; 215 } 216 } 217 } 218 } 219 return result; 220 } 221 222 223 const BString& 224 ProcessCoordinator::Name() const 225 { 226 return fName; 227 } 228 229 230 BMessage* 231 ProcessCoordinator::Message() const 232 { 233 return fMessage; 234 } 235 236 237 BString 238 ProcessCoordinator::_CreateStatusMessage() 239 { 240 // work through the nodes and take a description from the first one. If 241 // there are others present then use a 'plus X others' suffix. Go backwards 242 // through the processes so that the most recent activity is shown first. 243 244 BString firstProcessDescription; 245 uint32 additionalRunningProcesses = 0; 246 247 for (int32 i = fNodes.CountItems() - 1; i >= 0; i--) { 248 AbstractProcess* process = fNodes.ItemAt(i)->Process(); 249 250 if (process->ProcessState() == PROCESS_RUNNING) { 251 if (firstProcessDescription.IsEmpty()) { 252 firstProcessDescription = process->Description(); 253 } else { 254 additionalRunningProcesses++; 255 } 256 } 257 } 258 259 if (firstProcessDescription.IsEmpty()) 260 return "???"; 261 262 if (additionalRunningProcesses == 0) 263 return firstProcessDescription; 264 265 static BStringFormat format(B_TRANSLATE( 266 "%FIRST_PROCESS_DESCRIPTION% +" 267 "{0, plural, one{# process} other{# processes}}")); 268 BString result; 269 format.Format(result, additionalRunningProcesses); 270 result.ReplaceAll("%FIRST_PROCESS_DESCRIPTION%", firstProcessDescription); 271 272 return result; 273 } 274 275 276 /*! This method assumes that a lock is held on the coordinator. */ 277 278 ProcessCoordinatorState 279 ProcessCoordinator::_CreateStatus() 280 { 281 return ProcessCoordinatorState( 282 this, Progress(), _CreateStatusMessage(), IsRunning(), ErrorStatus()); 283 } 284 285 286 void 287 ProcessCoordinator::_CoordinateAndCallListener() 288 { 289 ProcessCoordinatorState state = _Coordinate(); 290 291 if (fListener != NULL) 292 fListener->CoordinatorChanged(state); 293 } 294 295 296 ProcessCoordinatorState 297 ProcessCoordinator::_Coordinate() 298 { 299 HDTRACE("[Coordinator] will coordinate nodes"); 300 AutoLocker<BLocker> locker(&fLock); 301 _StopSuccessorNodesToErroredOrStoppedNodes(); 302 303 // go through the nodes and find those that are still to be run and 304 // for which the preconditions are met to start. 305 for (int32 i = 0; i < fNodes.CountItems(); i++) { 306 AbstractProcessNode* node = fNodes.ItemAt(i); 307 308 if (node->Process()->ProcessState() == PROCESS_INITIAL) { 309 if (node->AllPredecessorsComplete()) 310 node->StartProcess(); 311 else { 312 HDTRACE("[Coordinator] all predecessors not complete -> " 313 "[%s] not started", node->Process()->Name()); 314 } 315 } else { 316 HDTRACE("[Coordinator] process [%s] running or complete", 317 node->Process()->Name()); 318 } 319 } 320 321 return _CreateStatus(); 322 } 323 324 325 /*! This method assumes that a lock is held on the coordinator. */ 326 327 void 328 ProcessCoordinator::_StopSuccessorNodesToErroredOrStoppedNodes() 329 { 330 for (int32 i = 0; i < fNodes.CountItems(); i++) { 331 AbstractProcessNode* node = fNodes.ItemAt(i); 332 AbstractProcess* process = node->Process(); 333 334 if (process->WasStopped() || process->ErrorStatus() != B_OK) 335 _StopSuccessorNodes(node); 336 } 337 } 338 339 340 /*! This method assumes that a lock is held on the coordinator. */ 341 342 void 343 ProcessCoordinator::_StopSuccessorNodes(AbstractProcessNode* predecessorNode) 344 { 345 for (int32 i = 0; i < predecessorNode->CountSuccessors(); i++) { 346 AbstractProcessNode* node = predecessorNode->SuccessorAt(i); 347 AbstractProcess* process = node->Process(); 348 349 if (process->ProcessState() == PROCESS_INITIAL) { 350 HDDEBUG("[Coordinator] [%s] (failed) --> [%s] (stopping)", 351 predecessorNode->Process()->Name(), process->Name()); 352 node->StopProcess(); 353 _StopSuccessorNodes(node); 354 } 355 } 356 } 357 358 359 bool 360 ProcessCoordinator::_IsRunning(AbstractProcessNode* node) 361 { 362 return node->Process()->ProcessState() != PROCESS_COMPLETE; 363 } 364 365 366 int32 367 ProcessCoordinator::_CountNodesCompleted() 368 { 369 int32 nodesCompleted = 0; 370 for (int32 i = 0; i < fNodes.CountItems(); i++) { 371 AbstractProcess *process = fNodes.ItemAt(i)->Process(); 372 if (process->ProcessState() == PROCESS_COMPLETE) 373 nodesCompleted++; 374 } 375 return nodesCompleted; 376 } 377