/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/smp.h>
#include <sys/taskqueue.h>
#include <machine/stdarg.h>

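/*
 * Task queue implementation, shared with Haiku's FreeBSD compatibility
 * layer (see the __HAIKU__ blocks below).  A minimal usage sketch; the
 * names my_tq, my_task and my_task_fn are illustrative only, and PWAIT
 * is the conventional FreeBSD thread priority argument:
 *
 *	static void my_task_fn(void *context, int pending);
 *	static struct task my_task;
 *	static struct taskqueue *my_tq;
 *
 *	my_tq = taskqueue_create("my_tq", M_WAITOK,
 *	    taskqueue_thread_enqueue, &my_tq);
 *	taskqueue_start_threads(&my_tq, 1, PWAIT, "my_tq");
 *	TASK_INIT(&my_task, 0, my_task_fn, NULL);
 *	taskqueue_enqueue(my_tq, &my_task);
 *	...
 *	taskqueue_drain(my_tq, &my_task);
 *	taskqueue_free(my_tq);
 */
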
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void *taskqueue_giant_ih;
static void *taskqueue_ih;

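/*
 * A taskqueue_busy entry is placed on a queue's tq_active list while a
 * task is being executed, so cancel/drain logic can tell whether a given
 * task is currently running.
 */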
struct taskqueue_busy {
	struct task *tb_running;
	TAILQ_ENTRY(taskqueue_busy) tb_link;
};

struct task * const TB_DRAIN_WAITER = (struct task *)0x1;

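/*
 * The taskqueue proper: a priority-ordered list of pending tasks
 * (tq_queue), the list of currently running tasks (tq_active), the
 * worker threads servicing the queue, and the mutex protecting it all.
 */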
struct taskqueue {
	STAILQ_HEAD(, task) tq_queue;
	taskqueue_enqueue_fn tq_enqueue;
	void *tq_context;
	char *tq_name;
	TAILQ_HEAD(, taskqueue_busy) tq_active;
	struct mtx tq_mutex;
#ifdef __HAIKU__
	sem_id tq_sem;
	thread_id *tq_threads;
	thread_id tq_thread_storage;
	int tq_threadcount;
#else
	struct thread **tq_threads;
#endif
	int tq_tcount;
	int tq_spin;
	int tq_flags;
	int tq_callouts;
	taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

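/* Queue-wide state bits kept in tq_flags. */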
#define TQ_FLAGS_ACTIVE		(1 << 0)
#define TQ_FLAGS_BLOCKED	(1 << 1)
#define TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

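/* Per-timeout-task state bits kept in timeout_task->f. */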
#define DT_CALLOUT_ARMED	(1 << 0)
#define DT_DRAIN_IN_PROGRESS	(1 << 1)

#define TQ_LOCK(tq) \
	do { \
		if ((tq)->tq_spin) \
			mtx_lock_spin(&(tq)->tq_mutex); \
		else \
			mtx_lock(&(tq)->tq_mutex); \
	} while (0)
#define TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define TQ_UNLOCK(tq) \
	do { \
		if ((tq)->tq_spin) \
			mtx_unlock_spin(&(tq)->tq_mutex); \
		else \
			mtx_unlock(&(tq)->tq_mutex); \
	} while (0)
#define TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)

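/*
 * Initialize a timeout task: set up the underlying task and a callout
 * that shares the queue's mutex, so the callout handler runs with the
 * queue locked and returns with it unlocked (CALLOUT_RETURNUNLOCKED).
 */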
void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
	    CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}

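/*
 * Common constructor: allocate the queue and its name, initialize the
 * task and busy lists and the mutex, and note whether enqueueing may be
 * done without holding the lock (the thread-enqueue case).
 */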
static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context,
    int mtxflags, const char *mtxname __unused)
{
	struct taskqueue *queue;
	char *tq_name;

	tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
	if (tq_name == NULL)
		return (NULL);

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (queue == NULL) {
		free(tq_name, M_TASKQUEUE);
		return (NULL);
	}

	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

	STAILQ_INIT(&queue->tq_queue);
	TAILQ_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == taskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return (queue);
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context)
{

	return _taskqueue_create(name, mflags, enqueue, context,
	    MTX_DEF, name);
}

void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

	KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
	    (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
	    ("Callback type %d not valid, must be %d-%d", cb_type,
	    TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
	KASSERT((queue->tq_callbacks[cb_type] == NULL),
	    ("Re-initialization of taskqueue callback?"));

	queue->tq_callbacks[cb_type] = callback;
	queue->tq_cb_contexts[cb_type] = context;
}

void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
#ifdef __HAIKU__
	TQ_UNLOCK(queue);
#endif
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue->tq_name, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}

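/*
 * Enqueue a task with the queue lock held.  Enqueueing an already-pending
 * task only bumps its pending count; otherwise the task is inserted in
 * priority order.  Returns with the queue lock released.
 */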
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		if (task->ta_pending < USHRT_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = NULL;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		    prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	int res;

	TQ_LOCK(queue);
	res = taskqueue_enqueue_locked(queue, task);
	/* The lock is released inside. */

	return (res);
}

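/*
 * Callout handler for timeout tasks.  The callout was initialized with
 * the queue's mutex, so this runs with the queue locked; enqueueing the
 * task releases the lock.
 */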
static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
	/* The lock is released inside. */
}

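/*
 * Schedule a timeout task to be enqueued after _ticks ticks, or enqueue
 * it immediately if _ticks is 0.  Returns -1 if a drain is in progress
 * and nothing was scheduled; otherwise returns the task's prior pending
 * count, incremented by one when the callout was already armed.
 */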
int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, int _ticks)
{
	int res;

	TQ_LOCK(queue);
	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
	    ("Migrated queue"));
	KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
	timeout_task->q = queue;
	res = timeout_task->t.ta_pending;
	if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
		/* Do nothing */
		TQ_UNLOCK(queue);
		res = -1;
	} else if (_ticks == 0) {
		taskqueue_enqueue_locked(queue, &timeout_task->t);
		/* The lock is released inside. */
	} else {
		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			timeout_task->f |= DT_CALLOUT_ARMED;
			if (_ticks < 0)
				_ticks = -_ticks; /* Ignore overflow. */
		}
		if (_ticks > 0) {
			callout_reset(&timeout_task->c, _ticks,
			    taskqueue_timeout_func, timeout_task);
		}
		TQ_UNLOCK(queue);
	}
	return (res);
}

static void
taskqueue_task_nop_fn(void *context, int pending)
{
}

void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

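/*
 * Run all pending tasks.  Called with the queue lock held; the lock is
 * dropped around each task handler and reacquired afterwards.  The
 * running task is published on tq_active so task_is_running() can see
 * it.  On Haiku, tasks flagged TASK_NEEDSGIANT run under the Giant lock.
 */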
static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct taskqueue_busy tb;
	struct taskqueue_busy *tb_first;
	struct task *task;
	int pending;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;

	while (STAILQ_FIRST(&queue->tq_queue)) {
		TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		KASSERT(task != NULL, ("task is NULL"));
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		TQ_UNLOCK(queue);

		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
#ifdef __HAIKU__
		if ((task->ta_flags & TASK_NEEDSGIANT) != 0)
			mtx_lock(&Giant);
#endif
		task->ta_func(task->ta_context, pending);
#ifdef __HAIKU__
		if ((task->ta_flags & TASK_NEEDSGIANT) != 0)
			mtx_unlock(&Giant);
#endif

		TQ_LOCK(queue);
		tb.tb_running = NULL;

		TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
		tb_first = TAILQ_FIRST(&queue->tq_active);
	}
}

void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}

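/*
 * Return non-zero if the given task is currently being executed by one
 * of the queue's threads.  Caller must hold the queue lock.
 */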
static int
task_is_running(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (1);
	}
	return (0);
}

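/*
 * Remove a pending task from the queue and report its previous pending
 * count via *pendp.  Returns EBUSY if the task is currently running,
 * 0 otherwise.  Caller must hold the queue lock.
 */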
static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

	if (task->ta_pending > 0)
		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	return (task_is_running(queue, task) ? EBUSY : 0);
}

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}

int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	TQ_LOCK(queue);
	pending = !!(callout_stop(&timeout_task->c) > 0);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	TQ_UNLOCK(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}

void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	/*
	 * Set flag to prevent timer from re-starting during drain:
	 */
	TQ_LOCK(queue);
	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
	    ("Drain already in progress"));
	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);

	/*
	 * Clear flag to allow timer to re-start:
	 */
	TQ_LOCK(queue);
	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);
}

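/*
 * Start worker threads for the queue.  The thread name is formatted from
 * the printf-style arguments; the actual thread creation is done by
 * _taskqueue_start_threads().
 */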
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *format, ...)
{
	char name[64];
	int error;
	va_list vl;

	va_start(vl, format);
	vsnprintf(name, sizeof(name), format, vl);
	va_end(vl);

	error = _taskqueue_start_threads(tqp, count, pri, name);
	return (error);
}

static inline void
taskqueue_run_callback(struct taskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}

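/*
 * Return non-zero if the given thread is one of this queue's worker
 * threads.
 */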
int
taskqueue_member(struct taskqueue *queue, struct thread *td)
{
	int i, j, ret = 0;

	for (i = 0, j = 0; ; i++) {
		if (queue->tq_threads[i] == NULL)
			continue;
		if (queue->tq_threads[i] == td) {
			ret = 1;
			break;
		}
		if (++j >= queue->tq_tcount)
			break;
	}
	return (ret);
}

struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, enqueue, context,
	    MTX_SPIN, "fast_taskqueue");
}