1 /* 2 * Copyright 2018-2020, Andrew Lindesay <apl@lindesay.co.nz>. 3 * All rights reserved. Distributed under the terms of the MIT License. 4 */ 5 6 7 #include "ProcessCoordinator.h" 8 9 #include <AutoLocker.h> 10 #include <Catalog.h> 11 #include <StringFormat.h> 12 13 #include "Logger.h" 14 15 16 #undef B_TRANSLATION_CONTEXT 17 #define B_TRANSLATION_CONTEXT "ProcessCoordinator" 18 19 20 // #pragma mark - ProcessCoordinatorState implementation 21 22 23 ProcessCoordinatorState::ProcessCoordinatorState( 24 const ProcessCoordinator* processCoordinator, float progress, 25 const BString& message, bool isRunning, status_t errorStatus) 26 : 27 fProcessCoordinator(processCoordinator), 28 fProgress(progress), 29 fMessage(message), 30 fIsRunning(isRunning), 31 fErrorStatus(errorStatus) 32 { 33 } 34 35 36 ProcessCoordinatorState::~ProcessCoordinatorState() 37 { 38 } 39 40 41 const ProcessCoordinator* 42 ProcessCoordinatorState::Coordinator() const 43 { 44 return fProcessCoordinator; 45 } 46 47 48 float 49 ProcessCoordinatorState::Progress() const 50 { 51 return fProgress; 52 } 53 54 55 BString 56 ProcessCoordinatorState::Message() const 57 { 58 return fMessage; 59 } 60 61 62 bool 63 ProcessCoordinatorState::IsRunning() const 64 { 65 return fIsRunning; 66 } 67 68 69 status_t 70 ProcessCoordinatorState::ErrorStatus() const 71 { 72 return fErrorStatus; 73 } 74 75 76 // #pragma mark - ProcessCoordinator implementation 77 78 79 ProcessCoordinator::ProcessCoordinator(const char* name, BMessage* message) 80 : 81 fName(name), 82 fListener(NULL), 83 fMessage(message), 84 fWasStopped(false) 85 { 86 } 87 88 89 ProcessCoordinator::~ProcessCoordinator() 90 { 91 AutoLocker<BLocker> locker(&fLock); 92 for (int32 i = 0; i < fNodes.CountItems(); i++) { 93 AbstractProcessNode* node = fNodes.ItemAt(i); 94 node->Process()->SetListener(NULL); 95 delete node; 96 } 97 delete fMessage; 98 } 99 100 101 void 102 ProcessCoordinator::SetListener(ProcessCoordinatorListener *listener) 103 { 104 fListener = listener; 105 } 106 107 108 void 109 ProcessCoordinator::AddNode(AbstractProcessNode* node) 110 { 111 AutoLocker<BLocker> locker(&fLock); 112 fNodes.AddItem(node); 113 node->Process()->SetListener(this); 114 } 115 116 117 void 118 ProcessCoordinator::ProcessChanged() 119 { 120 _CoordinateAndCallListener(); 121 } 122 123 124 bool 125 ProcessCoordinator::IsRunning() 126 { 127 AutoLocker<BLocker> locker(&fLock); 128 for (int32 i = 0; i < fNodes.CountItems(); i++) { 129 if (_IsRunning(fNodes.ItemAt(i))) 130 return true; 131 } 132 133 return false; 134 } 135 136 137 void 138 ProcessCoordinator::Start() 139 { 140 _CoordinateAndCallListener(); 141 } 142 143 144 void 145 ProcessCoordinator::RequestStop() 146 { 147 AutoLocker<BLocker> locker(&fLock); 148 if (!fWasStopped) { 149 fWasStopped = true; 150 HDINFO("[Coordinator] will stop process coordinator"); 151 for (int32 i = 0; i < fNodes.CountItems(); i++) { 152 AbstractProcessNode* node = fNodes.ItemAt(i); 153 HDINFO("[Coordinator] stopping process [%s]", 154 node->Process()->Name()); 155 node->RequestStop(); 156 } 157 } 158 } 159 160 161 status_t 162 ProcessCoordinator::ErrorStatus() 163 { 164 AutoLocker<BLocker> locker(&fLock); 165 for (int32 i = 0; i < fNodes.CountItems(); i++) { 166 status_t result = fNodes.ItemAt(i)->Process()->ErrorStatus(); 167 168 if (result != B_OK) 169 return result; 170 } 171 172 return B_OK; 173 } 174 175 176 float 177 ProcessCoordinator::Progress() 178 { 179 AutoLocker<BLocker> locker(&fLock); 180 float result = 0.0f; 181 182 if (!fWasStopped) { 183 int32 count = fNodes.CountItems(); 184 185 // if there is only one then return it's value directly because this 186 // allows for the indeterminate state of -1. 187 188 if (count == 1) 189 result = fNodes.ItemAt(0)->Process()->Progress(); 190 else { 191 float progressPerNode = 1.0f / ((float) count); 192 193 for (int32 i = count - 1; i >= 0; i--) { 194 AbstractProcess* process = fNodes.ItemAt(i)->Process(); 195 196 switch(process->ProcessState()) { 197 case PROCESS_INITIAL: 198 break; 199 case PROCESS_RUNNING: 200 result += (progressPerNode * fmaxf( 201 0.0f, fminf(1.0, process->Progress()))); 202 break; 203 case PROCESS_COMPLETE: 204 result += progressPerNode; 205 break; 206 } 207 } 208 } 209 } 210 return result; 211 } 212 213 214 const BString& 215 ProcessCoordinator::Name() const 216 { 217 return fName; 218 } 219 220 221 BMessage* 222 ProcessCoordinator::Message() const 223 { 224 return fMessage; 225 } 226 227 228 BString 229 ProcessCoordinator::_CreateStatusMessage() 230 { 231 // work through the nodes and take a description from the first one. If 232 // there are others present then use a 'plus X others' suffix. Go backwards 233 // through the processes so that the most recent activity is shown first. 234 235 BString firstProcessDescription; 236 uint32 additionalRunningProcesses = 0; 237 238 for (int32 i = fNodes.CountItems() - 1; i >= 0; i--) { 239 AbstractProcess* process = fNodes.ItemAt(i)->Process(); 240 241 if (process->ProcessState() == PROCESS_RUNNING) { 242 if (firstProcessDescription.IsEmpty()) { 243 firstProcessDescription = process->Description(); 244 } else { 245 additionalRunningProcesses++; 246 } 247 } 248 } 249 250 if (firstProcessDescription.IsEmpty()) 251 return "???"; 252 253 if (additionalRunningProcesses == 0) 254 return firstProcessDescription; 255 256 static BStringFormat format(B_TRANSLATE( 257 "%FIRST_PROCESS_DESCRIPTION% +" 258 "{0, plural, one{# process} other{# processes}}")); 259 BString result; 260 format.Format(result, additionalRunningProcesses); 261 result.ReplaceAll("%FIRST_PROCESS_DESCRIPTION%", firstProcessDescription); 262 263 return result; 264 } 265 266 267 /*! This method assumes that a lock is held on the coordinator. */ 268 269 ProcessCoordinatorState 270 ProcessCoordinator::_CreateStatus() 271 { 272 return ProcessCoordinatorState( 273 this, Progress(), _CreateStatusMessage(), IsRunning(), ErrorStatus()); 274 } 275 276 277 void 278 ProcessCoordinator::_CoordinateAndCallListener() 279 { 280 ProcessCoordinatorState state = _Coordinate(); 281 282 if (fListener != NULL) 283 fListener->CoordinatorChanged(state); 284 } 285 286 287 ProcessCoordinatorState 288 ProcessCoordinator::_Coordinate() 289 { 290 HDTRACE("[Coordinator] will coordinate nodes"); 291 AutoLocker<BLocker> locker(&fLock); 292 _StopSuccessorNodesToErroredOrStoppedNodes(); 293 294 // go through the nodes and find those that are still to be run and 295 // for which the preconditions are met to start. 296 for (int32 i = 0; i < fNodes.CountItems(); i++) { 297 AbstractProcessNode* node = fNodes.ItemAt(i); 298 299 if (node->Process()->ProcessState() == PROCESS_INITIAL) { 300 if (node->AllPredecessorsComplete()) 301 node->Start(); 302 else { 303 HDTRACE("[Coordinator] all predecessors not complete -> " 304 "[%s] not started", node->Process()->Name()); 305 } 306 } else { 307 HDTRACE("[Coordinator] process [%s] running or complete", 308 node->Process()->Name()); 309 } 310 } 311 312 return _CreateStatus(); 313 } 314 315 316 /*! This method assumes that a lock is held on the coordinator. */ 317 318 void 319 ProcessCoordinator::_StopSuccessorNodesToErroredOrStoppedNodes() 320 { 321 for (int32 i = 0; i < fNodes.CountItems(); i++) { 322 AbstractProcessNode* node = fNodes.ItemAt(i); 323 AbstractProcess* process = node->Process(); 324 325 if (process->WasStopped() || process->ErrorStatus() != B_OK) 326 _StopSuccessorNodes(node); 327 } 328 } 329 330 331 /*! This method assumes that a lock is held on the coordinator. */ 332 333 void 334 ProcessCoordinator::_StopSuccessorNodes(AbstractProcessNode* predecessorNode) 335 { 336 for (int32 i = 0; i < predecessorNode->CountSuccessors(); i++) { 337 AbstractProcessNode* node = predecessorNode->SuccessorAt(i); 338 AbstractProcess* process = node->Process(); 339 340 if (process->ProcessState() == PROCESS_INITIAL) { 341 HDDEBUG("[Coordinator] [%s] (failed) --> [%s] (stopping)", 342 predecessorNode->Process()->Name(), process->Name()); 343 node->RequestStop(); 344 _StopSuccessorNodes(node); 345 } 346 } 347 } 348 349 350 bool 351 ProcessCoordinator::_IsRunning(AbstractProcessNode* node) 352 { 353 return node->Process()->ProcessState() != PROCESS_COMPLETE; 354 } 355 356 357 int32 358 ProcessCoordinator::_CountNodesCompleted() 359 { 360 int32 nodesCompleted = 0; 361 for (int32 i = 0; i < fNodes.CountItems(); i++) { 362 AbstractProcess *process = fNodes.ItemAt(i)->Process(); 363 if (process->ProcessState() == PROCESS_COMPLETE) 364 nodesCompleted++; 365 } 366 return nodesCompleted; 367 } 368