xref: /haiku/src/libs/compat/freebsd_network/taskqueue.c (revision 5629675a326ecf2ff3fd23f154beb525c171048d)
/*
 * Copyright 2009, Colin Günther, coling@gmx.de
 * Copyright 2007, Hugo Santos. All Rights Reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *      Hugo Santos, hugosantos@gmail.com
 */


#include "device.h"

#include <stdio.h>

#include <compat/sys/callout.h>
#include <compat/sys/taskqueue.h>
#include <compat/sys/haiku-module.h>


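/* Taskqueue state flags, kept in taskqueue::tq_flags. */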
#define TQ_FLAGS_ACTIVE		(1 << 0)
#define TQ_FLAGS_BLOCKED	(1 << 1)
#define TQ_FLAGS_PENDING	(1 << 2)

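/* Timeout task flags, kept in timeout_task::f. */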
#define	DT_CALLOUT_ARMED		(1 << 0)
#define	DT_DRAIN_IN_PROGRESS	(1 << 1)

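/* A taskqueue is either a normal queue, protected by a mutex, or a "fast"
 * queue, protected by a spinlock with interrupts disabled so that it is
 * usable from interrupt context. Which of tq_mutex and tq_spinlock is valid
 * depends on tq_fast. */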
struct taskqueue {
	char tq_name[TASKQUEUE_NAMELEN];
	struct mtx tq_mutex;
	struct list tq_list;
	taskqueue_enqueue_fn tq_enqueue;
	void *tq_arg;
	int tq_fast;
	spinlock tq_spinlock;
	sem_id tq_sem;
	thread_id *tq_threads;
	thread_id tq_thread_storage;
	int tq_threadcount;
	int tq_flags;
	int tq_callouts;
};

struct taskqueue *taskqueue_fast = NULL;
struct taskqueue *taskqueue_swi = NULL;
struct taskqueue *taskqueue_thread = NULL;


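/* Note: the FreeBSD-style mflags argument (M_WAITOK/M_NOWAIT) is accepted
 * for compatibility but currently ignored; the queue is always allocated
 * with a plain malloc(). */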
static struct taskqueue *
_taskqueue_create(const char *name, int mflags, int fast,
	taskqueue_enqueue_fn enqueueFunction, void *context)
{
	struct taskqueue *tq = malloc(sizeof(struct taskqueue));
	if (tq == NULL)
		return NULL;

	tq->tq_fast = fast;

	if (fast) {
		B_INITIALIZE_SPINLOCK(&tq->tq_spinlock);
	} else {
		mtx_init(&tq->tq_mutex, name, NULL, MTX_DEF);
	}

	strlcpy(tq->tq_name, name, sizeof(tq->tq_name));
	list_init_etc(&tq->tq_list, offsetof(struct task, ta_link));
	tq->tq_enqueue = enqueueFunction;
	tq->tq_arg = context;

	tq->tq_sem = -1;
	tq->tq_threads = NULL;
	tq->tq_threadcount = 0;
	tq->tq_flags = TQ_FLAGS_ACTIVE;
	tq->tq_callouts = 0;

	return tq;
}


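/* Lock helpers: fast taskqueues are locked by disabling interrupts and
 * taking a spinlock; normal ones take the queue mutex and leave *status
 * untouched. */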
static void
tq_lock(struct taskqueue *taskQueue, cpu_status *status)
{
	if (taskQueue->tq_fast) {
		*status = disable_interrupts();
		acquire_spinlock(&taskQueue->tq_spinlock);
	} else {
		mtx_lock(&taskQueue->tq_mutex);
	}
}


static void
tq_unlock(struct taskqueue *taskQueue, cpu_status status)
{
	if (taskQueue->tq_fast) {
		release_spinlock(&taskQueue->tq_spinlock);
		restore_interrupts(status);
	} else {
		mtx_unlock(&taskQueue->tq_mutex);
	}
}


struct taskqueue *
taskqueue_create(const char *name, int mflags,
	taskqueue_enqueue_fn enqueueFunction, void *context)
{
	return _taskqueue_create(name, mflags, 0, enqueueFunction, context);
}


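/* Main loop of a taskqueue worker thread: each release of tq_sem wakes one
 * worker, which pops a single task off the list, latches and clears its
 * pending count, and runs the handler with the queue unlocked. The thread
 * exits once the semaphore is deleted by taskqueue_free(). */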
static int32
tq_handle_thread(void *data)
{
	struct taskqueue *tq = data;
	cpu_status cpu_state;
	struct task *t;
	int pending;
	sem_id sem;

	/* just a synchronization point */
	tq_lock(tq, &cpu_state);
	sem = tq->tq_sem;
	tq_unlock(tq, cpu_state);

	while (acquire_sem(sem) == B_NO_ERROR) {
		tq_lock(tq, &cpu_state);
		t = list_remove_head_item(&tq->tq_list);
		tq_unlock(tq, cpu_state);
		if (t == NULL)
			continue;
		pending = t->ta_pending;
		t->ta_pending = 0;

		t->ta_handler(t->ta_argument, pending);
	}

	return 0;
}


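/* Spawns the worker threads suspended and only resumes them once all of
 * them have been created successfully; on any failure everything spawned
 * so far is torn down again. For a single thread, the storage embedded in
 * the taskqueue is reused instead of allocating an array. */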
static int
_taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
	const char *name)
{
	struct taskqueue *tq = (*taskQueue);
	int i, j;

	if (count == 0)
		return -1;

	if (tq->tq_threads != NULL)
		return -1;

	if (count == 1) {
		tq->tq_threads = &tq->tq_thread_storage;
	} else {
		tq->tq_threads = malloc(sizeof(thread_id) * count);
		if (tq->tq_threads == NULL)
			return B_NO_MEMORY;
	}

	tq->tq_sem = create_sem(0, tq->tq_name);
	if (tq->tq_sem < B_OK) {
		if (count > 1)
			free(tq->tq_threads);
		tq->tq_threads = NULL;
		return tq->tq_sem;
	}

	for (i = 0; i < count; i++) {
		tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name,
			priority, tq);
		if (tq->tq_threads[i] < B_OK) {
			status_t status = tq->tq_threads[i];
			for (j = 0; j < i; j++)
				kill_thread(tq->tq_threads[j]);
			if (count > 1)
				free(tq->tq_threads);
			tq->tq_threads = NULL;
			delete_sem(tq->tq_sem);
			tq->tq_sem = -1;
			return status;
		}
	}

	tq->tq_threadcount = count;

	for (i = 0; i < count; i++)
		resume_thread(tq->tq_threads[i]);

	return 0;
}


int
taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
	const char *format, ...)
{
	/* We assume that taskqueue_start_threads() is called from a sane
	 * context and thus needs no locking. This is mostly because, if the
	 * taskqueue is 'fast', taking its lock disables interrupts... and
	 * then we cannot create semaphores, threads, or bananas. */

	/* cpu_status state; */
	char name[64];
	int result;
	va_list vl;

	va_start(vl, format);
	vsnprintf(name, sizeof(name), format, vl);
	va_end(vl);

	/*tq_lock(*taskQueue, &state);*/
	result = _taskqueue_start_threads(taskQueue, count, priority, name);
	/*tq_unlock(*taskQueue, state);*/

	return result;
}


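/*
 * A minimal usage sketch (hypothetical driver code; sMyQueue, sMyTask and
 * my_handler are invented names, not part of this API):
 *
 *	static struct taskqueue *sMyQueue;
 *	static struct task sMyTask;
 *
 *	static void my_handler(void *context, int pending) { ... }
 *
 *	sMyQueue = taskqueue_create("my taskq", 0, taskqueue_thread_enqueue,
 *		&sMyQueue);
 *	taskqueue_start_threads(&sMyQueue, 1, B_NORMAL_PRIORITY, "my taskq");
 *
 *	TASK_INIT(&sMyTask, 0, my_handler, NULL);
 *	taskqueue_enqueue(sMyQueue, &sMyTask);
 *
 *	taskqueue_drain(sMyQueue, &sMyTask);
 *	taskqueue_free(sMyQueue);
 */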
void
taskqueue_free(struct taskqueue *taskQueue)
{
	if (taskQueue == NULL) {
		printf("taskqueue_free called with NULL taskqueue\n");
		return;
	}

	/* TODO: Lock and drain the list? */
	taskQueue->tq_flags &= ~TQ_FLAGS_ACTIVE;

	if (taskQueue->tq_sem != -1) {
		int i;

		delete_sem(taskQueue->tq_sem);

		for (i = 0; i < taskQueue->tq_threadcount; i++) {
			status_t status;
			wait_for_thread(taskQueue->tq_threads[i], &status);
		}

		if (taskQueue->tq_threadcount > 1)
			free(taskQueue->tq_threads);
	}

	/* Only destroy the lock after all worker threads have exited, since
	 * they take it in their main loop. */
	if (!taskQueue->tq_fast)
		mtx_destroy(&taskQueue->tq_mutex);

	free(taskQueue);
}


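/* Note: this only waits until the task is no longer pending on the queue.
 * Because the worker clears ta_pending before invoking the handler, the
 * handler may still be running when taskqueue_drain() returns; unlike in
 * FreeBSD, there is no tracking of the currently running task. */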
void
taskqueue_drain(struct taskqueue *taskQueue, struct task *task)
{
	cpu_status status;

	if (taskQueue == NULL) {
		printf("taskqueue_drain called with NULL taskqueue\n");
		return;
	}

	tq_lock(taskQueue, &status);
	while (task->ta_pending != 0) {
		tq_unlock(taskQueue, status);
		snooze(0);
		tq_lock(taskQueue, &status);
	}
	tq_unlock(taskQueue, status);
}


void
taskqueue_drain_timeout(struct taskqueue *queue,
	struct timeout_task *timeout_task)
{
	cpu_status status;

	/*
	 * Set flag to prevent timer from re-starting during drain:
	 */
	tq_lock(queue, &status);
	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
		("Drain already in progress"));
	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
	tq_unlock(queue, status);

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);

	/*
	 * Clear flag to allow timer to re-start:
	 */
	tq_lock(queue, &status);
	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
	tq_unlock(queue, status);
}


static void
taskqueue_task_nop_fn(void *context, int pending)
{
}


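/* Drains every task currently on the queue by enqueueing a no-op barrier
 * task and draining it: tasks are handled in FIFO order, so by the time
 * the barrier has been dequeued, everything enqueued before it has been
 * dequeued as well. */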
void
taskqueue_drain_all(struct taskqueue *taskQueue)
{
	struct task t_barrier;

	if (taskQueue == NULL) {
		printf("taskqueue_drain_all called with NULL taskqueue\n");
		return;
	}

	TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
	taskqueue_enqueue(taskQueue, &t_barrier);
	taskqueue_drain(taskQueue, &t_barrier);
}


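/* Expects the queue lock to be held on entry and consumes it: the lock is
 * always released before returning. If the task is already pending, only
 * its pending count is bumped; otherwise it is appended to the list and
 * the queue's enqueue hook is called (or deferred via TQ_FLAGS_PENDING
 * while the queue is blocked). */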
static void
taskqueue_enqueue_locked(struct taskqueue *taskQueue, struct task *task,
	cpu_status status)
{
	/* we don't really support priorities */
	if (task->ta_pending) {
		task->ta_pending++;
	} else {
		list_add_item(&taskQueue->tq_list, task);
		task->ta_pending = 1;
		if ((taskQueue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
			taskQueue->tq_enqueue(taskQueue->tq_arg);
		else
			taskQueue->tq_flags |= TQ_FLAGS_PENDING;
	}
	tq_unlock(taskQueue, status);
}


int
taskqueue_enqueue(struct taskqueue *taskQueue, struct task *task)
{
	cpu_status status;

	tq_lock(taskQueue, &status);
	taskqueue_enqueue_locked(taskQueue, task, status);
	/* The lock is released inside. */

	return 0;
}


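/* Callout hook for timeout tasks. The callout is initialized with
 * callout_init_mtx() on the queue mutex and CALLOUT_RETURNUNLOCKED, so we
 * are invoked with the queue already locked and let
 * taskqueue_enqueue_locked() release the lock for us. */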
static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;
	cpu_status status = 0;
		// dummy value; we should never get here on a spin taskqueue

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(queue, &timeout_task->t, status);
	/* The lock is released inside. */
}


int
taskqueue_enqueue_timeout(struct taskqueue *queue,
	struct timeout_task *ttask, int _ticks)
{
	int res;
	cpu_status status;

	tq_lock(queue, &status);
	KASSERT(ttask->q == NULL || ttask->q == queue,
		("Migrated queue"));
	ttask->q = queue;
	res = ttask->t.ta_pending;
	if (ttask->f & DT_DRAIN_IN_PROGRESS) {
		/* Do nothing */
		tq_unlock(queue, status);
		res = -1;
	} else if (_ticks == 0) {
		tq_unlock(queue, status);
		taskqueue_enqueue(queue, &ttask->t);
	} else {
		if ((ttask->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			ttask->f |= DT_CALLOUT_ARMED;
			if (_ticks < 0)
				_ticks = -_ticks; /* Ignore overflow. */
		}
		tq_unlock(queue, status);
		if (_ticks > 0) {
			callout_reset(&ttask->c, _ticks,
				taskqueue_timeout_func, ttask);
		}
	}
	return (res);
}


static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
	u_int *pendp)
{
	if (task->ta_pending > 0)
		list_remove_item(&queue->tq_list, task);
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	return 0;
}


int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;
	cpu_status status;

	tq_lock(queue, &status);
	error = taskqueue_cancel_locked(queue, task, pendp);
	tq_unlock(queue, status);

	return (error);
}


int
taskqueue_cancel_timeout(struct taskqueue *queue,
	struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;
	cpu_status status;

	tq_lock(queue, &status);
	pending = callout_stop(&timeout_task->c) > 0;
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	tq_unlock(queue, status);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}


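/* Standard enqueue hook for thread-driven taskqueues: the context is a
 * pointer to the taskqueue pointer, and releasing the semaphore wakes one
 * worker thread without forcing an immediate reschedule. */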
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp = context;
	release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE);
}


int
taskqueue_enqueue_fast(struct taskqueue *taskQueue, struct task *task)
{
	return taskqueue_enqueue(taskQueue, task);
}


struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
	taskqueue_enqueue_fn enqueueFunction, void *context)
{
	return _taskqueue_create(name, mflags, 1, enqueueFunction, context);
}


void
task_init(struct task *task, int prio, task_fn_t handler, void *context)
{
	task->ta_priority = prio;
	task->ta_handler = handler;
	task->ta_argument = context;
	task->ta_pending = 0;
}


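/* Note: the callout is tied to the queue's mutex, which only exists for
 * non-fast taskqueues; timeout tasks are therefore assumed to be used with
 * mutex-based queues only. */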
void
timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
	int priority, task_fn_t func, void *context)
{
	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
		CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}


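/*
 * Timeout tasks follow the same pattern, with a delay in ticks (sketch;
 * sMyTimeoutTask reuses the hypothetical names from the earlier example):
 *
 *	static struct timeout_task sMyTimeoutTask;
 *
 *	timeout_task_init(sMyQueue, &sMyTimeoutTask, 0, my_handler, NULL);
 *	taskqueue_enqueue_timeout(sMyQueue, &sMyTimeoutTask, hz);
 *	...
 *	taskqueue_drain_timeout(sMyQueue, &sMyTimeoutTask);
 */


/* Creates the global taskqueues that the driver declared a need for via
 * HAIKU_DRIVER_REQUIRES(). */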
status_t
init_taskqueues()
{
	status_t status = B_NO_MEMORY;

	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) {
		taskqueue_fast = taskqueue_create_fast("fast taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_fast);
		if (taskqueue_fast == NULL)
			return B_NO_MEMORY;

		status = taskqueue_start_threads(&taskqueue_fast, 1,
			B_REAL_TIME_PRIORITY, "fast taskq thread");
		if (status < B_OK)
			goto err_1;
	}

	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) {
		taskqueue_swi = taskqueue_create_fast("swi taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_swi);
		if (taskqueue_swi == NULL) {
			status = B_NO_MEMORY;
			goto err_1;
		}

		status = taskqueue_start_threads(&taskqueue_swi, 1,
			B_REAL_TIME_PRIORITY, "swi taskq");
		if (status < B_OK)
			goto err_2;
	}

	if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE)) {
		taskqueue_thread = taskqueue_create_fast("thread taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_thread);
		if (taskqueue_thread == NULL) {
			status = B_NO_MEMORY;
			goto err_2;
		}

		status = taskqueue_start_threads(&taskqueue_thread, 1,
			B_REAL_TIME_PRIORITY, "thread taskq");
		if (status < B_OK)
			goto err_3;
	}

	return B_OK;

err_3:
	if (taskqueue_thread)
		taskqueue_free(taskqueue_thread);

err_2:
	if (taskqueue_swi)
		taskqueue_free(taskqueue_swi);

err_1:
	if (taskqueue_fast)
		taskqueue_free(taskqueue_fast);

	return status;
}


void
uninit_taskqueues()
{
	if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE))
		taskqueue_free(taskqueue_thread);

	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE))
		taskqueue_free(taskqueue_swi);

	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE))
		taskqueue_free(taskqueue_fast);
}


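/* Blocking a taskqueue defers the enqueue hook: tasks are still added to
 * the list, but the hook is only invoked (once) when the queue is
 * unblocked again, if anything was queued in the meantime. */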
void
taskqueue_block(struct taskqueue *taskQueue)
{
	cpu_status status;

	tq_lock(taskQueue, &status);
	taskQueue->tq_flags |= TQ_FLAGS_BLOCKED;
	tq_unlock(taskQueue, status);
}


void
taskqueue_unblock(struct taskqueue *taskQueue)
{
	cpu_status status;

	tq_lock(taskQueue, &status);
	taskQueue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (taskQueue->tq_flags & TQ_FLAGS_PENDING) {
		taskQueue->tq_flags &= ~TQ_FLAGS_PENDING;
		taskQueue->tq_enqueue(taskQueue->tq_arg);
	}
	tq_unlock(taskQueue, status);
}