xref: /haiku/src/add-ons/mail_daemon/inbound_protocols/imap/IMAPConnectionWorker.cpp (revision 7d6915b4d08ffe728cd38af02843d5e98ddfe0db)
1 /*
2  * Copyright 2011-2013, Axel Dörfler, axeld@pinc-software.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 
7 #include "IMAPConnectionWorker.h"
8 
9 #include <Autolock.h>
10 
11 #include <AutoDeleter.h>
12 
13 #include "IMAPFolder.h"
14 #include "IMAPMailbox.h"
15 #include "IMAPProtocol.h"
16 
17 
18 using IMAP::MessageUIDList;
19 
20 
21 static const uint32 kMaxFetchEntries = 500;
22 static const uint32 kMaxDirectDownloadSize = 4096;
23 
24 
25 class WorkerPrivate {
26 public:
27 	WorkerPrivate(IMAPConnectionWorker& worker)
28 		:
29 		fWorker(worker)
30 	{
31 	}
32 
33 	IMAP::Protocol& Protocol()
34 	{
35 		return fWorker.fProtocol;
36 	}
37 
38 	status_t AddFolders(BObjectList<IMAPFolder>& folders)
39 	{
40 		IMAPConnectionWorker::MailboxMap::iterator iterator
41 			= fWorker.fMailboxes.begin();
42 		for (; iterator != fWorker.fMailboxes.end(); iterator++) {
43 			IMAPFolder* folder = iterator->first;
44 			if (!folders.AddItem(folder))
45 				return B_NO_MEMORY;
46 		}
47 		return B_OK;
48 	}
49 
50 	status_t SelectMailbox(IMAPFolder& folder)
51 	{
52 		return fWorker._SelectMailbox(folder, NULL);
53 	}
54 
55 	status_t SelectMailbox(IMAPFolder& folder, uint32& nextUID)
56 	{
57 		return fWorker._SelectMailbox(folder, &nextUID);
58 	}
59 
60 	IMAPMailbox* MailboxFor(IMAPFolder& folder)
61 	{
62 		return fWorker._MailboxFor(folder);
63 	}
64 
65 	int32 BodyFetchLimit() const
66 	{
67 		return fWorker.fSettings.BodyFetchLimit();
68 	}
69 
70 	uint32 MessagesExist() const
71 	{
72 		return fWorker._MessagesExist();
73 	}
74 
75 	status_t EnqueueCommand(WorkerCommand* command)
76 	{
77 		return fWorker._EnqueueCommand(command);
78 	}
79 
80 	void SyncCommandDone()
81 	{
82 		fWorker._SyncCommandDone();
83 	}
84 
85 	void Quit()
86 	{
87 		fWorker.fStopped = true;
88 	}
89 
90 private:
91 	IMAPConnectionWorker&	fWorker;
92 };
93 
94 
95 class WorkerCommand {
96 public:
97 								WorkerCommand() {}
98 	virtual						~WorkerCommand() {}
99 
100 	virtual	status_t			Process(IMAPConnectionWorker& worker) = 0;
101 	virtual bool				IsDone() const { return true; }
102 };
103 
104 
105 /*!	All commands that inherit from this class will automatically maintain the
106 	worker's fSyncPending member, and will thus prevent syncing more than once
107 	concurrently.
108 */
109 class SyncCommand : public WorkerCommand {
110 };
111 
112 
113 class QuitCommand : public WorkerCommand {
114 public:
115 	QuitCommand()
116 	{
117 	}
118 
119 	virtual status_t Process(IMAPConnectionWorker& worker)
120 	{
121 		WorkerPrivate(worker).Quit();
122 		return B_OK;
123 	}
124 };
125 
126 
127 class CheckSubscribedFoldersCommand : public WorkerCommand {
128 public:
129 	virtual status_t Process(IMAPConnectionWorker& worker)
130 	{
131 		IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol();
132 
133 		// The main worker checks the subscribed folders, and creates
134 		// other workers as needed
135 		return worker.Owner().CheckSubscribedFolders(protocol,
136 			worker.UsesIdle());
137 	}
138 };
139 
140 
141 class FetchBodiesCommand : public SyncCommand, public IMAP::FetchListener {
142 public:
143 	FetchBodiesCommand(IMAPFolder& folder, IMAPMailbox& mailbox,
144 		std::vector<uint32>& entries)
145 		:
146 		fFolder(folder),
147 		fMailbox(mailbox),
148 		fEntries(entries)
149 	{
150 	}
151 
152 	virtual status_t Process(IMAPConnectionWorker& worker)
153 	{
154 		IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol();
155 
156 		if (fEntries.empty())
157 			return B_OK;
158 
159 		fUID = *fEntries.begin();
160 		fEntries.erase(fEntries.begin());
161 
162 		status_t status = WorkerPrivate(worker).SelectMailbox(fFolder);
163 		if (status != B_OK)
164 			return status;
165 
166 		printf("IMAP: fetch body for %" B_PRIu32 "\n", fUID);
167 		// Since RFC3501 does not specify whether the FETCH response may
168 		// alter the order of the message data items we request, we cannot
169 		// request more than a single UID at a time, or else we may not be
170 		// able to assign the data to the correct message beforehand.
171 		IMAP::FetchCommand fetch(fUID, fUID, IMAP::kFetchBody);
172 		fetch.SetListener(this);
173 
174 		return protocol.ProcessCommand(fetch);
175 	}
176 
177 	virtual bool IsDone() const
178 	{
179 		return fEntries.empty();
180 	}
181 
182 	virtual bool FetchData(uint32 fetchFlags, BDataIO& stream, size_t& length)
183 	{
184 		fFetchStatus = fFolder.StoreBody(fUID, stream, length, fRef, fFile);
185 		return true;
186 	}
187 
188 	virtual void FetchedData(uint32 fetchFlags, uint32 uid, uint32 flags)
189 	{
190 		if (fFetchStatus == B_OK)
191 			fFolder.BodyStored(fRef, fFile, uid);
192 	}
193 
194 private:
195 	IMAPFolder&				fFolder;
196 	IMAPMailbox&			fMailbox;
197 	std::vector<uint32>		fEntries;
198 	uint32					fUID;
199 	entry_ref				fRef;
200 	BFile					fFile;
201 	status_t				fFetchStatus;
202 };
203 
204 
205 class FetchHeadersCommand : public SyncCommand, public IMAP::FetchListener {
206 public:
207 	FetchHeadersCommand(IMAPFolder& folder, IMAPMailbox& mailbox,
208 		MessageUIDList& uids, int32 bodyFetchLimit)
209 		:
210 		fFolder(folder),
211 		fMailbox(mailbox),
212 		fUIDs(uids),
213 		fBodyFetchLimit(bodyFetchLimit)
214 	{
215 	}
216 
217 	virtual status_t Process(IMAPConnectionWorker& worker)
218 	{
219 		IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol();
220 
221 		status_t status = WorkerPrivate(worker).SelectMailbox(fFolder);
222 		if (status != B_OK)
223 			return status;
224 
225 		printf("IMAP: fetch %" B_PRIuSIZE "u headers\n", fUIDs.size());
226 
227 		IMAP::FetchCommand fetch(fUIDs, kMaxFetchEntries,
228 			IMAP::kFetchHeader | IMAP::kFetchFlags);
229 		fetch.SetListener(this);
230 
231 		status = protocol.ProcessCommand(fetch);
232 		if (status != B_OK)
233 			return status;
234 
235 		if (IsDone() && !fFetchBodies.empty()) {
236 			// Enqueue command to fetch the message bodies
237 			WorkerPrivate(worker).EnqueueCommand(new FetchBodiesCommand(fFolder,
238 				fMailbox, fFetchBodies));
239 		}
240 
241 		return B_OK;
242 	}
243 
244 	virtual bool IsDone() const
245 	{
246 		return fUIDs.empty();
247 	}
248 
249 	virtual bool FetchData(uint32 fetchFlags, BDataIO& stream, size_t& length)
250 	{
251 		fFetchStatus = fFolder.StoreMessage(fetchFlags, stream, length,
252 			fRef, fFile);
253 		return true;
254 	}
255 
256 	virtual void FetchedData(uint32 fetchFlags, uint32 uid, uint32 flags)
257 	{
258 		if (fFetchStatus == B_OK) {
259 			fFolder.MessageStored(fRef, fFile, fetchFlags, uid, flags);
260 
261 			uint32 size = fMailbox.MessageSize(uid);
262 			if (fBodyFetchLimit < 0 || size < fBodyFetchLimit)
263 				fFetchBodies.push_back(uid);
264 		}
265 	}
266 
267 private:
268 	IMAPFolder&				fFolder;
269 	IMAPMailbox&			fMailbox;
270 	MessageUIDList			fUIDs;
271 	MessageUIDList			fFetchBodies;
272 	uint32					fBodyFetchLimit;
273 	entry_ref				fRef;
274 	BFile					fFile;
275 	status_t				fFetchStatus;
276 };
277 
278 
279 class CheckMailboxesCommand : public SyncCommand {
280 public:
281 	CheckMailboxesCommand(IMAPConnectionWorker& worker)
282 		:
283 		fWorker(worker),
284 		fFolders(5, false),
285 		fState(INIT),
286 		fFolder(NULL),
287 		fMailbox(NULL)
288 	{
289 	}
290 
291 	virtual status_t Process(IMAPConnectionWorker& worker)
292 	{
293 		IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol();
294 
295 		if (fState == INIT) {
296 			// Collect folders
297 			status_t status = WorkerPrivate(worker).AddFolders(fFolders);
298 			if (status != B_OK || fFolders.IsEmpty()) {
299 				fState = DONE;
300 				return status;
301 			}
302 
303 			fState = SELECT;
304 		}
305 
306 		if (fState == SELECT) {
307 			// Get next mailbox from list, and select it
308 			fFolder = fFolders.RemoveItemAt(fFolders.CountItems() - 1);
309 			if (fFolder == NULL) {
310 				for (int32 i = 0; i < fFetchCommands.CountItems(); i++) {
311 					WorkerPrivate(worker).EnqueueCommand(
312 						fFetchCommands.ItemAt(i));
313 				}
314 
315 				fState = DONE;
316 				return B_OK;
317 			}
318 
319 			fMailbox = WorkerPrivate(worker).MailboxFor(*fFolder);
320 
321 			status_t status = WorkerPrivate(worker).SelectMailbox(*fFolder);
322 			if (status != B_OK)
323 				return status;
324 
325 			fLastIndex = WorkerPrivate(worker).MessagesExist();
326 			fFirstIndex = fMailbox->CountMessages() + 1;
327 			if (fLastIndex > 0)
328 				fState = FETCH_ENTRIES;
329 		}
330 
331 		if (fState == FETCH_ENTRIES) {
332 			status_t status = WorkerPrivate(worker).SelectMailbox(*fFolder);
333 			if (status != B_OK)
334 				return status;
335 
336 			uint32 to = fLastIndex;
337 			uint32 from = fFirstIndex + kMaxFetchEntries < to
338 				? fLastIndex - kMaxFetchEntries : fFirstIndex;
339 
340 			printf("IMAP: get entries from %" B_PRIu32 " to %" B_PRIu32 "\n",
341 				from, to);
342 
343 			IMAP::MessageEntryList entries;
344 			IMAP::FetchMessageEntriesCommand fetch(entries, from, to, false);
345 			status = protocol.ProcessCommand(fetch);
346 			if (status != B_OK)
347 				return status;
348 
349 			std::vector<uint32> uidsToFetch;
350 
351 			// Determine how much we need to download
352 			// TODO: also retrieve the header size, and only take the body
353 			// size into account if it's below the limit -- that does not
354 			// seem to be possible, though
355 			for (size_t i = 0; i < entries.size(); i++) {
356 				printf("%10" B_PRIu32 " %8" B_PRIu32 " bytes, flags: %#"
357 					B_PRIx32 "\n", entries[i].uid, entries[i].size,
358 					entries[i].flags);
359 				fMailbox->AddMessageEntry(from + i, entries[i].uid,
360 					entries[i].flags, entries[i].size);
361 
362 				if (entries[i].uid > fFolder->LastUID()) {
363 					fTotalBytes += entries[i].size;
364 					fUIDsToFetch.push_back(entries[i].uid);
365 				}
366 			}
367 
368 			fTotalEntries += fUIDsToFetch.size();
369 			fLastIndex = from - 1;
370 
371 			if (from == 1) {
372 				if (fUIDsToFetch.size() > 0) {
373 					// Add pending command to fetch the message headers
374 					WorkerCommand* command = new FetchHeadersCommand(*fFolder,
375 						*fMailbox, fUIDsToFetch,
376 						WorkerPrivate(worker).BodyFetchLimit());
377 					if (!fFetchCommands.AddItem(command))
378 						delete command;
379 
380 					fUIDsToFetch.clear();
381 				}
382 				fState = SELECT;
383 			}
384 		}
385 
386 		return B_OK;
387 	}
388 
389 	virtual bool IsDone() const
390 	{
391 		return fState == DONE;
392 	}
393 
394 private:
395 	enum State {
396 		INIT,
397 		SELECT,
398 		FETCH_ENTRIES,
399 		DONE
400 	};
401 
402 	IMAPConnectionWorker&	fWorker;
403 	BObjectList<IMAPFolder>	fFolders;
404 	State					fState;
405 	IMAPFolder*				fFolder;
406 	IMAPMailbox*			fMailbox;
407 	uint32					fFirstIndex;
408 	uint32					fLastIndex;
409 	uint64					fTotalEntries;
410 	uint64					fTotalBytes;
411 	WorkerCommandList		fFetchCommands;
412 	MessageUIDList			fUIDsToFetch;
413 };
414 
415 
416 struct CommandDelete
417 {
418 	inline void operator()(WorkerCommand* command)
419 	{
420 		delete command;
421 	}
422 };
423 
424 
425 /*!	An auto deleter similar to ObjectDeleter that calls SyncCommandDone()
426 	for all SyncCommands.
427 */
428 struct CommandDeleter : BPrivate::AutoDeleter<WorkerCommand, CommandDelete>
429 {
430 	CommandDeleter(IMAPConnectionWorker& worker, WorkerCommand* command)
431 		:
432 		BPrivate::AutoDeleter<WorkerCommand, CommandDelete>(command),
433 		fWorker(worker)
434 	{
435 	}
436 
437 	~CommandDeleter()
438 	{
439 		if (dynamic_cast<SyncCommand*>(fObject) != NULL)
440 			WorkerPrivate(fWorker).SyncCommandDone();
441 	}
442 
443 private:
444 	IMAPConnectionWorker&	fWorker;
445 };
446 
447 
448 // #pragma mark -
449 
450 
451 IMAPConnectionWorker::IMAPConnectionWorker(IMAPProtocol& owner,
452 	const Settings& settings, bool main)
453 	:
454 	fOwner(owner),
455 	fSettings(settings),
456 	fPendingCommandsSemaphore(-1),
457 	fIdleBox(NULL),
458 	fMain(main),
459 	fStopped(false)
460 {
461 	fExistsHandler.SetListener(this);
462 	fProtocol.AddHandler(fExistsHandler);
463 
464 	fExpungeHandler.SetListener(this);
465 	fProtocol.AddHandler(fExpungeHandler);
466 }
467 
468 
469 IMAPConnectionWorker::~IMAPConnectionWorker()
470 {
471 	puts("worker quit");
472 	delete_sem(fPendingCommandsSemaphore);
473 	_Disconnect();
474 }
475 
476 
477 bool
478 IMAPConnectionWorker::HasMailboxes() const
479 {
480 	BAutolock locker(const_cast<IMAPConnectionWorker*>(this)->fLocker);
481 	return !fMailboxes.empty();
482 }
483 
484 
485 uint32
486 IMAPConnectionWorker::CountMailboxes() const
487 {
488 	BAutolock locker(const_cast<IMAPConnectionWorker*>(this)->fLocker);
489 	return fMailboxes.size();
490 }
491 
492 
493 void
494 IMAPConnectionWorker::AddMailbox(IMAPFolder* folder)
495 {
496 	BAutolock locker(fLocker);
497 
498 	fMailboxes.insert(std::make_pair(folder, (IMAPMailbox*)NULL));
499 
500 	// Prefer to have the INBOX in idle mode over other mail boxes
501 	if (fIdleBox == NULL || folder->MailboxName().ICompare("INBOX") == 0)
502 		fIdleBox = folder;
503 }
504 
505 
506 void
507 IMAPConnectionWorker::RemoveAllMailboxes()
508 {
509 	BAutolock locker(fLocker);
510 
511 	// Reset listeners, and delete the mailboxes
512 	MailboxMap::iterator iterator = fMailboxes.begin();
513 	for (; iterator != fMailboxes.end(); iterator++) {
514 		iterator->first->SetListener(NULL);
515 		delete iterator->second;
516 	}
517 
518 	fIdleBox = NULL;
519 	fMailboxes.clear();
520 }
521 
522 
523 status_t
524 IMAPConnectionWorker::Run()
525 {
526 	fPendingCommandsSemaphore = create_sem(0, "imap pending commands");
527 	if (fPendingCommandsSemaphore < 0)
528 		return fPendingCommandsSemaphore;
529 
530 	fThread = spawn_thread(&_Worker, "imap connection worker",
531 		B_NORMAL_PRIORITY, this);
532 	if (fThread < 0)
533 		return fThread;
534 
535 	resume_thread(fThread);
536 	return B_OK;
537 }
538 
539 
540 void
541 IMAPConnectionWorker::Quit()
542 {
543 	printf("IMAP: worker %p: enqueue quit\n", this);
544 	_EnqueueCommand(new QuitCommand());
545 }
546 
547 
548 status_t
549 IMAPConnectionWorker::EnqueueCheckSubscribedFolders()
550 {
551 	printf("IMAP: worker %p: enqueue check subscribed folders\n", this);
552 	return _EnqueueCommand(new CheckSubscribedFoldersCommand());
553 }
554 
555 
556 status_t
557 IMAPConnectionWorker::EnqueueCheckMailboxes()
558 {
559 	// Do not schedule checking mailboxes again if we're still working on
560 	// those.
561 	if (fSyncPending > 0)
562 		return B_OK;
563 
564 	printf("IMAP: worker %p: enqueue check mailboxes\n", this);
565 	return _EnqueueCommand(new CheckMailboxesCommand(*this));
566 }
567 
568 
569 status_t
570 IMAPConnectionWorker::EnqueueRetrieveMail(entry_ref& ref)
571 {
572 	return B_OK;
573 }
574 
575 
576 // #pragma mark - Handler listener
577 
578 
579 void
580 IMAPConnectionWorker::MessageExistsReceived(uint32 count)
581 {
582 	printf("Message exists: %" B_PRIu32 "\n", count);
583 	fMessagesExist = count;
584 
585 	// TODO: We might want to trigger another check even during sync
586 	// (but only one), if this isn't the result of a SELECT
587 	EnqueueCheckMailboxes();
588 }
589 
590 
591 void
592 IMAPConnectionWorker::MessageExpungeReceived(uint32 index)
593 {
594 	printf("Message expunge: %" B_PRIu32 "\n", index);
595 	if (fSelectedBox == NULL)
596 		return;
597 
598 	IMAPMailbox* mailbox = _MailboxFor(*fSelectedBox);
599 	if (mailbox != NULL) {
600 		mailbox->RemoveMessageEntry(index);
601 		// TODO: remove message from folder
602 	}
603 }
604 
605 
606 // #pragma mark - private
607 
608 
609 status_t
610 IMAPConnectionWorker::_Worker()
611 {
612 	while (!fStopped) {
613 		BAutolock locker(fLocker);
614 
615 		if (fPendingCommands.IsEmpty()) {
616 			_Disconnect();
617 			locker.Unlock();
618 
619 			_WaitForCommands();
620 			continue;
621 		}
622 
623 		WorkerCommand* command = fPendingCommands.RemoveItemAt(0);
624 		if (command == NULL)
625 			continue;
626 
627 		CommandDeleter deleter(*this, command);
628 
629 		status_t status = _Connect();
630 		if (status != B_OK)
631 			return status;
632 
633 		status = command->Process(*this);
634 		if (status != B_OK)
635 			return status;
636 
637 		if (!command->IsDone()) {
638 			deleter.Detach();
639 			_EnqueueCommand(command);
640 		}
641 	}
642 
643 	fOwner.WorkerQuit(this);
644 	return B_OK;
645 }
646 
647 
648 /*!	Enqueues the given command to the worker queue. This method will take
649 	over ownership of the given command even in the error case.
650 */
651 status_t
652 IMAPConnectionWorker::_EnqueueCommand(WorkerCommand* command)
653 {
654 	BAutolock locker(fLocker);
655 
656 	if (!fPendingCommands.AddItem(command)) {
657 		delete command;
658 		return B_NO_MEMORY;
659 	}
660 
661 	if (dynamic_cast<SyncCommand*>(command) != NULL)
662 		fSyncPending++;
663 
664 	locker.Unlock();
665 	release_sem(fPendingCommandsSemaphore);
666 	return B_OK;
667 }
668 
669 
670 void
671 IMAPConnectionWorker::_WaitForCommands()
672 {
673 	while (acquire_sem(fPendingCommandsSemaphore) == B_INTERRUPTED);
674 }
675 
676 
677 status_t
678 IMAPConnectionWorker::_SelectMailbox(IMAPFolder& folder, uint32* _nextUID)
679 {
680 	if (fSelectedBox == &folder && _nextUID == NULL)
681 		return B_OK;
682 
683 	IMAP::SelectCommand select(folder.MailboxName().String());
684 
685 	status_t status = fProtocol.ProcessCommand(select);
686 	if (status == B_OK) {
687 		folder.SetUIDValidity(select.UIDValidity());
688 		if (_nextUID != NULL)
689 			*_nextUID = select.NextUID();
690 		fSelectedBox = &folder;
691 	}
692 
693 	return status;
694 }
695 
696 
697 IMAPMailbox*
698 IMAPConnectionWorker::_MailboxFor(IMAPFolder& folder)
699 {
700 	MailboxMap::iterator found = fMailboxes.find(&folder);
701 	if (found == fMailboxes.end())
702 		return NULL;
703 
704 	IMAPMailbox* mailbox = found->second;
705 	if (mailbox == NULL) {
706 		mailbox = new IMAPMailbox(fProtocol, folder.MailboxName());
707 		folder.SetListener(mailbox);
708 		found->second = mailbox;
709 	}
710 	return mailbox;
711 }
712 
713 
714 void
715 IMAPConnectionWorker::_SyncCommandDone()
716 {
717 	fSyncPending--;
718 }
719 
720 
721 status_t
722 IMAPConnectionWorker::_Connect()
723 {
724 	if (fProtocol.IsConnected())
725 		return B_OK;
726 
727 	status_t status;
728 	int tries = 6;
729 	while (tries-- > 0) {
730 		status = fProtocol.Connect(fSettings.ServerAddress(),
731 			fSettings.Username(), fSettings.Password(), fSettings.UseSSL());
732 		if (status == B_OK)
733 			break;
734 
735 		// Wait for 10 seconds, and try again
736 		snooze(10000000);
737 	}
738 	// TODO: if other workers are connected, but it fails for us, we need to
739 	// remove this worker, and reduce the number of concurrent connections
740 	if (status != B_OK)
741 		return status;
742 
743 	fIdle = fSettings.IdleMode() && fProtocol.Capabilities().Contains("IDLE");
744 	return B_OK;
745 }
746 
747 
748 void
749 IMAPConnectionWorker::_Disconnect()
750 {
751 	fProtocol.Disconnect();
752 }
753 
754 
755 /*static*/ status_t
756 IMAPConnectionWorker::_Worker(void* _self)
757 {
758 	IMAPConnectionWorker* self = (IMAPConnectionWorker*)_self;
759 	status_t status = self->_Worker();
760 
761 	delete self;
762 	return status;
763 }
764