xref: /haiku/src/libs/compat/freebsd_network/fbsd_subr_taskqueue.c (revision 1454f3c27aec3e7879a58e7cedc9d66d90c773a7)
1 /*-
2  * Copyright (c) 2000 Doug Rabson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  */
26 
27 #include <sys/cdefs.h>
28 __FBSDID("$FreeBSD$");
29 
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/bus.h>
33 #include <sys/kernel.h>
34 #include <sys/kthread.h>
35 #include <sys/libkern.h>
36 #include <sys/limits.h>
37 #include <sys/lock.h>
38 #include <sys/malloc.h>
39 #include <sys/mutex.h>
40 #include <sys/proc.h>
41 #include <sys/smp.h>
42 #include <sys/taskqueue.h>
43 #include <machine/stdarg.h>
44 
/* Malloc type passed to the malloc()/free() calls in this file. */
45 static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
/*
 * NOTE(review): neither handle below is referenced in the visible part of
 * this file -- presumably interrupt-handler cookies; confirm before removing.
 */
46 static void	*taskqueue_giant_ih;
47 static void	*taskqueue_ih;
48 
/*
 * Per-runner bookkeeping record.  taskqueue_run_locked() keeps one of these
 * on its stack and links it into the queue's tq_active list while it is
 * executing a task.
 */
49 struct taskqueue_busy {
/* Task currently being executed by this runner, or NULL between tasks. */
50 	struct task	*tb_running;
/* Linkage on the owning queue's tq_active list. */
51 	TAILQ_ENTRY(taskqueue_busy) tb_link;
52 };
53 
/*
 * Sentinel task pointer (address 0x1, never a real task).
 * NOTE(review): not referenced in the visible part of this file -- presumably
 * stored in tb_running to mark a drain waiter; confirm against the users.
 */
54 struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
55 
/*
 * A task queue: a priority-ordered list of pending tasks plus the state
 * needed to run, block and tear them down.
 */
56 struct taskqueue {
/* Pending tasks; taskqueue_enqueue_locked() keeps higher ta_priority first. */
57 	STAILQ_HEAD(, task)	tq_queue;
/* Hook called with tq_context after a task is queued or the queue is unblocked. */
58 	taskqueue_enqueue_fn	tq_enqueue;
/* Opaque argument handed to tq_enqueue. */
59 	void			*tq_context;
/* Heap-allocated name (TASKQUEUE_NAMELEN bytes); also used as the mutex name. */
60 	char			*tq_name;
/* taskqueue_busy records of the runners currently executing tasks. */
61 	TAILQ_HEAD(, taskqueue_busy) tq_active;
/* Protects the queue; taken via TQ_LOCK()/TQ_UNLOCK(). */
62 	struct mtx		tq_mutex;
63 #ifdef __HAIKU__
/*
 * NOTE(review): tq_sem, tq_thread_storage and tq_threadcount are not used in
 * the visible part of this file; their meaning must be confirmed elsewhere.
 */
64 	sem_id tq_sem;
65 	thread_id *tq_threads;
66 	thread_id tq_thread_storage;
67 	int tq_threadcount;
68 #else
69 	struct thread		**tq_threads;
70 #endif
/* Number of non-NULL tq_threads entries taskqueue_member() scans for. */
71 	int			tq_tcount;
/* Non-zero when tq_mutex was created with MTX_SPIN. */
72 	int			tq_spin;
/* Bitmask of TQ_FLAGS_* values. */
73 	int			tq_flags;
/* Count of timeout tasks whose callout is currently armed. */
74 	int			tq_callouts;
/* Optional callbacks and their arguments, indexed by callback type. */
75 	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
76 	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
77 };
78 
/*
 * Bits stored in taskqueue.tq_flags:
 *   ACTIVE           - set at creation, cleared by taskqueue_free().
 *   BLOCKED          - while set, enqueueing does not call tq_enqueue.
 *   UNLOCKED_ENQUEUE - tq_enqueue is called after the queue lock is dropped.
 */
79 #define	TQ_FLAGS_ACTIVE		(1 << 0)
80 #define	TQ_FLAGS_BLOCKED	(1 << 1)
81 #define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)
82 
/*
 * Bits stored in timeout_task.f:
 *   CALLOUT_ARMED     - the task's callout is scheduled and counted in
 *                       tq_callouts.
 *   DRAIN_IN_PROGRESS - taskqueue_drain_timeout() is running; new timeout
 *                       enqueues are refused.
 */
83 #define	DT_CALLOUT_ARMED	(1 << 0)
84 #define	DT_DRAIN_IN_PROGRESS	(1 << 1)
85 
/*
 * Acquire the queue mutex, using the spin variant when the queue was created
 * with a spin mutex (tq_spin != 0).  Note that 'tq' is evaluated twice.
 */
86 #define	TQ_LOCK(tq)							\
87 	do {								\
88 		if ((tq)->tq_spin)					\
89 			mtx_lock_spin(&(tq)->tq_mutex);			\
90 		else							\
91 			mtx_lock(&(tq)->tq_mutex);			\
92 	} while (0)
/* Assert that the current context owns the queue mutex. */
93 #define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)
94 
/*
 * Release the queue mutex with the unlock call matching TQ_LOCK()
 * (spin or default, selected by tq_spin).  Note that 'tq' is evaluated twice.
 */
95 #define	TQ_UNLOCK(tq)							\
96 	do {								\
97 		if ((tq)->tq_spin)					\
98 			mtx_unlock_spin(&(tq)->tq_mutex);		\
99 		else							\
100 			mtx_unlock(&(tq)->tq_mutex);			\
101 	} while (0)
/* Assert that the current context does not own the queue mutex. */
102 #define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
103 
104 void
105 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
106 	int priority, task_fn_t func, void *context)
107 {
108 
109 	TASK_INIT(&timeout_task->t, priority, func, context);
110 	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
111 		CALLOUT_RETURNUNLOCKED);
112 	timeout_task->q = queue;
113 	timeout_task->f = 0;
114 }
115 
116 static struct taskqueue *
117 _taskqueue_create(const char *name, int mflags,
118 		 taskqueue_enqueue_fn enqueue, void *context,
119 		 int mtxflags, const char *mtxname __unused)
120 {
121 	struct taskqueue *queue;
122 	char *tq_name;
123 
124 	tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
125 	if (tq_name == NULL)
126 		return (NULL);
127 
128 	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
129 	if (queue == NULL) {
130 		free(tq_name, M_TASKQUEUE);
131 		return (NULL);
132 	}
133 
134 	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
135 
136 	STAILQ_INIT(&queue->tq_queue);
137 	TAILQ_INIT(&queue->tq_active);
138 	queue->tq_enqueue = enqueue;
139 	queue->tq_context = context;
140 	queue->tq_name = tq_name;
141 	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
142 	queue->tq_flags |= TQ_FLAGS_ACTIVE;
143 	if (enqueue == taskqueue_thread_enqueue)
144 		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
145 	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
146 
147 	return (queue);
148 }
149 
150 struct taskqueue *
151 taskqueue_create(const char *name, int mflags,
152 		 taskqueue_enqueue_fn enqueue, void *context)
153 {
154 
155 	return _taskqueue_create(name, mflags, enqueue, context,
156 			MTX_DEF, name);
157 }
158 
159 void
160 taskqueue_set_callback(struct taskqueue *queue,
161 	enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
162 	void *context)
163 {
164 
165 	KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
166 		(cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
167 		("Callback type %d not valid, must be %d-%d", cb_type,
168 		TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
169 	KASSERT((queue->tq_callbacks[cb_type] == NULL),
170 		("Re-initialization of taskqueue callback?"));
171 
172 	queue->tq_callbacks[cb_type] = callback;
173 	queue->tq_cb_contexts[cb_type] = context;
174 }
175 
/*
 * Tear down a task queue and release its memory.
 *
 * Under the queue lock, TQ_FLAGS_ACTIVE is cleared and
 * taskqueue_terminate() (defined outside this view) is called for the
 * queue's threads.  By then no task may still be running and no timeout
 * task may still be armed (both are asserted).  Afterwards the mutex is
 * destroyed and the thread array, the name buffer and the queue itself are
 * freed, so 'queue' must not be used once this returns.
 */
176 void
177 taskqueue_free(struct taskqueue *queue)
178 {
179 
180 	TQ_LOCK(queue);
181 	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
182 	taskqueue_terminate(queue->tq_threads, queue);
183 	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
184 	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
/* Haiku builds release the mutex explicitly before destroying it. */
185 #ifdef __HAIKU__
186 	TQ_UNLOCK(queue);
187 #endif
188 	mtx_destroy(&queue->tq_mutex);
189 	free(queue->tq_threads, M_TASKQUEUE);
190 	free(queue->tq_name, M_TASKQUEUE);
191 	free(queue, M_TASKQUEUE);
192 }
193 
/*
 * Queue 'task' on 'queue'.
 *
 * Locking: the caller must hold the queue lock; this function releases it
 * on every path before returning.
 *
 * A task that is already pending is not queued again; its ta_pending count
 * is incremented instead, saturating at USHRT_MAX.  Otherwise the task is
 * inserted so that the list stays ordered by descending ta_priority, after
 * any existing tasks of equal or higher priority.
 *
 * Unless the queue is blocked, the tq_enqueue hook is then called: with the
 * lock already dropped when TQ_FLAGS_UNLOCKED_ENQUEUE is set, otherwise
 * with the lock still held.
 *
 * Always returns 0.
 */
194 static int
195 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
196 {
197 	struct task *ins;
198 	struct task *prev;
199 
200 	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
201 	/*
202 	 * Count multiple enqueues.
203 	 */
204 	if (task->ta_pending) {
205 		if (task->ta_pending < USHRT_MAX)
206 			task->ta_pending++;
207 		TQ_UNLOCK(queue);
208 		return (0);
209 	}
210 
211 	/*
212 	 * Optimise the case when all tasks have the same priority.
213 	 */
214 	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
215 	if (!prev || prev->ta_priority >= task->ta_priority) {
216 		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
217 	} else {
/* Find the first queued task with a strictly lower priority. */
218 		prev = NULL;
219 		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
220 			 prev = ins, ins = STAILQ_NEXT(ins, ta_link))
221 			if (ins->ta_priority < task->ta_priority)
222 				break;
223 
/* Insert just before it, i.e. after 'prev', or at the head if none. */
224 		if (prev)
225 			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
226 		else
227 			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
228 	}
229 
230 	task->ta_pending = 1;
/*
 * Exactly one of the two TQ_UNLOCK() calls below runs; which one decides
 * whether the hook is called with or without the lock held.
 */
231 	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
232 		TQ_UNLOCK(queue);
233 	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
234 		queue->tq_enqueue(queue->tq_context);
235 	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
236 		TQ_UNLOCK(queue);
237 
238 	/* Return with lock released. */
239 	return (0);
240 }
241 
/*
 * Public entry point for queueing a task.
 *
 * Takes the queue lock and hands over to taskqueue_enqueue_locked(), which
 * is responsible for dropping it again.  Returns that function's result.
 */
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	TQ_LOCK(queue);

	/* The callee releases the lock on every path. */
	return (taskqueue_enqueue_locked(queue, task));
}
253 
254 static void
255 taskqueue_timeout_func(void *arg)
256 {
257 	struct taskqueue *queue;
258 	struct timeout_task *timeout_task;
259 
260 	timeout_task = arg;
261 	queue = timeout_task->q;
262 	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
263 	timeout_task->f &= ~DT_CALLOUT_ARMED;
264 	queue->tq_callouts--;
265 	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
266 	/* The lock is released inside. */
267 }
268 
/*
 * Queue a timeout task after a delay of '_ticks'.
 *
 * - If a drain of this timeout task is in progress nothing is scheduled and
 *   -1 is returned.
 * - If _ticks is 0 the task is queued immediately.
 * - Otherwise the callout is (re)armed.  When it was not armed before, a
 *   negative _ticks is negated and used as the delay; when it was already
 *   armed, a negative _ticks leaves the existing callout untouched.
 *
 * Except for the drain case, the return value is the task's ta_pending
 * count sampled on entry, plus one if the callout was already armed.
 *
 * Must not be used on queues created with a spin mutex (asserted).
 */
269 int
270 taskqueue_enqueue_timeout(struct taskqueue *queue,
271 	struct timeout_task *timeout_task, int _ticks)
272 {
273 	int res;
274 
275 	TQ_LOCK(queue);
276 	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
277 		("Migrated queue"));
278 	KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
279 	timeout_task->q = queue;
280 	res = timeout_task->t.ta_pending;
281 	if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
282 		/* Do nothing */
283 		TQ_UNLOCK(queue);
284 		res = -1;
285 	} else if (_ticks == 0) {
286 		taskqueue_enqueue_locked(queue, &timeout_task->t);
287 		/* The lock is released inside. */
288 	} else {
289 		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
/* Already armed: report it as one more pending instance. */
290 			res++;
291 		} else {
292 			queue->tq_callouts++;
293 			timeout_task->f |= DT_CALLOUT_ARMED;
294 			if (_ticks < 0)
295 				_ticks = -_ticks; /* Ignore overflow. */
296 		}
/* A still-negative _ticks here means "armed already, keep old deadline". */
297 		if (_ticks > 0) {
298 			callout_reset(&timeout_task->c, _ticks,
299 				taskqueue_timeout_func, timeout_task);
300 		}
301 		TQ_UNLOCK(queue);
302 	}
303 	return (res);
304 }
305 
/*
 * Task handler that deliberately does nothing; both arguments are ignored.
 */
static void
taskqueue_task_nop_fn(void *context, int pending)
{
	(void)context;
	(void)pending;
}
310 
311 void
312 taskqueue_block(struct taskqueue *queue)
313 {
314 
315 	TQ_LOCK(queue);
316 	queue->tq_flags |= TQ_FLAGS_BLOCKED;
317 	TQ_UNLOCK(queue);
318 }
319 
320 void
321 taskqueue_unblock(struct taskqueue *queue)
322 {
323 
324 	TQ_LOCK(queue);
325 	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
326 	if (!STAILQ_EMPTY(&queue->tq_queue))
327 		queue->tq_enqueue(queue->tq_context);
328 	TQ_UNLOCK(queue);
329 }
330 
331 static void
332 taskqueue_run_locked(struct taskqueue *queue)
333 {
334 	struct taskqueue_busy tb;
335 	struct taskqueue_busy *tb_first;
336 	struct task *task;
337 	int pending;
338 
339 	KASSERT(queue != NULL, ("tq is NULL"));
340 	TQ_ASSERT_LOCKED(queue);
341 	tb.tb_running = NULL;
342 
343 	while (STAILQ_FIRST(&queue->tq_queue)) {
344 		TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
345 
346 		/*
347 		 * Carefully remove the first task from the queue and
348 		 * zero its pending count.
349 		 */
350 		task = STAILQ_FIRST(&queue->tq_queue);
351 		KASSERT(task != NULL, ("task is NULL"));
352 		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
353 		pending = task->ta_pending;
354 		task->ta_pending = 0;
355 		tb.tb_running = task;
356 		TQ_UNLOCK(queue);
357 
358 		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
359 #ifdef __HAIKU__
360 		if ((task->ta_flags & TASK_NEEDSGIANT) != 0)
361 			mtx_lock(&Giant);
362 #endif
363 		task->ta_func(task->ta_context, pending);
364 #ifdef __HAIKU__
365 		if ((task->ta_flags & TASK_NEEDSGIANT) != 0)
366 			mtx_unlock(&Giant);
367 #endif
368 
369 		TQ_LOCK(queue);
370 		tb.tb_running = NULL;
371 
372 		TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
373 		tb_first = TAILQ_FIRST(&queue->tq_active);
374 	}
375 }
376 
/*
 * Run every task currently queued on 'queue' in the calling context.
 * Takes the queue lock for the duration (the worker drops it around each
 * task handler).
 */
void
taskqueue_run(struct taskqueue *queue)
{
	TQ_LOCK(queue);
	taskqueue_run_locked(queue);	/* returns with the lock held */
	TQ_UNLOCK(queue);
}
385 
386 static int
387 task_is_running(struct taskqueue *queue, struct task *task)
388 {
389 	struct taskqueue_busy *tb;
390 
391 	TQ_ASSERT_LOCKED(queue);
392 	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
393 		if (tb->tb_running == task)
394 			return (1);
395 	}
396 	return (0);
397 }
398 
399 static int
400 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
401 	u_int *pendp)
402 {
403 
404 	if (task->ta_pending > 0)
405 		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
406 	if (pendp != NULL)
407 		*pendp = task->ta_pending;
408 	task->ta_pending = 0;
409 	return (task_is_running(queue, task) ? EBUSY : 0);
410 }
411 
412 int
413 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
414 {
415 	int error;
416 
417 	TQ_LOCK(queue);
418 	error = taskqueue_cancel_locked(queue, task, pendp);
419 	TQ_UNLOCK(queue);
420 
421 	return (error);
422 }
423 
424 int
425 taskqueue_cancel_timeout(struct taskqueue *queue,
426 	struct timeout_task *timeout_task, u_int *pendp)
427 {
428 	u_int pending, pending1;
429 	int error;
430 
431 	TQ_LOCK(queue);
432 	pending = !!(callout_stop(&timeout_task->c) > 0);
433 	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
434 	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
435 		timeout_task->f &= ~DT_CALLOUT_ARMED;
436 		queue->tq_callouts--;
437 	}
438 	TQ_UNLOCK(queue);
439 
440 	if (pendp != NULL)
441 		*pendp = pending + pending1;
442 	return (error);
443 }
444 
/*
 * Wait for a timeout task to finish.
 *
 * DT_DRAIN_IN_PROGRESS is set for the duration so that
 * taskqueue_enqueue_timeout() refuses to re-arm the task.  The callout and
 * then the embedded task are drained with the queue lock released, after
 * which the flag is cleared again.  Draining the same timeout task from two
 * places at once is not allowed (asserted).
 */
445 void
446 taskqueue_drain_timeout(struct taskqueue *queue,
447 	struct timeout_task *timeout_task)
448 {
449 
450 	/*
451 	 * Set flag to prevent timer from re-starting during drain:
452 	 */
453 	TQ_LOCK(queue);
454 	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
455 		("Drain already in progress"));
456 	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
457 	TQ_UNLOCK(queue);
458 
/* Both drains are called without the queue lock held. */
459 	callout_drain(&timeout_task->c);
460 	taskqueue_drain(queue, &timeout_task->t);
461 
462 	/*
463 	 * Clear flag to allow timer to re-start:
464 	 */
465 	TQ_LOCK(queue);
466 	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
467 	TQ_UNLOCK(queue);
468 }
469 
/*
 * Start 'count' worker threads at priority 'pri' for the queue in *tqp.
 *
 * The printf-style 'format' and its arguments are rendered into a 64-byte
 * name buffer (longer names are truncated) and passed on to
 * _taskqueue_start_threads(), whose result is returned unchanged.
 */
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
	const char *format, ...)
{
	char name[64];
	va_list ap;

	va_start(ap, format);
	vsnprintf(name, sizeof(name), format, ap);
	va_end(ap);

	return (_taskqueue_start_threads(tqp, count, pri, name));
}
485 
486 static inline void
487 taskqueue_run_callback(struct taskqueue *tq,
488 	enum taskqueue_callback_type cb_type)
489 {
490 	taskqueue_callback_fn tq_callback;
491 
492 	TQ_ASSERT_UNLOCKED(tq);
493 	tq_callback = tq->tq_callbacks[cb_type];
494 	if (tq_callback != NULL)
495 		tq_callback(tq->tq_cb_contexts[cb_type]);
496 }
497 
498 int
499 taskqueue_member(struct taskqueue *queue, struct thread *td)
500 {
501 	int i, j, ret = 0;
502 
503 	for (i = 0, j = 0; ; i++) {
504 		if (queue->tq_threads[i] == NULL)
505 			continue;
506 		if (queue->tq_threads[i] == td) {
507 			ret = 1;
508 			break;
509 		}
510 		if (++j >= queue->tq_tcount)
511 			break;
512 	}
513 	return (ret);
514 }
515 
516 struct taskqueue *
517 taskqueue_create_fast(const char *name, int mflags,
518 		 taskqueue_enqueue_fn enqueue, void *context)
519 {
520 	return _taskqueue_create(name, mflags, enqueue, context,
521 			MTX_SPIN, "fast_taskqueue");
522 }
523