/*-
 * Copyright (c) 2000 Doug Rabson
 * Copyright (c) 2014 Jeff Roberson
 * Copyright (c) 2016 Matthew Macy
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#ifndef __HAIKU__
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#endif
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#ifndef __HAIKU__
#include <sys/sched.h>
#endif
#include <sys/smp.h>
#include <sys/gtaskqueue.h>
#ifndef __HAIKU__
#include <sys/unistd.h>
#endif
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues");
static void gtaskqueue_thread_enqueue(void *);
static void gtaskqueue_thread_loop(void *arg);
static int task_is_running(struct gtaskqueue *queue, struct gtask *gtask);
static void gtaskqueue_drain_locked(struct gtaskqueue *queue,
    struct gtask *gtask);

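/*
 * Statically-defined taskqueue groups: a "softirq" group with one queue
 * per CPU (stride 1) and a single-queue "config" group.
 */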
TASKQGROUP_DEFINE(softirq, mp_ncpus, 1);
TASKQGROUP_DEFINE(config, 1, 1);

struct gtaskqueue_busy {
	struct gtask *tb_running;
	TAILQ_ENTRY(gtaskqueue_busy) tb_link;
};

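/*
 * Sentinel tb_running value marking a tq_active entry as a drain waiter
 * rather than a running task (see gtaskqueue_drain_tq_active()).
 */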
static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1;

typedef void (*gtaskqueue_enqueue_fn)(void *context);

struct gtaskqueue {
	STAILQ_HEAD(, gtask)	tq_queue;
	gtaskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	char			*tq_name;
	TAILQ_HEAD(, gtaskqueue_busy) tq_active;
	struct mtx		tq_mutex;
	struct thread		**tq_threads;
	int			tq_tcount;
	int			tq_spin;
	int			tq_flags;
	int			tq_callouts;
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

#define	DT_CALLOUT_ARMED	(1 << 0)

#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)

#ifdef INVARIANTS
static void
gtask_dump(struct gtask *gtask)
{
	printf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n",
	    gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func,
	    gtask->ta_context);
}
#endif

static __inline int
TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
	if (tq->tq_spin)
		return (msleep_spin(p, m, wm, t));
	return (msleep(p, m, pri, wm, t));
}

static struct gtaskqueue *
_gtaskqueue_create(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context,
    int mtxflags, const char *mtxname __unused)
{
	struct gtaskqueue *queue;
	char *tq_name;

	tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
	if (!tq_name)
		return (NULL);

	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

	queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
	if (!queue) {
		free(tq_name, M_GTASKQUEUE);
		return (NULL);
	}

	STAILQ_INIT(&queue->tq_queue);
	TAILQ_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == gtaskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return (queue);
}

/*
 * Signal a taskqueue thread to terminate.
 */
static void
gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
	}
}

static void
gtaskqueue_free(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	gtaskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_GTASKQUEUE);
	free(queue->tq_name, M_GTASKQUEUE);
	free(queue, M_GTASKQUEUE);
}

/*
 * Prevent the task from being enqueued again, then wait for any
 * pending or running instance of it to complete.
 */
void
grouptask_block(struct grouptask *grouptask)
{
	struct gtaskqueue *queue = grouptask->gt_taskqueue;
	struct gtask *gtask = &grouptask->gt_task;

#ifdef INVARIANTS
	if (queue == NULL) {
		gtask_dump(gtask);
		panic("queue == NULL");
	}
#endif
	TQ_LOCK(queue);
	gtask->ta_flags |= TASK_NOENQUEUE;
	gtaskqueue_drain_locked(queue, gtask);
	TQ_UNLOCK(queue);
}

void
grouptask_unblock(struct grouptask *grouptask)
{
	struct gtaskqueue *queue = grouptask->gt_taskqueue;
	struct gtask *gtask = &grouptask->gt_task;

#ifdef INVARIANTS
	if (queue == NULL) {
		gtask_dump(gtask);
		panic("queue == NULL");
	}
#endif
	TQ_LOCK(queue);
	gtask->ta_flags &= ~TASK_NOENQUEUE;
	TQ_UNLOCK(queue);
}

int
grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
{
#ifdef INVARIANTS
	if (queue == NULL) {
		gtask_dump(gtask);
		panic("queue == NULL");
	}
#endif
	TQ_LOCK(queue);
	if (gtask->ta_flags & TASK_ENQUEUED) {
		TQ_UNLOCK(queue);
		return (0);
	}
	if (gtask->ta_flags & TASK_NOENQUEUE) {
		TQ_UNLOCK(queue);
		return (EAGAIN);
	}
	STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
	gtask->ta_flags |= TASK_ENQUEUED;
	TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	return (0);
}

static void
gtaskqueue_task_nop_fn(void *context)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 */
static void
gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
{
	struct gtask t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return;

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we can not use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway) so just insert it at tail while we have the
	 * queue lock.
	 */
	GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	t_barrier.ta_flags |= TASK_ENQUEUED;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.
	 */
	while (t_barrier.ta_flags & TASK_ENQUEUED)
		TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 */
static void
gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
{
	struct gtaskqueue_busy tb_marker, *tb_first;

	if (TAILQ_EMPTY(&queue->tq_active))
		return;

	/* Block gtaskqueue_terminate(). */
	queue->tq_callouts++;

	/*
	 * Wait for all currently executing taskqueue threads
	 * to go idle.
	 */
	tb_marker.tb_running = TB_DRAIN_WAITER;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
	while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
		TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
	TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

	/*
	 * Wakeup any other drain waiter that happened to queue up
	 * without any intervening active thread.
	 */
	tb_first = TAILQ_FIRST(&queue->tq_active);
	if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
		wakeup(tb_first);

	/* Release gtaskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
}

void
gtaskqueue_block(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

void
gtaskqueue_unblock(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

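/*
 * Run all queued tasks.  The queue lock is dropped while each task
 * executes; a gtaskqueue_busy entry on tq_active records the running
 * task so task_is_running() and the drain functions can see it.
 */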
static void
gtaskqueue_run_locked(struct gtaskqueue *queue)
{
	struct gtaskqueue_busy tb;
	struct gtaskqueue_busy *tb_first;
	struct gtask *gtask;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;

	while (STAILQ_FIRST(&queue->tq_queue)) {
		TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

		/*
		 * Carefully remove the first task from the queue and
		 * clear its TASK_ENQUEUED flag
		 */
		gtask = STAILQ_FIRST(&queue->tq_queue);
		KASSERT(gtask != NULL, ("task is NULL"));
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		gtask->ta_flags &= ~TASK_ENQUEUED;
		tb.tb_running = gtask;
		TQ_UNLOCK(queue);

		KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
		gtask->ta_func(gtask->ta_context);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		wakeup(gtask);

		TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
		tb_first = TAILQ_FIRST(&queue->tq_active);
		if (tb_first != NULL &&
		    tb_first->tb_running == TB_DRAIN_WAITER)
			wakeup(tb_first);
	}
}

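/*
 * Return non-zero if the task is currently executing on one of the
 * queue's threads.
 */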
static int
task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
{
	struct gtaskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == gtask)
			return (1);
	}
	return (0);
}

static int
gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
{

	if (gtask->ta_flags & TASK_ENQUEUED)
		STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
	gtask->ta_flags &= ~TASK_ENQUEUED;
	return (task_is_running(queue, gtask) ? EBUSY : 0);
}

int
gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
{
	int error;

	TQ_LOCK(queue);
	error = gtaskqueue_cancel_locked(queue, gtask);
	TQ_UNLOCK(queue);

	return (error);
}

static void
gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask)
{
	while ((gtask->ta_flags & TASK_ENQUEUED) ||
	    task_is_running(queue, gtask))
		TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0);
}

void
gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
{
#ifndef __HAIKU__
	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
#endif

	TQ_LOCK(queue);
	gtaskqueue_drain_locked(queue, gtask);
	TQ_UNLOCK(queue);
}

void
gtaskqueue_drain_all(struct gtaskqueue *queue)
{
#ifndef __HAIKU__
	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
#endif

	TQ_LOCK(queue);
	gtaskqueue_drain_tq_queue(queue);
	gtaskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}

static int
_gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
    void *mask, const char *name, va_list ap)
{
	char ktname[19 + 1];
	struct thread *td;
	struct gtaskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	vsnprintf(ktname, sizeof(ktname), name, ap);
	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], 0, 0, "%s", ktname);
		else
			error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], 0, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d\n", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;	/* paranoid */
		} else
			tq->tq_tcount++;
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
#ifndef __HAIKU__
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long)td->td_tid,
				    error);
		}
#endif
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
		thread_unlock(td);
	}

	return (0);
}

static int
gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
	va_end(ap);
	return (error);
}

static inline void
gtaskqueue_run_callback(struct gtaskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}

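/*
 * Main loop of a taskqueue service thread: run tasks until the queue
 * is deactivated, then run once more to flush stragglers and exit.
 */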
static void
gtaskqueue_thread_loop(void *arg)
{
	struct gtaskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		gtaskqueue_run_locked(tq);
		/*
		 * Because gtaskqueue_run_locked() can drop tq_mutex, we
		 * need to check if the TQ_FLAGS_ACTIVE flag wasn't removed
		 * in the meantime, which means we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
	}
	gtaskqueue_run_locked(tq);
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

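/*
 * Enqueue hook for thread-backed queues: wake one service thread.
 */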
static void
gtaskqueue_thread_enqueue(void *context)
{
	struct gtaskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	wakeup_one(tq);
}

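/*
 * Create a "fast" taskqueue whose queue mutex is a spin mutex.
 */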
static struct gtaskqueue *
gtaskqueue_create_fast(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context)
{
	return _gtaskqueue_create(name, mflags, enqueue, context,
	    MTX_SPIN, "fast_taskqueue");
}

struct taskqgroup_cpu {
	LIST_HEAD(, grouptask)	tgc_tasks;
	struct gtaskqueue	*tgc_taskq;
	int			tgc_cnt;
	int			tgc_cpu;
};

struct taskqgroup {
	struct taskqgroup_cpu tqg_queue[MAXCPU];
	struct mtx	tqg_lock;
	const char	*tqg_name;
	int		tqg_adjusting;
	int		tqg_stride;
	int		tqg_cnt;
};

struct taskq_bind_task {
	struct gtask bt_task;
	int	bt_cpuid;
};

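/*
 * Initialize group slot idx: create its taskqueue, start its service
 * thread, and record the CPU the thread will later be bound to.
 */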
static void
taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
{
	struct taskqgroup_cpu *qcpu;

	qcpu = &qgroup->tqg_queue[idx];
	LIST_INIT(&qcpu->tgc_tasks);
	qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
	    gtaskqueue_thread_enqueue, &qcpu->tgc_taskq);
	MPASS(qcpu->tgc_taskq);
	gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
	    "%s_%d", qgroup->tqg_name, idx);
	qcpu->tgc_cpu = cpu;
}

static void
taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
{

	gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
}

/*
 * Find the taskq with the fewest tasks that does not already service
 * this uniq identifier.
 */
static int
taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
{
	struct grouptask *n;
	int i, idx, mincnt;
	int strict;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);
#ifndef __HAIKU__
	if (qgroup->tqg_cnt == 0)
#else
	KASSERT(qgroup->tqg_cnt > 0,
	    ("qgroup(%p)->tqg_cnt is %d!", qgroup, qgroup->tqg_cnt));
	if (qgroup->tqg_cnt == 1)
#endif
		return (0);
	idx = -1;
	mincnt = INT_MAX;
	/*
	 * Two passes: first scan for a queue with the fewest tasks that
	 * does not already service this uniq id.  If that fails, simply
	 * pick the queue with the fewest total tasks.
	 */
	for (strict = 1; mincnt == INT_MAX; strict = 0) {
		for (i = 0; i < qgroup->tqg_cnt; i++) {
			if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
				continue;
			if (strict) {
				LIST_FOREACH(n,
				    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
					if (n->gt_uniq == uniq)
						break;
				if (n != NULL)
					continue;
			}
			mincnt = qgroup->tqg_queue[i].tgc_cnt;
			idx = i;
		}
	}
	if (idx == -1)
		panic("%s: failed to pick a qid.", __func__);

	return (idx);
}

/*
 * smp_started is unusable since it is not set for UP kernels or even for
 * SMP kernels when there is 1 CPU.  This is usually handled by adding a
 * (mp_ncpus == 1) test, but that would be broken here since we need to
 * synchronize with the SI_SUB_SMP ordering.  Even in the pure SMP case
 * smp_started only gives a fuzzy ordering relative to SI_SUB_SMP.
 *
 * So maintain our own flag.  It must be set after all CPUs are started
 * and before SI_SUB_SMP:SI_ORDER_ANY so that the SYSINIT for delayed
 * adjustment is properly delayed.  SI_ORDER_FOURTH is clearly before
 * SI_ORDER_ANY and unclearly after the CPUs are started.  It would be
 * simpler for adjustment to pass a flag indicating if it is delayed.
 */

static int tqg_smp_started;

static void
tqg_record_smp_started(void *arg)
{
	tqg_smp_started = 1;
}

SYSINIT(tqg_record_smp_started, SI_SUB_SMP, SI_ORDER_FOURTH,
    tqg_record_smp_started, NULL);

void
taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, device_t dev, struct resource *irq, const char *name)
{
	int cpu, qid, error;

	gtask->gt_uniq = uniq;
	snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s",
	    name ? name : "grouptask");
	gtask->gt_dev = dev;
	gtask->gt_irq = irq;
	gtask->gt_cpu = -1;
	mtx_lock(&qgroup->tqg_lock);
	qid = taskqgroup_find(qgroup, uniq);
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	if (dev != NULL && irq != NULL && tqg_smp_started) {
		cpu = qgroup->tqg_queue[qid].tgc_cpu;
		gtask->gt_cpu = cpu;
		mtx_unlock(&qgroup->tqg_lock);
		error = bus_bind_intr(dev, irq, cpu);
		if (error)
			printf("%s: binding interrupt failed for %s: %d\n",
			    __func__, gtask->gt_name, error);
	} else
		mtx_unlock(&qgroup->tqg_lock);
}

static void
taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
{
	int qid, cpu, error;

	mtx_lock(&qgroup->tqg_lock);
	qid = taskqgroup_find(qgroup, gtask->gt_uniq);
	cpu = qgroup->tqg_queue[qid].tgc_cpu;
	if (gtask->gt_dev != NULL && gtask->gt_irq != NULL) {
		mtx_unlock(&qgroup->tqg_lock);
		error = bus_bind_intr(gtask->gt_dev, gtask->gt_irq, cpu);
		mtx_lock(&qgroup->tqg_lock);
		if (error)
			printf("%s: binding interrupt failed for %s: %d\n",
			    __func__, gtask->gt_name, error);
	}
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	mtx_unlock(&qgroup->tqg_lock);
}

int
taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int cpu, device_t dev, struct resource *irq, const char *name)
{
	int i, qid, error;

	qid = -1;
	gtask->gt_uniq = uniq;
	snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s",
	    name ? name : "grouptask");
	gtask->gt_dev = dev;
	gtask->gt_irq = irq;
	gtask->gt_cpu = cpu;
	mtx_lock(&qgroup->tqg_lock);
	if (tqg_smp_started) {
		for (i = 0; i < qgroup->tqg_cnt; i++)
			if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
				qid = i;
				break;
			}
		if (qid == -1) {
			mtx_unlock(&qgroup->tqg_lock);
			printf("%s: qid not found for %s cpu=%d\n", __func__,
			    gtask->gt_name, cpu);
			return (EINVAL);
		}
	} else
		qid = 0;
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	cpu = qgroup->tqg_queue[qid].tgc_cpu;
	mtx_unlock(&qgroup->tqg_lock);

	if (dev != NULL && irq != NULL && tqg_smp_started) {
		error = bus_bind_intr(dev, irq, cpu);
		if (error)
			printf("%s: binding interrupt failed for %s: %d\n",
			    __func__, gtask->gt_name, error);
	}
	return (0);
}

static int
taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup,
    struct grouptask *gtask)
{
	device_t dev;
	struct resource *irq;
	int cpu, error, i, qid;

	qid = -1;
	dev = gtask->gt_dev;
	irq = gtask->gt_irq;
	cpu = gtask->gt_cpu;
	MPASS(tqg_smp_started);
	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++)
		if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
			qid = i;
			break;
		}
	if (qid == -1) {
		mtx_unlock(&qgroup->tqg_lock);
		printf("%s: qid not found for %s cpu=%d\n", __func__,
		    gtask->gt_name, cpu);
		return (EINVAL);
	}
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	mtx_unlock(&qgroup->tqg_lock);

	if (dev != NULL && irq != NULL) {
		error = bus_bind_intr(dev, irq, cpu);
		if (error)
			printf("%s: binding interrupt failed for %s: %d\n",
			    __func__, gtask->gt_name, error);
	}
	return (0);
}

void
taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
{
	int i;

	grouptask_block(gtask);
	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++)
		if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
			break;
	if (i == qgroup->tqg_cnt)
		panic("%s: task %s not in group", __func__, gtask->gt_name);
	qgroup->tqg_queue[i].tgc_cnt--;
	LIST_REMOVE(gtask, gt_list);
	mtx_unlock(&qgroup->tqg_lock);
	gtask->gt_taskqueue = NULL;
	gtask->gt_task.ta_flags &= ~TASK_NOENQUEUE;
}

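/*
 * Task body queued by taskqgroup_bind(): runs on the taskqueue thread
 * itself and pins that thread to its assigned CPU.
 */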
static void
taskqgroup_binder(void *ctx)
{
	struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx;
#ifndef __HAIKU__
	cpuset_t mask;
	int error;

	CPU_ZERO(&mask);
	CPU_SET(gtask->bt_cpuid, &mask);
	error = cpuset_setthread(curthread->td_tid, &mask);
	thread_lock(curthread);
	sched_bind(curthread, gtask->bt_cpuid);
	thread_unlock(curthread);

	if (error)
		printf("%s: binding curthread failed: %d\n", __func__, error);
#endif
	free(gtask, M_DEVBUF);
}

static void
taskqgroup_bind(struct taskqgroup *qgroup)
{
	struct taskq_bind_task *gtask;
	int i;

	/*
	 * Bind taskqueue threads to specific CPUs, if they have been assigned
	 * one.
	 */
	if (qgroup->tqg_cnt == 1)
		return;

	for (i = 0; i < qgroup->tqg_cnt; i++) {
		gtask = malloc(sizeof(*gtask), M_DEVBUF, M_WAITOK);
		GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
		gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
		grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
		    &gtask->bt_task);
	}
}

static void
taskqgroup_config_init(void *arg)
{
	struct taskqgroup *qgroup = qgroup_config;
	LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);

	LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
	    grouptask, gt_list);
	qgroup->tqg_queue[0].tgc_cnt = 0;
	taskqgroup_cpu_create(qgroup, 0, 0);

	qgroup->tqg_cnt = 1;
	qgroup->tqg_stride = 1;
}

SYSINIT(taskqgroup_config_init, SI_SUB_TASKQ, SI_ORDER_SECOND,
    taskqgroup_config_init, NULL);

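/*
 * Resize the group to cnt queues spaced stride CPUs apart and
 * redistribute the already-attached tasks across the new queues.
 * Called with the group lock held; the lock is dropped and retaken
 * while queues are created and tasks are re-attached.
 */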
static int
_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
	LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
	struct grouptask *gtask;
	int i, k, old_cnt, old_cpu, cpu;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);

	if (cnt < 1 || cnt * stride > mp_ncpus || !tqg_smp_started) {
		printf("%s: failed cnt: %d stride: %d "
		    "mp_ncpus: %d tqg_smp_started: %d\n",
		    __func__, cnt, stride, mp_ncpus, tqg_smp_started);
		return (EINVAL);
	}
	if (qgroup->tqg_adjusting) {
		printf("%s failed: adjusting\n", __func__);
		return (EBUSY);
	}
	qgroup->tqg_adjusting = 1;
	old_cnt = qgroup->tqg_cnt;
	old_cpu = 0;
	if (old_cnt < cnt)
		old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu;
	mtx_unlock(&qgroup->tqg_lock);
	/*
	 * Set up queue for tasks added before boot.
	 */
	if (old_cnt == 0) {
		LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
		    grouptask, gt_list);
		qgroup->tqg_queue[0].tgc_cnt = 0;
	}

	/*
	 * If new taskq threads have been added.
	 */
	cpu = old_cpu;
	for (i = old_cnt; i < cnt; i++) {
		taskqgroup_cpu_create(qgroup, i, cpu);

		for (k = 0; k < stride; k++)
			cpu = CPU_NEXT(cpu);
	}
	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_cnt = cnt;
	qgroup->tqg_stride = stride;

	/*
	 * Adjust drivers to use new taskqs.
	 */
	for (i = 0; i < old_cnt; i++) {
		while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
			LIST_REMOVE(gtask, gt_list);
			qgroup->tqg_queue[i].tgc_cnt--;
			LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
		}
	}
	mtx_unlock(&qgroup->tqg_lock);

	while ((gtask = LIST_FIRST(&gtask_head))) {
		LIST_REMOVE(gtask, gt_list);
		if (gtask->gt_cpu == -1)
			taskqgroup_attach_deferred(qgroup, gtask);
		else if (taskqgroup_attach_cpu_deferred(qgroup, gtask))
			taskqgroup_attach_deferred(qgroup, gtask);
	}

#ifdef INVARIANTS
	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++) {
		MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL);
		LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list)
			MPASS(gtask->gt_taskqueue != NULL);
	}
	mtx_unlock(&qgroup->tqg_lock);
#endif
	/*
	 * If taskq thread count has been reduced.
	 */
	for (i = cnt; i < old_cnt; i++)
		taskqgroup_cpu_remove(qgroup, i);

	taskqgroup_bind(qgroup);

	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_adjusting = 0;

	return (0);
}

int
taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
	int error;

	mtx_lock(&qgroup->tqg_lock);
	error = _taskqgroup_adjust(qgroup, cnt, stride);
	mtx_unlock(&qgroup->tqg_lock);

	return (error);
}

struct taskqgroup *
taskqgroup_create(const char *name)
{
	struct taskqgroup *qgroup;

	qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
	mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
	qgroup->tqg_name = name;
	LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);

	return (qgroup);
}

void
taskqgroup_destroy(struct taskqgroup *qgroup)
{

}

void
taskqgroup_config_gtask_init(void *ctx, struct grouptask *gtask, gtask_fn_t *fn,
    const char *name)
{

	GROUPTASK_INIT(gtask, 0, fn, ctx);
	taskqgroup_attach(qgroup_config, gtask, gtask, NULL, NULL, name);
}

void
taskqgroup_config_gtask_deinit(struct grouptask *gtask)
{

	taskqgroup_detach(qgroup_config, gtask);
}