1 /* 2 * Copyright 2011-2013, Axel Dörfler, axeld@pinc-software.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 7 #include "IMAPConnectionWorker.h" 8 9 #include <Autolock.h> 10 11 #include <AutoDeleter.h> 12 13 #include "IMAPFolder.h" 14 #include "IMAPMailbox.h" 15 #include "IMAPProtocol.h" 16 17 18 using IMAP::MessageUIDList; 19 20 21 static const uint32 kMaxFetchEntries = 500; 22 static const uint32 kMaxDirectDownloadSize = 4096; 23 24 25 class WorkerPrivate { 26 public: 27 WorkerPrivate(IMAPConnectionWorker& worker) 28 : 29 fWorker(worker) 30 { 31 } 32 33 IMAP::Protocol& Protocol() 34 { 35 return fWorker.fProtocol; 36 } 37 38 status_t AddFolders(BObjectList<IMAPFolder>& folders) 39 { 40 IMAPConnectionWorker::MailboxMap::iterator iterator 41 = fWorker.fMailboxes.begin(); 42 for (; iterator != fWorker.fMailboxes.end(); iterator++) { 43 IMAPFolder* folder = iterator->first; 44 if (!folders.AddItem(folder)) 45 return B_NO_MEMORY; 46 } 47 return B_OK; 48 } 49 50 status_t SelectMailbox(IMAPFolder& folder) 51 { 52 return fWorker._SelectMailbox(folder, NULL); 53 } 54 55 status_t SelectMailbox(IMAPFolder& folder, uint32& nextUID) 56 { 57 return fWorker._SelectMailbox(folder, &nextUID); 58 } 59 60 IMAPMailbox* MailboxFor(IMAPFolder& folder) 61 { 62 return fWorker._MailboxFor(folder); 63 } 64 65 int32 BodyFetchLimit() const 66 { 67 return fWorker.fSettings.BodyFetchLimit(); 68 } 69 70 uint32 MessagesExist() const 71 { 72 return fWorker._MessagesExist(); 73 } 74 75 status_t EnqueueCommand(WorkerCommand* command) 76 { 77 return fWorker._EnqueueCommand(command); 78 } 79 80 void SyncCommandDone() 81 { 82 fWorker._SyncCommandDone(); 83 } 84 85 void Quit() 86 { 87 fWorker.fStopped = true; 88 } 89 90 private: 91 IMAPConnectionWorker& fWorker; 92 }; 93 94 95 class WorkerCommand { 96 public: 97 WorkerCommand() {} 98 virtual ~WorkerCommand() {} 99 100 virtual status_t Process(IMAPConnectionWorker& worker) = 0; 101 virtual bool IsDone() const { return true; } 102 }; 103 104 105 /*! All commands that inherit from this class will automatically maintain the 106 worker's fSyncPending member, and will thus prevent syncing more than once 107 concurrently. 108 */ 109 class SyncCommand : public WorkerCommand { 110 }; 111 112 113 class QuitCommand : public WorkerCommand { 114 public: 115 QuitCommand() 116 { 117 } 118 119 virtual status_t Process(IMAPConnectionWorker& worker) 120 { 121 WorkerPrivate(worker).Quit(); 122 return B_OK; 123 } 124 }; 125 126 127 class CheckSubscribedFoldersCommand : public WorkerCommand { 128 public: 129 virtual status_t Process(IMAPConnectionWorker& worker) 130 { 131 IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol(); 132 133 // The main worker checks the subscribed folders, and creates 134 // other workers as needed 135 return worker.Owner().CheckSubscribedFolders(protocol, 136 worker.UsesIdle()); 137 } 138 }; 139 140 141 class FetchBodiesCommand : public SyncCommand, public IMAP::FetchListener { 142 public: 143 FetchBodiesCommand(IMAPFolder& folder, IMAPMailbox& mailbox, 144 std::vector<uint32>& entries) 145 : 146 fFolder(folder), 147 fMailbox(mailbox), 148 fEntries(entries) 149 { 150 } 151 152 virtual status_t Process(IMAPConnectionWorker& worker) 153 { 154 IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol(); 155 156 if (fEntries.empty()) 157 return B_OK; 158 159 fUID = *fEntries.begin(); 160 fEntries.erase(fEntries.begin()); 161 162 status_t status = WorkerPrivate(worker).SelectMailbox(fFolder); 163 if (status != B_OK) 164 return status; 165 166 printf("IMAP: fetch body for %" B_PRIu32 "\n", fUID); 167 // Since RFC3501 does not specify whether the FETCH response may 168 // alter the order of the message data items we request, we cannot 169 // request more than a single UID at a time, or else we may not be 170 // able to assign the data to the correct message beforehand. 171 IMAP::FetchCommand fetch(fUID, fUID, IMAP::kFetchBody); 172 fetch.SetListener(this); 173 174 return protocol.ProcessCommand(fetch); 175 } 176 177 virtual bool IsDone() const 178 { 179 return fEntries.empty(); 180 } 181 182 virtual bool FetchData(uint32 fetchFlags, BDataIO& stream, size_t& length) 183 { 184 fFetchStatus = fFolder.StoreBody(fUID, stream, length, fRef, fFile); 185 return true; 186 } 187 188 virtual void FetchedData(uint32 fetchFlags, uint32 uid, uint32 flags) 189 { 190 if (fFetchStatus == B_OK) 191 fFolder.BodyStored(fRef, fFile, uid); 192 } 193 194 private: 195 IMAPFolder& fFolder; 196 IMAPMailbox& fMailbox; 197 std::vector<uint32> fEntries; 198 uint32 fUID; 199 entry_ref fRef; 200 BFile fFile; 201 status_t fFetchStatus; 202 }; 203 204 205 class FetchHeadersCommand : public SyncCommand, public IMAP::FetchListener { 206 public: 207 FetchHeadersCommand(IMAPFolder& folder, IMAPMailbox& mailbox, 208 MessageUIDList& uids, int32 bodyFetchLimit) 209 : 210 fFolder(folder), 211 fMailbox(mailbox), 212 fUIDs(uids), 213 fBodyFetchLimit(bodyFetchLimit) 214 { 215 } 216 217 virtual status_t Process(IMAPConnectionWorker& worker) 218 { 219 IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol(); 220 221 status_t status = WorkerPrivate(worker).SelectMailbox(fFolder); 222 if (status != B_OK) 223 return status; 224 225 printf("IMAP: fetch %" B_PRIuSIZE "u headers\n", fUIDs.size()); 226 227 IMAP::FetchCommand fetch(fUIDs, kMaxFetchEntries, 228 IMAP::kFetchHeader | IMAP::kFetchFlags); 229 fetch.SetListener(this); 230 231 status = protocol.ProcessCommand(fetch); 232 if (status != B_OK) 233 return status; 234 235 if (IsDone() && !fFetchBodies.empty()) { 236 // Enqueue command to fetch the message bodies 237 WorkerPrivate(worker).EnqueueCommand(new FetchBodiesCommand(fFolder, 238 fMailbox, fFetchBodies)); 239 } 240 241 return B_OK; 242 } 243 244 virtual bool IsDone() const 245 { 246 return fUIDs.empty(); 247 } 248 249 virtual bool FetchData(uint32 fetchFlags, BDataIO& stream, size_t& length) 250 { 251 fFetchStatus = fFolder.StoreMessage(fetchFlags, stream, length, 252 fRef, fFile); 253 return true; 254 } 255 256 virtual void FetchedData(uint32 fetchFlags, uint32 uid, uint32 flags) 257 { 258 if (fFetchStatus == B_OK) { 259 fFolder.MessageStored(fRef, fFile, fetchFlags, uid, flags); 260 261 uint32 size = fMailbox.MessageSize(uid); 262 if (fBodyFetchLimit < 0 || size < fBodyFetchLimit) 263 fFetchBodies.push_back(uid); 264 } 265 } 266 267 private: 268 IMAPFolder& fFolder; 269 IMAPMailbox& fMailbox; 270 MessageUIDList fUIDs; 271 MessageUIDList fFetchBodies; 272 uint32 fBodyFetchLimit; 273 entry_ref fRef; 274 BFile fFile; 275 status_t fFetchStatus; 276 }; 277 278 279 class CheckMailboxesCommand : public SyncCommand { 280 public: 281 CheckMailboxesCommand(IMAPConnectionWorker& worker) 282 : 283 fWorker(worker), 284 fFolders(5, false), 285 fState(INIT), 286 fFolder(NULL), 287 fMailbox(NULL) 288 { 289 } 290 291 virtual status_t Process(IMAPConnectionWorker& worker) 292 { 293 IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol(); 294 295 if (fState == INIT) { 296 // Collect folders 297 status_t status = WorkerPrivate(worker).AddFolders(fFolders); 298 if (status != B_OK || fFolders.IsEmpty()) { 299 fState = DONE; 300 return status; 301 } 302 303 fState = SELECT; 304 } 305 306 if (fState == SELECT) { 307 // Get next mailbox from list, and select it 308 fFolder = fFolders.RemoveItemAt(fFolders.CountItems() - 1); 309 if (fFolder == NULL) { 310 for (int32 i = 0; i < fFetchCommands.CountItems(); i++) { 311 WorkerPrivate(worker).EnqueueCommand( 312 fFetchCommands.ItemAt(i)); 313 } 314 315 fState = DONE; 316 return B_OK; 317 } 318 319 fMailbox = WorkerPrivate(worker).MailboxFor(*fFolder); 320 321 status_t status = WorkerPrivate(worker).SelectMailbox(*fFolder); 322 if (status != B_OK) 323 return status; 324 325 fLastIndex = WorkerPrivate(worker).MessagesExist(); 326 fFirstIndex = fMailbox->CountMessages() + 1; 327 if (fLastIndex > 0) 328 fState = FETCH_ENTRIES; 329 } 330 331 if (fState == FETCH_ENTRIES) { 332 status_t status = WorkerPrivate(worker).SelectMailbox(*fFolder); 333 if (status != B_OK) 334 return status; 335 336 uint32 to = fLastIndex; 337 uint32 from = fFirstIndex + kMaxFetchEntries < to 338 ? fLastIndex - kMaxFetchEntries : fFirstIndex; 339 340 printf("IMAP: get entries from %" B_PRIu32 " to %" B_PRIu32 "\n", 341 from, to); 342 343 IMAP::MessageEntryList entries; 344 IMAP::FetchMessageEntriesCommand fetch(entries, from, to, false); 345 status = protocol.ProcessCommand(fetch); 346 if (status != B_OK) 347 return status; 348 349 std::vector<uint32> uidsToFetch; 350 351 // Determine how much we need to download 352 // TODO: also retrieve the header size, and only take the body 353 // size into account if it's below the limit -- that does not 354 // seem to be possible, though 355 for (size_t i = 0; i < entries.size(); i++) { 356 printf("%10" B_PRIu32 " %8" B_PRIu32 " bytes, flags: %#" 357 B_PRIx32 "\n", entries[i].uid, entries[i].size, 358 entries[i].flags); 359 fMailbox->AddMessageEntry(from + i, entries[i].uid, 360 entries[i].flags, entries[i].size); 361 362 if (entries[i].uid > fFolder->LastUID()) { 363 fTotalBytes += entries[i].size; 364 fUIDsToFetch.push_back(entries[i].uid); 365 } 366 } 367 368 fTotalEntries += fUIDsToFetch.size(); 369 fLastIndex = from - 1; 370 371 if (from == 1) { 372 if (fUIDsToFetch.size() > 0) { 373 // Add pending command to fetch the message headers 374 WorkerCommand* command = new FetchHeadersCommand(*fFolder, 375 *fMailbox, fUIDsToFetch, 376 WorkerPrivate(worker).BodyFetchLimit()); 377 if (!fFetchCommands.AddItem(command)) 378 delete command; 379 380 fUIDsToFetch.clear(); 381 } 382 fState = SELECT; 383 } 384 } 385 386 return B_OK; 387 } 388 389 virtual bool IsDone() const 390 { 391 return fState == DONE; 392 } 393 394 private: 395 enum State { 396 INIT, 397 SELECT, 398 FETCH_ENTRIES, 399 DONE 400 }; 401 402 IMAPConnectionWorker& fWorker; 403 BObjectList<IMAPFolder> fFolders; 404 State fState; 405 IMAPFolder* fFolder; 406 IMAPMailbox* fMailbox; 407 uint32 fFirstIndex; 408 uint32 fLastIndex; 409 uint64 fTotalEntries; 410 uint64 fTotalBytes; 411 WorkerCommandList fFetchCommands; 412 MessageUIDList fUIDsToFetch; 413 }; 414 415 416 struct CommandDelete 417 { 418 inline void operator()(WorkerCommand* command) 419 { 420 delete command; 421 } 422 }; 423 424 425 /*! An auto deleter similar to ObjectDeleter that calls SyncCommandDone() 426 for all SyncCommands. 427 */ 428 struct CommandDeleter : BPrivate::AutoDeleter<WorkerCommand, CommandDelete> 429 { 430 CommandDeleter(IMAPConnectionWorker& worker, WorkerCommand* command) 431 : 432 BPrivate::AutoDeleter<WorkerCommand, CommandDelete>(command), 433 fWorker(worker) 434 { 435 } 436 437 ~CommandDeleter() 438 { 439 if (dynamic_cast<SyncCommand*>(fObject) != NULL) 440 WorkerPrivate(fWorker).SyncCommandDone(); 441 } 442 443 private: 444 IMAPConnectionWorker& fWorker; 445 }; 446 447 448 // #pragma mark - 449 450 451 IMAPConnectionWorker::IMAPConnectionWorker(IMAPProtocol& owner, 452 const Settings& settings, bool main) 453 : 454 fOwner(owner), 455 fSettings(settings), 456 fPendingCommandsSemaphore(-1), 457 fIdleBox(NULL), 458 fMain(main), 459 fStopped(false) 460 { 461 fExistsHandler.SetListener(this); 462 fProtocol.AddHandler(fExistsHandler); 463 464 fExpungeHandler.SetListener(this); 465 fProtocol.AddHandler(fExpungeHandler); 466 } 467 468 469 IMAPConnectionWorker::~IMAPConnectionWorker() 470 { 471 puts("worker quit"); 472 delete_sem(fPendingCommandsSemaphore); 473 _Disconnect(); 474 } 475 476 477 bool 478 IMAPConnectionWorker::HasMailboxes() const 479 { 480 BAutolock locker(const_cast<IMAPConnectionWorker*>(this)->fLocker); 481 return !fMailboxes.empty(); 482 } 483 484 485 uint32 486 IMAPConnectionWorker::CountMailboxes() const 487 { 488 BAutolock locker(const_cast<IMAPConnectionWorker*>(this)->fLocker); 489 return fMailboxes.size(); 490 } 491 492 493 void 494 IMAPConnectionWorker::AddMailbox(IMAPFolder* folder) 495 { 496 BAutolock locker(fLocker); 497 498 fMailboxes.insert(std::make_pair(folder, (IMAPMailbox*)NULL)); 499 500 // Prefer to have the INBOX in idle mode over other mail boxes 501 if (fIdleBox == NULL || folder->MailboxName().ICompare("INBOX") == 0) 502 fIdleBox = folder; 503 } 504 505 506 void 507 IMAPConnectionWorker::RemoveAllMailboxes() 508 { 509 BAutolock locker(fLocker); 510 511 // Reset listeners, and delete the mailboxes 512 MailboxMap::iterator iterator = fMailboxes.begin(); 513 for (; iterator != fMailboxes.end(); iterator++) { 514 iterator->first->SetListener(NULL); 515 delete iterator->second; 516 } 517 518 fIdleBox = NULL; 519 fMailboxes.clear(); 520 } 521 522 523 status_t 524 IMAPConnectionWorker::Run() 525 { 526 fPendingCommandsSemaphore = create_sem(0, "imap pending commands"); 527 if (fPendingCommandsSemaphore < 0) 528 return fPendingCommandsSemaphore; 529 530 fThread = spawn_thread(&_Worker, "imap connection worker", 531 B_NORMAL_PRIORITY, this); 532 if (fThread < 0) 533 return fThread; 534 535 resume_thread(fThread); 536 return B_OK; 537 } 538 539 540 void 541 IMAPConnectionWorker::Quit() 542 { 543 printf("IMAP: worker %p: enqueue quit\n", this); 544 _EnqueueCommand(new QuitCommand()); 545 } 546 547 548 status_t 549 IMAPConnectionWorker::EnqueueCheckSubscribedFolders() 550 { 551 printf("IMAP: worker %p: enqueue check subscribed folders\n", this); 552 return _EnqueueCommand(new CheckSubscribedFoldersCommand()); 553 } 554 555 556 status_t 557 IMAPConnectionWorker::EnqueueCheckMailboxes() 558 { 559 // Do not schedule checking mailboxes again if we're still working on 560 // those. 561 if (fSyncPending > 0) 562 return B_OK; 563 564 printf("IMAP: worker %p: enqueue check mailboxes\n", this); 565 return _EnqueueCommand(new CheckMailboxesCommand(*this)); 566 } 567 568 569 status_t 570 IMAPConnectionWorker::EnqueueRetrieveMail(entry_ref& ref) 571 { 572 return B_OK; 573 } 574 575 576 // #pragma mark - Handler listener 577 578 579 void 580 IMAPConnectionWorker::MessageExistsReceived(uint32 count) 581 { 582 printf("Message exists: %" B_PRIu32 "\n", count); 583 fMessagesExist = count; 584 585 // TODO: We might want to trigger another check even during sync 586 // (but only one), if this isn't the result of a SELECT 587 EnqueueCheckMailboxes(); 588 } 589 590 591 void 592 IMAPConnectionWorker::MessageExpungeReceived(uint32 index) 593 { 594 printf("Message expunge: %" B_PRIu32 "\n", index); 595 if (fSelectedBox == NULL) 596 return; 597 598 IMAPMailbox* mailbox = _MailboxFor(*fSelectedBox); 599 if (mailbox != NULL) { 600 mailbox->RemoveMessageEntry(index); 601 // TODO: remove message from folder 602 } 603 } 604 605 606 // #pragma mark - private 607 608 609 status_t 610 IMAPConnectionWorker::_Worker() 611 { 612 while (!fStopped) { 613 BAutolock locker(fLocker); 614 615 if (fPendingCommands.IsEmpty()) { 616 _Disconnect(); 617 locker.Unlock(); 618 619 _WaitForCommands(); 620 continue; 621 } 622 623 WorkerCommand* command = fPendingCommands.RemoveItemAt(0); 624 if (command == NULL) 625 continue; 626 627 CommandDeleter deleter(*this, command); 628 629 status_t status = _Connect(); 630 if (status != B_OK) 631 return status; 632 633 status = command->Process(*this); 634 if (status != B_OK) 635 return status; 636 637 if (!command->IsDone()) { 638 deleter.Detach(); 639 _EnqueueCommand(command); 640 } 641 } 642 643 fOwner.WorkerQuit(this); 644 return B_OK; 645 } 646 647 648 /*! Enqueues the given command to the worker queue. This method will take 649 over ownership of the given command even in the error case. 650 */ 651 status_t 652 IMAPConnectionWorker::_EnqueueCommand(WorkerCommand* command) 653 { 654 BAutolock locker(fLocker); 655 656 if (!fPendingCommands.AddItem(command)) { 657 delete command; 658 return B_NO_MEMORY; 659 } 660 661 if (dynamic_cast<SyncCommand*>(command) != NULL) 662 fSyncPending++; 663 664 locker.Unlock(); 665 release_sem(fPendingCommandsSemaphore); 666 return B_OK; 667 } 668 669 670 void 671 IMAPConnectionWorker::_WaitForCommands() 672 { 673 while (acquire_sem(fPendingCommandsSemaphore) == B_INTERRUPTED); 674 } 675 676 677 status_t 678 IMAPConnectionWorker::_SelectMailbox(IMAPFolder& folder, uint32* _nextUID) 679 { 680 if (fSelectedBox == &folder && _nextUID == NULL) 681 return B_OK; 682 683 IMAP::SelectCommand select(folder.MailboxName().String()); 684 685 status_t status = fProtocol.ProcessCommand(select); 686 if (status == B_OK) { 687 folder.SetUIDValidity(select.UIDValidity()); 688 if (_nextUID != NULL) 689 *_nextUID = select.NextUID(); 690 fSelectedBox = &folder; 691 } 692 693 return status; 694 } 695 696 697 IMAPMailbox* 698 IMAPConnectionWorker::_MailboxFor(IMAPFolder& folder) 699 { 700 MailboxMap::iterator found = fMailboxes.find(&folder); 701 if (found == fMailboxes.end()) 702 return NULL; 703 704 IMAPMailbox* mailbox = found->second; 705 if (mailbox == NULL) { 706 mailbox = new IMAPMailbox(fProtocol, folder.MailboxName()); 707 folder.SetListener(mailbox); 708 found->second = mailbox; 709 } 710 return mailbox; 711 } 712 713 714 void 715 IMAPConnectionWorker::_SyncCommandDone() 716 { 717 fSyncPending--; 718 } 719 720 721 status_t 722 IMAPConnectionWorker::_Connect() 723 { 724 if (fProtocol.IsConnected()) 725 return B_OK; 726 727 status_t status; 728 int tries = 6; 729 while (tries-- > 0) { 730 status = fProtocol.Connect(fSettings.ServerAddress(), 731 fSettings.Username(), fSettings.Password(), fSettings.UseSSL()); 732 if (status == B_OK) 733 break; 734 735 // Wait for 10 seconds, and try again 736 snooze(10000000); 737 } 738 // TODO: if other workers are connected, but it fails for us, we need to 739 // remove this worker, and reduce the number of concurrent connections 740 if (status != B_OK) 741 return status; 742 743 fIdle = fSettings.IdleMode() && fProtocol.Capabilities().Contains("IDLE"); 744 return B_OK; 745 } 746 747 748 void 749 IMAPConnectionWorker::_Disconnect() 750 { 751 fProtocol.Disconnect(); 752 } 753 754 755 /*static*/ status_t 756 IMAPConnectionWorker::_Worker(void* _self) 757 { 758 IMAPConnectionWorker* self = (IMAPConnectionWorker*)_self; 759 status_t status = self->_Worker(); 760 761 delete self; 762 return status; 763 } 764