xref: /haiku/src/libs/compat/freebsd_network/taskqueue.c (revision 52f7c9389475e19fc21487b38064b4390eeb6fea)
/*
 * Copyright 2009, Colin Günther, coling@gmx.de
 * Copyright 2007, Hugo Santos. All Rights Reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *      Hugo Santos, hugosantos@gmail.com
 */


#include "device.h"

#include <stdio.h>

#include <compat/sys/callout.h>
#include <compat/sys/taskqueue.h>
#include <compat/sys/haiku-module.h>


#define TQ_FLAGS_ACTIVE		(1 << 0)
#define TQ_FLAGS_BLOCKED	(1 << 1)
#define TQ_FLAGS_PENDING	(1 << 2)

#define	DT_CALLOUT_ARMED		(1 << 0)
#define	DT_DRAIN_IN_PROGRESS	(1 << 1)
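
/*
 * TQ_FLAGS_* describe per-queue state (tq_flags below); DT_* describe the
 * state of an individual timeout_task (its f field).
 */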

struct taskqueue {
	char tq_name[TASKQUEUE_NAMELEN];
	struct mtx tq_mutex;
	struct list tq_list;
	taskqueue_enqueue_fn tq_enqueue;
	void *tq_arg;
	int tq_fast;
	sem_id tq_sem;
	thread_id *tq_threads;
	thread_id tq_thread_storage;
	int tq_threadcount;
	int tq_flags;
	int tq_callouts;
};

struct taskqueue *taskqueue_fast = NULL;
struct taskqueue *taskqueue_swi = NULL;
struct taskqueue *taskqueue_thread = NULL;
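

/*
 * The global queues FreeBSD drivers expect. They are created in
 * init_taskqueues() for drivers that request them, and are handed to the
 * enqueue callback as taskqueue** because the callback context is registered
 * before the queue pointer itself is assigned.
 */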


static struct taskqueue *
_taskqueue_create(const char *name, int mflags, int fast,
	taskqueue_enqueue_fn enqueueFunction, void *context)
{
	struct taskqueue *tq = malloc(sizeof(struct taskqueue));
	if (tq == NULL)
		return NULL;

	tq->tq_fast = fast;

	if (fast) {
		mtx_init(&tq->tq_mutex, name, NULL, MTX_SPIN);
	} else {
		mtx_init(&tq->tq_mutex, name, NULL, MTX_DEF);
	}

	strlcpy(tq->tq_name, name, sizeof(tq->tq_name));
	list_init_etc(&tq->tq_list, offsetof(struct task, ta_link));
	tq->tq_enqueue = enqueueFunction;
	tq->tq_arg = context;

	tq->tq_sem = -1;
	tq->tq_threads = NULL;
	tq->tq_threadcount = 0;
	tq->tq_flags = TQ_FLAGS_ACTIVE;
	tq->tq_callouts = 0;

	return tq;
}
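

/*
 * Note that mflags (FreeBSD's M_NOWAIT and friends) is accepted for API
 * compatibility only; allocation always behaves like plain malloc().
 */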


static void
tq_lock(struct taskqueue *taskQueue)
{
	if (taskQueue->tq_fast) {
		mtx_lock_spin(&taskQueue->tq_mutex);
	} else {
		mtx_lock(&taskQueue->tq_mutex);
	}
}


static void
tq_unlock(struct taskqueue *taskQueue)
{
	if (taskQueue->tq_fast) {
		mtx_unlock_spin(&taskQueue->tq_mutex);
	} else {
		mtx_unlock(&taskQueue->tq_mutex);
	}
}
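

/*
 * "Fast" queues are protected by a spin lock so they can be driven from
 * interrupt context; regular queues use a sleeping mutex.
 */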


struct taskqueue *
taskqueue_create(const char *name, int mflags,
	taskqueue_enqueue_fn enqueueFunction, void *context)
{
	return _taskqueue_create(name, mflags, 0, enqueueFunction, context);
}


static int32
tq_handle_thread(void *data)
{
	struct taskqueue *tq = data;
	struct task *t;
	int pending;
	sem_id sem;

	/* just a synchronization point */
	tq_lock(tq);
	sem = tq->tq_sem;
	tq_unlock(tq);

	while (acquire_sem(sem) == B_NO_ERROR) {
		tq_lock(tq);
		t = list_remove_head_item(&tq->tq_list);
		tq_unlock(tq);
		if (t == NULL)
			continue;
		pending = t->ta_pending;
		t->ta_pending = 0;

		t->ta_handler(t->ta_argument, pending);
	}

	return 0;
}
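

/*
 * Worker loop: every release of tq_sem wakes one worker, which pops the head
 * of the task list and runs its handler with the accumulated pending count.
 * The loop ends once taskqueue_free() deletes the semaphore and acquire_sem()
 * stops returning B_NO_ERROR.
 */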


static int
_taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
	const char *name)
{
	struct taskqueue *tq = (*taskQueue);
	int i, j;

	if (count == 0)
		return -1;

	if (tq->tq_threads != NULL)
		return -1;

	if (count == 1) {
		tq->tq_threads = &tq->tq_thread_storage;
	} else {
		tq->tq_threads = malloc(sizeof(thread_id) * count);
		if (tq->tq_threads == NULL)
			return B_NO_MEMORY;
	}

	tq->tq_sem = create_sem(0, tq->tq_name);
	if (tq->tq_sem < B_OK) {
		status_t status = tq->tq_sem;
		if (count > 1)
			free(tq->tq_threads);
		tq->tq_threads = NULL;
		tq->tq_sem = -1;
		return status;
	}

	for (i = 0; i < count; i++) {
		tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name,
			priority, tq);
		if (tq->tq_threads[i] < B_OK) {
			status_t status = tq->tq_threads[i];
			for (j = 0; j < i; j++)
				kill_thread(tq->tq_threads[j]);
			if (count > 1)
				free(tq->tq_threads);
			tq->tq_threads = NULL;
			delete_sem(tq->tq_sem);
			tq->tq_sem = -1;	/* so taskqueue_free() skips it */
			return status;
		}
	}

	tq->tq_threadcount = count;

	for (i = 0; i < count; i++)
		resume_thread(tq->tq_threads[i]);

	return 0;
}
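

/*
 * For the common single-thread case the thread id lives in the taskqueue
 * itself (tq_thread_storage), avoiding a separate allocation; hence the
 * error and teardown paths only free() tq_threads when count > 1.
 */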


int
taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
	const char *format, ...)
{
	/* We assume that start_threads is called from a sane context and thus
	 * doesn't need locking. This is mostly due to the fact that if the TQ
	 * is 'fast', locking the TQ disables interrupts... and then we can't
	 * create semaphores, threads and bananas. */

	char name[64];
	int result;
	va_list vl;

	va_start(vl, format);
	vsnprintf(name, sizeof(name), format, vl);
	va_end(vl);

	result = _taskqueue_start_threads(taskQueue, count, priority, name);

	return result;
}


void
taskqueue_free(struct taskqueue *taskQueue)
{
	if (taskQueue == NULL)
		return;

	/* lock and drain list? */
	taskQueue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	if (taskQueue->tq_sem != -1) {
		int i;

		delete_sem(taskQueue->tq_sem);

		for (i = 0; i < taskQueue->tq_threadcount; i++) {
			status_t status;
			wait_for_thread(taskQueue->tq_threads[i], &status);
		}

		if (taskQueue->tq_threadcount > 1)
			free(taskQueue->tq_threads);
	}

	/* only destroy the mutex once the workers are gone; they still lock it
	 * while draining their last tasks */
	if (!taskQueue->tq_fast)
		mtx_destroy(&taskQueue->tq_mutex);

	free(taskQueue);
}
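

/*
 * Tasks still sitting in tq_list when the semaphore is deleted are simply
 * abandoned; callers that care should run taskqueue_drain_all() first.
 */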


void
taskqueue_drain(struct taskqueue *taskQueue, struct task *task)
{
	if (taskQueue == NULL)
		return;

	tq_lock(taskQueue);
	while (task->ta_pending != 0) {
		tq_unlock(taskQueue);
		snooze(0);
		tq_lock(taskQueue);
	}
	tq_unlock(taskQueue);
}
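

/*
 * This is a busy wait: ta_pending is cleared by the worker just before the
 * handler runs, so on return the task is no longer queued, but its handler
 * may still be executing. snooze(0) merely yields the CPU between polls.
 */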


void
taskqueue_drain_timeout(struct taskqueue *queue,
	struct timeout_task *timeout_task)
{
	/*
	 * Set flag to prevent timer from re-starting during drain:
	 */
	tq_lock(queue);
	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
		("Drain already in progress"));
	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
	tq_unlock(queue);

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);

	/*
	 * Clear flag to allow timer to re-start:
	 */
	tq_lock(queue);
	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
	tq_unlock(queue);
}


static void
taskqueue_task_nop_fn(void *context, int pending)
{
}


void
taskqueue_drain_all(struct taskqueue *taskQueue)
{
	struct task t_barrier;

	if (taskQueue == NULL) {
		printf("taskqueue_drain_all called with NULL taskqueue\n");
		return;
	}

	TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
	taskqueue_enqueue(taskQueue, &t_barrier);
	taskqueue_drain(taskQueue, &t_barrier);
}
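

/*
 * The barrier works because the queue is FIFO: by the time the no-op barrier
 * task has been dequeued, every task enqueued before it has been dequeued
 * as well.
 */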


static void
taskqueue_enqueue_locked(struct taskqueue *taskQueue, struct task *task)
{
	/* we don't really support priorities */
	if (task->ta_pending) {
		task->ta_pending++;
	} else {
		list_add_item(&taskQueue->tq_list, task);
		task->ta_pending = 1;
		if ((taskQueue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
			taskQueue->tq_enqueue(taskQueue->tq_arg);
		else
			taskQueue->tq_flags |= TQ_FLAGS_PENDING;
	}
	tq_unlock(taskQueue);
}
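

/*
 * An already queued task is not queued twice; its ta_pending count is bumped
 * instead and later passed to the handler as the 'pending' argument. Note
 * the asymmetry: this is entered with the queue locked but returns unlocked,
 * which is what the CALLOUT_RETURNUNLOCKED callouts below rely on.
 */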


int
taskqueue_enqueue(struct taskqueue *taskQueue, struct task *task)
{
	tq_lock(taskQueue);
	taskqueue_enqueue_locked(taskQueue, task);
	/* The lock is released inside. */

	return 0;
}


static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0,
		("stray timeout ('%s')", timeout_task->q->tq_name));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
	/* The lock is released inside. */
}


int
taskqueue_enqueue_timeout(struct taskqueue *queue,
	struct timeout_task *ttask, int _ticks)
{
	int res;

	tq_lock(queue);
	KASSERT(ttask->q == NULL || ttask->q == queue,
		("Migrated queue"));
	ttask->q = queue;
	res = ttask->t.ta_pending;
	if (ttask->f & DT_DRAIN_IN_PROGRESS) {
		/* Do nothing */
		tq_unlock(queue);
		res = -1;
	} else if (_ticks == 0) {
		tq_unlock(queue);
		taskqueue_enqueue(queue, &ttask->t);
	} else {
		if ((ttask->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			ttask->f |= DT_CALLOUT_ARMED;
			if (_ticks < 0)
				_ticks = -_ticks; /* Ignore overflow. */
		}
		tq_unlock(queue);
		if (_ticks > 0) {
			callout_reset(&ttask->c, _ticks,
				taskqueue_timeout_func, ttask);
		}
	}
	return (res);
}
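

/*
 * As in FreeBSD, the return value is the task's previous pending count, or
 * -1 when the enqueue is refused because a drain is in progress; negative
 * tick counts are used as their absolute value.
 */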


static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
	u_int *pendp)
{
	if (task->ta_pending > 0)
		list_remove_item(&queue->tq_list, task);
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	return 0;
}


int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	tq_lock(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	tq_unlock(queue);

	return (error);
}


int
taskqueue_cancel_timeout(struct taskqueue *queue,
	struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	tq_lock(queue);
	pending = (callout_stop(&timeout_task->c) > 0);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	tq_unlock(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}
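

/*
 * Unlike the FreeBSD original, cancelling never reports EBUSY for a task
 * whose handler is currently running; the combined pending count written to
 * *pendp is the only feedback the caller gets.
 */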


void
taskqueue_block(struct taskqueue *taskQueue)
{
	if (taskQueue == NULL)
		return;

	tq_lock(taskQueue);
	taskQueue->tq_flags |= TQ_FLAGS_BLOCKED;
	tq_unlock(taskQueue);
}


void
taskqueue_unblock(struct taskqueue *taskQueue)
{
	if (taskQueue == NULL)
		return;

	tq_lock(taskQueue);
	taskQueue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (taskQueue->tq_flags & TQ_FLAGS_PENDING) {
		taskQueue->tq_flags &= ~TQ_FLAGS_PENDING;
		taskQueue->tq_enqueue(taskQueue->tq_arg);
	}
	tq_unlock(taskQueue);
}


void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp = context;
	release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE);
}
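

/*
 * Standard enqueue callback for thread-driven queues: bump the semaphore to
 * wake one worker, without rescheduling, so it stays cheap enough to be
 * called with the queue lock held.
 */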


int
taskqueue_enqueue_fast(struct taskqueue *taskQueue, struct task *task)
{
	return taskqueue_enqueue(taskQueue, task);
}


struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
	taskqueue_enqueue_fn enqueueFunction, void *context)
{
	return _taskqueue_create(name, mflags, 1, enqueueFunction, context);
}


void
task_init(struct task *task, int prio, task_fn_t handler, void *context)
{
	task->ta_priority = prio;
	task->ta_handler = handler;
	task->ta_argument = context;
	task->ta_pending = 0;
}


void
timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
	int priority, task_fn_t func, void *context)
{
	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
		CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}
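

/*
 * A minimal usage sketch, assuming the usual FreeBSD taskqueue(9) pattern
 * (the my_* names are hypothetical):
 *
 *	static struct task my_task;
 *
 *	static void
 *	my_handler(void *arg, int pending)
 *	{
 *		// 'pending' is the number of coalesced enqueues
 *	}
 *
 *	TASK_INIT(&my_task, 0, my_handler, softc);
 *	taskqueue_enqueue(taskqueue_fast, &my_task);	// e.g. from an interrupt
 *	...
 *	taskqueue_drain(taskqueue_fast, &my_task);	// before detaching
 */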


status_t
init_taskqueues()
{
	status_t status = B_NO_MEMORY;

	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) {
		taskqueue_fast = taskqueue_create_fast("fast taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_fast);
		if (taskqueue_fast == NULL)
			return B_NO_MEMORY;

		status = taskqueue_start_threads(&taskqueue_fast, 1,
			B_REAL_TIME_PRIORITY, "fast taskq thread");
		if (status < B_OK)
			goto err_1;
	}

	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) {
		taskqueue_swi = taskqueue_create_fast("swi taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_swi);
		if (taskqueue_swi == NULL) {
			status = B_NO_MEMORY;
			goto err_1;
		}

		status = taskqueue_start_threads(&taskqueue_swi, 1,
			B_REAL_TIME_PRIORITY, "swi taskq");
		if (status < B_OK)
			goto err_2;
	}

	if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE)) {
		taskqueue_thread = taskqueue_create_fast("thread taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_thread);
		if (taskqueue_thread == NULL) {
			status = B_NO_MEMORY;
			goto err_2;
		}

		status = taskqueue_start_threads(&taskqueue_thread, 1,
			B_REAL_TIME_PRIORITY, "thread taskq");
		if (status < B_OK)
			goto err_3;
	}

	return B_OK;

err_3:
	if (taskqueue_thread)
		taskqueue_free(taskqueue_thread);

err_2:
	if (taskqueue_swi)
		taskqueue_free(taskqueue_swi);

err_1:
	if (taskqueue_fast)
		taskqueue_free(taskqueue_fast);

	return status;
}
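

/*
 * Only the queues a driver declares via HAIKU_DRIVER_REQUIRES() get created.
 * All three are currently backed by kernel threads; the "fast" variants just
 * protect their task list with a spin lock.
 */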


void
uninit_taskqueues()
{
	if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE))
		taskqueue_free(taskqueue_thread);

	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE))
		taskqueue_free(taskqueue_swi);

	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE))
		taskqueue_free(taskqueue_fast);
}
585