1 /*- 2 * Copyright (c) 2000 Doug Rabson 3 * Copyright (c) 2014 Jeff Roberson 4 * Copyright (c) 2016 Matthew Macy 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/cdefs.h> 30 __FBSDID("$FreeBSD$"); 31 32 #include <sys/param.h> 33 #include <sys/systm.h> 34 #include <sys/bus.h> 35 #ifndef __HAIKU__ 36 #include <sys/cpuset.h> 37 #include <sys/interrupt.h> 38 #endif 39 #include <sys/kernel.h> 40 #include <sys/kthread.h> 41 #include <sys/libkern.h> 42 #include <sys/limits.h> 43 #include <sys/lock.h> 44 #include <sys/malloc.h> 45 #include <sys/mutex.h> 46 #include <sys/proc.h> 47 #ifndef __HAIKU__ 48 #include <sys/sched.h> 49 #endif 50 #include <sys/smp.h> 51 #include <sys/gtaskqueue.h> 52 #ifndef __HAIKU__ 53 #include <sys/unistd.h> 54 #endif 55 #include <machine/stdarg.h> 56 57 static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues"); 58 static void gtaskqueue_thread_enqueue(void *); 59 static void gtaskqueue_thread_loop(void *arg); 60 static int task_is_running(struct gtaskqueue *queue, struct gtask *gtask); 61 static void gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask); 62 63 TASKQGROUP_DEFINE(softirq, mp_ncpus, 1); 64 TASKQGROUP_DEFINE(config, 1, 1); 65 66 struct gtaskqueue_busy { 67 struct gtask *tb_running; 68 TAILQ_ENTRY(gtaskqueue_busy) tb_link; 69 }; 70 71 static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1; 72 73 typedef void (*gtaskqueue_enqueue_fn)(void *context); 74 75 struct gtaskqueue { 76 STAILQ_HEAD(, gtask) tq_queue; 77 gtaskqueue_enqueue_fn tq_enqueue; 78 void *tq_context; 79 char *tq_name; 80 TAILQ_HEAD(, gtaskqueue_busy) tq_active; 81 struct mtx tq_mutex; 82 #ifdef __HAIKU__ 83 sem_id tq_sem; 84 #endif 85 struct thread **tq_threads; 86 int tq_tcount; 87 int tq_spin; 88 int tq_flags; 89 int tq_callouts; 90 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 91 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 92 }; 93 94 #define TQ_FLAGS_ACTIVE (1 << 0) 95 #define TQ_FLAGS_BLOCKED (1 << 1) 96 #define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 97 98 #define DT_CALLOUT_ARMED (1 << 0) 99 100 #define TQ_LOCK(tq) \ 101 do { \ 102 if ((tq)->tq_spin) \ 103 mtx_lock_spin(&(tq)->tq_mutex); \ 104 else \ 105 mtx_lock(&(tq)->tq_mutex); \ 106 } while (0) 107 #define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 108 109 #define TQ_UNLOCK(tq) \ 110 do { \ 111 if ((tq)->tq_spin) \ 112 mtx_unlock_spin(&(tq)->tq_mutex); \ 113 else \ 114 mtx_unlock(&(tq)->tq_mutex); \ 115 } while (0) 116 #define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 117 118 #ifdef INVARIANTS 119 static void 120 gtask_dump(struct gtask *gtask) 121 { 122 printf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n", 123 gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func, gtask->ta_context); 124 } 125 #endif 126 127 static __inline int 128 TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, 129 int t) 130 { 131 if (tq->tq_spin) 132 return (msleep_spin(p, m, wm, t)); 133 return (msleep(p, m, pri, wm, t)); 134 } 135 136 static struct gtaskqueue * 137 _gtaskqueue_create(const char *name, int mflags, 138 taskqueue_enqueue_fn enqueue, void *context, 139 int mtxflags, const char *mtxname __unused) 140 { 141 struct gtaskqueue *queue; 142 char *tq_name; 143 144 tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO); 145 if (!tq_name) 146 return (NULL); 147 148 snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); 149 150 queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO); 151 if (!queue) { 152 free(tq_name, M_GTASKQUEUE); 153 return (NULL); 154 } 155 156 STAILQ_INIT(&queue->tq_queue); 157 TAILQ_INIT(&queue->tq_active); 158 queue->tq_enqueue = enqueue; 159 queue->tq_context = context; 160 queue->tq_name = tq_name; 161 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 162 queue->tq_flags |= TQ_FLAGS_ACTIVE; 163 if (enqueue == gtaskqueue_thread_enqueue) 164 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 165 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); 166 #ifdef __HAIKU__ 167 queue->tq_sem = create_sem(0, tq_name); 168 #endif 169 170 return (queue); 171 } 172 173 174 /* 175 * Signal a taskqueue thread to terminate. 176 */ 177 static void 178 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq) 179 { 180 181 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 182 wakeup(tq); 183 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0); 184 } 185 } 186 187 static void 188 gtaskqueue_free(struct gtaskqueue *queue) 189 { 190 191 TQ_LOCK(queue); 192 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 193 gtaskqueue_terminate(queue->tq_threads, queue); 194 KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); 195 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 196 mtx_destroy(&queue->tq_mutex); 197 #ifdef __HAIKU__ 198 delete_sem(queue->tq_sem); 199 #endif 200 free(queue->tq_threads, M_GTASKQUEUE); 201 free(queue->tq_name, M_GTASKQUEUE); 202 free(queue, M_GTASKQUEUE); 203 } 204 205 /* 206 * Wait for all to complete, then prevent it from being enqueued 207 */ 208 void 209 grouptask_block(struct grouptask *grouptask) 210 { 211 struct gtaskqueue *queue = grouptask->gt_taskqueue; 212 struct gtask *gtask = &grouptask->gt_task; 213 214 #ifdef INVARIANTS 215 if (queue == NULL) { 216 gtask_dump(gtask); 217 panic("queue == NULL"); 218 } 219 #endif 220 TQ_LOCK(queue); 221 gtask->ta_flags |= TASK_NOENQUEUE; 222 gtaskqueue_drain_locked(queue, gtask); 223 TQ_UNLOCK(queue); 224 } 225 226 void 227 grouptask_unblock(struct grouptask *grouptask) 228 { 229 struct gtaskqueue *queue = grouptask->gt_taskqueue; 230 struct gtask *gtask = &grouptask->gt_task; 231 232 #ifdef INVARIANTS 233 if (queue == NULL) { 234 gtask_dump(gtask); 235 panic("queue == NULL"); 236 } 237 #endif 238 TQ_LOCK(queue); 239 gtask->ta_flags &= ~TASK_NOENQUEUE; 240 TQ_UNLOCK(queue); 241 } 242 243 int 244 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask) 245 { 246 #ifdef INVARIANTS 247 if (queue == NULL) { 248 gtask_dump(gtask); 249 panic("queue == NULL"); 250 } 251 #endif 252 TQ_LOCK(queue); 253 if (gtask->ta_flags & TASK_ENQUEUED) { 254 TQ_UNLOCK(queue); 255 return (0); 256 } 257 if (gtask->ta_flags & TASK_NOENQUEUE) { 258 TQ_UNLOCK(queue); 259 return (EAGAIN); 260 } 261 STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link); 262 gtask->ta_flags |= TASK_ENQUEUED; 263 TQ_UNLOCK(queue); 264 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 265 queue->tq_enqueue(queue->tq_context); 266 return (0); 267 } 268 269 static void 270 gtaskqueue_task_nop_fn(void *context) 271 { 272 } 273 274 /* 275 * Block until all currently queued tasks in this taskqueue 276 * have begun execution. Tasks queued during execution of 277 * this function are ignored. 278 */ 279 static void 280 gtaskqueue_drain_tq_queue(struct gtaskqueue *queue) 281 { 282 struct gtask t_barrier; 283 284 if (STAILQ_EMPTY(&queue->tq_queue)) 285 return; 286 287 /* 288 * Enqueue our barrier after all current tasks, but with 289 * the highest priority so that newly queued tasks cannot 290 * pass it. Because of the high priority, we can not use 291 * taskqueue_enqueue_locked directly (which drops the lock 292 * anyway) so just insert it at tail while we have the 293 * queue lock. 294 */ 295 GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier); 296 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 297 t_barrier.ta_flags |= TASK_ENQUEUED; 298 299 /* 300 * Once the barrier has executed, all previously queued tasks 301 * have completed or are currently executing. 302 */ 303 while (t_barrier.ta_flags & TASK_ENQUEUED) 304 TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); 305 } 306 307 /* 308 * Block until all currently executing tasks for this taskqueue 309 * complete. Tasks that begin execution during the execution 310 * of this function are ignored. 311 */ 312 static void 313 gtaskqueue_drain_tq_active(struct gtaskqueue *queue) 314 { 315 struct gtaskqueue_busy tb_marker, *tb_first; 316 317 if (TAILQ_EMPTY(&queue->tq_active)) 318 return; 319 320 /* Block taskq_terminate().*/ 321 queue->tq_callouts++; 322 323 /* 324 * Wait for all currently executing taskqueue threads 325 * to go idle. 326 */ 327 tb_marker.tb_running = TB_DRAIN_WAITER; 328 TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); 329 while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) 330 TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); 331 TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); 332 333 /* 334 * Wakeup any other drain waiter that happened to queue up 335 * without any intervening active thread. 336 */ 337 tb_first = TAILQ_FIRST(&queue->tq_active); 338 if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) 339 wakeup(tb_first); 340 341 /* Release taskqueue_terminate(). */ 342 queue->tq_callouts--; 343 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 344 wakeup_one(queue->tq_threads); 345 } 346 347 void 348 gtaskqueue_block(struct gtaskqueue *queue) 349 { 350 351 TQ_LOCK(queue); 352 queue->tq_flags |= TQ_FLAGS_BLOCKED; 353 TQ_UNLOCK(queue); 354 } 355 356 void 357 gtaskqueue_unblock(struct gtaskqueue *queue) 358 { 359 360 TQ_LOCK(queue); 361 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 362 if (!STAILQ_EMPTY(&queue->tq_queue)) 363 queue->tq_enqueue(queue->tq_context); 364 TQ_UNLOCK(queue); 365 } 366 367 static void 368 gtaskqueue_run_locked(struct gtaskqueue *queue) 369 { 370 struct gtaskqueue_busy tb; 371 struct gtaskqueue_busy *tb_first; 372 struct gtask *gtask; 373 374 KASSERT(queue != NULL, ("tq is NULL")); 375 TQ_ASSERT_LOCKED(queue); 376 tb.tb_running = NULL; 377 378 while (STAILQ_FIRST(&queue->tq_queue)) { 379 TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); 380 381 /* 382 * Carefully remove the first task from the queue and 383 * clear its TASK_ENQUEUED flag 384 */ 385 gtask = STAILQ_FIRST(&queue->tq_queue); 386 KASSERT(gtask != NULL, ("task is NULL")); 387 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 388 gtask->ta_flags &= ~TASK_ENQUEUED; 389 tb.tb_running = gtask; 390 TQ_UNLOCK(queue); 391 392 KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL")); 393 gtask->ta_func(gtask->ta_context); 394 395 TQ_LOCK(queue); 396 tb.tb_running = NULL; 397 wakeup(gtask); 398 399 TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); 400 tb_first = TAILQ_FIRST(&queue->tq_active); 401 if (tb_first != NULL && 402 tb_first->tb_running == TB_DRAIN_WAITER) 403 wakeup(tb_first); 404 } 405 } 406 407 static int 408 task_is_running(struct gtaskqueue *queue, struct gtask *gtask) 409 { 410 struct gtaskqueue_busy *tb; 411 412 TQ_ASSERT_LOCKED(queue); 413 TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { 414 if (tb->tb_running == gtask) 415 return (1); 416 } 417 return (0); 418 } 419 420 static int 421 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask) 422 { 423 424 if (gtask->ta_flags & TASK_ENQUEUED) 425 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link); 426 gtask->ta_flags &= ~TASK_ENQUEUED; 427 return (task_is_running(queue, gtask) ? EBUSY : 0); 428 } 429 430 int 431 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask) 432 { 433 int error; 434 435 TQ_LOCK(queue); 436 error = gtaskqueue_cancel_locked(queue, gtask); 437 TQ_UNLOCK(queue); 438 439 return (error); 440 } 441 442 static void 443 gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask) 444 { 445 while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask)) 446 TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0); 447 } 448 449 void 450 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask) 451 { 452 #ifndef __HAIKU__ 453 if (!queue->tq_spin) 454 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 455 #endif 456 457 TQ_LOCK(queue); 458 gtaskqueue_drain_locked(queue, gtask); 459 TQ_UNLOCK(queue); 460 } 461 462 void 463 gtaskqueue_drain_all(struct gtaskqueue *queue) 464 { 465 #ifndef __HAIKU__ 466 if (!queue->tq_spin) 467 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 468 #endif 469 470 TQ_LOCK(queue); 471 gtaskqueue_drain_tq_queue(queue); 472 gtaskqueue_drain_tq_active(queue); 473 TQ_UNLOCK(queue); 474 } 475 476 static int 477 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 478 void* mask, const char *name, va_list ap) 479 { 480 char ktname[19 + 1]; 481 struct thread *td; 482 struct gtaskqueue *tq; 483 int i, error; 484 485 if (count <= 0) 486 return (EINVAL); 487 488 vsnprintf(ktname, sizeof(ktname), name, ap); 489 tq = *tqp; 490 491 tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE, 492 M_NOWAIT | M_ZERO); 493 if (tq->tq_threads == NULL) { 494 printf("%s: no memory for %s threads\n", __func__, ktname); 495 return (ENOMEM); 496 } 497 498 for (i = 0; i < count; i++) { 499 if (count == 1) 500 error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 501 &tq->tq_threads[i], 0, 0, "%s", ktname); 502 else 503 error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 504 &tq->tq_threads[i], 0, 0, 505 "%s_%d", ktname, i); 506 if (error) { 507 /* should be ok to continue, taskqueue_free will dtrt */ 508 printf("%s: kthread_add(%s): error %d", __func__, 509 ktname, error); 510 tq->tq_threads[i] = NULL; /* paranoid */ 511 } else 512 tq->tq_tcount++; 513 } 514 for (i = 0; i < count; i++) { 515 if (tq->tq_threads[i] == NULL) 516 continue; 517 td = tq->tq_threads[i]; 518 #ifndef __HAIKU__ 519 if (mask) { 520 error = cpuset_setthread(td->td_tid, mask); 521 /* 522 * Failing to pin is rarely an actual fatal error; 523 * it'll just affect performance. 524 */ 525 if (error) 526 printf("%s: curthread=%llu: can't pin; " 527 "error=%d\n", 528 __func__, 529 (unsigned long long) td->td_tid, 530 error); 531 } 532 #endif 533 thread_lock(td); 534 sched_prio(td, pri); 535 sched_add(td, SRQ_BORING); 536 thread_unlock(td); 537 } 538 539 return (0); 540 } 541 542 static int 543 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 544 const char *name, ...) 545 { 546 va_list ap; 547 int error; 548 549 va_start(ap, name); 550 error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap); 551 va_end(ap); 552 return (error); 553 } 554 555 static inline void 556 gtaskqueue_run_callback(struct gtaskqueue *tq, 557 enum taskqueue_callback_type cb_type) 558 { 559 taskqueue_callback_fn tq_callback; 560 561 TQ_ASSERT_UNLOCKED(tq); 562 tq_callback = tq->tq_callbacks[cb_type]; 563 if (tq_callback != NULL) 564 tq_callback(tq->tq_cb_contexts[cb_type]); 565 } 566 567 static void 568 gtaskqueue_thread_loop(void *arg) 569 { 570 struct gtaskqueue **tqp, *tq; 571 572 tqp = arg; 573 tq = *tqp; 574 gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 575 TQ_LOCK(tq); 576 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 577 /* XXX ? */ 578 gtaskqueue_run_locked(tq); 579 /* 580 * Because taskqueue_run() can drop tq_mutex, we need to 581 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 582 * meantime, which means we missed a wakeup. 583 */ 584 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 585 break; 586 #ifndef __HAIKU__ 587 TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); 588 #else 589 TQ_UNLOCK(tq); 590 acquire_sem(tq->tq_sem); 591 TQ_LOCK(tq); 592 #endif 593 } 594 gtaskqueue_run_locked(tq); 595 /* 596 * This thread is on its way out, so just drop the lock temporarily 597 * in order to call the shutdown callback. This allows the callback 598 * to look at the taskqueue, even just before it dies. 599 */ 600 TQ_UNLOCK(tq); 601 gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 602 TQ_LOCK(tq); 603 604 /* rendezvous with thread that asked us to terminate */ 605 tq->tq_tcount--; 606 wakeup_one(tq->tq_threads); 607 TQ_UNLOCK(tq); 608 kthread_exit(); 609 } 610 611 static void 612 gtaskqueue_thread_enqueue(void *context) 613 { 614 struct gtaskqueue **tqp, *tq; 615 616 tqp = context; 617 tq = *tqp; 618 #ifndef __HAIKU__ 619 wakeup_one(tq); 620 #else 621 release_sem_etc(tq->tq_sem, 1, B_DO_NOT_RESCHEDULE); 622 #endif 623 } 624 625 626 static struct gtaskqueue * 627 gtaskqueue_create_fast(const char *name, int mflags, 628 taskqueue_enqueue_fn enqueue, void *context) 629 { 630 return _gtaskqueue_create(name, mflags, enqueue, context, 631 MTX_SPIN, "fast_taskqueue"); 632 } 633 634 635 struct taskqgroup_cpu { 636 LIST_HEAD(, grouptask) tgc_tasks; 637 struct gtaskqueue *tgc_taskq; 638 int tgc_cnt; 639 int tgc_cpu; 640 }; 641 642 struct taskqgroup { 643 struct taskqgroup_cpu tqg_queue[MAXCPU]; 644 struct mtx tqg_lock; 645 const char * tqg_name; 646 int tqg_adjusting; 647 int tqg_stride; 648 int tqg_cnt; 649 }; 650 651 struct taskq_bind_task { 652 struct gtask bt_task; 653 int bt_cpuid; 654 }; 655 656 static void 657 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu) 658 { 659 struct taskqgroup_cpu *qcpu; 660 661 qcpu = &qgroup->tqg_queue[idx]; 662 LIST_INIT(&qcpu->tgc_tasks); 663 qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK, 664 gtaskqueue_thread_enqueue, &qcpu->tgc_taskq); 665 MPASS(qcpu->tgc_taskq); 666 gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT, 667 "%s_%d", qgroup->tqg_name, idx); 668 qcpu->tgc_cpu = cpu; 669 } 670 671 static void 672 taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx) 673 { 674 675 gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq); 676 } 677 678 /* 679 * Find the taskq with least # of tasks that doesn't currently have any 680 * other queues from the uniq identifier. 681 */ 682 static int 683 taskqgroup_find(struct taskqgroup *qgroup, void *uniq) 684 { 685 struct grouptask *n; 686 int i, idx, mincnt; 687 int strict; 688 689 mtx_assert(&qgroup->tqg_lock, MA_OWNED); 690 #ifndef __HAIKU__ 691 if (qgroup->tqg_cnt == 0) 692 #else 693 KASSERT(qgroup->tqg_cnt > 0, ("qgroup(%p)->tqg_cnt is %d!", qgroup, qgroup->tqg_cnt)); 694 if (qgroup->tqg_cnt == 1) 695 #endif 696 return (0); 697 idx = -1; 698 mincnt = INT_MAX; 699 /* 700 * Two passes; First scan for a queue with the least tasks that 701 * does not already service this uniq id. If that fails simply find 702 * the queue with the least total tasks; 703 */ 704 for (strict = 1; mincnt == INT_MAX; strict = 0) { 705 for (i = 0; i < qgroup->tqg_cnt; i++) { 706 if (qgroup->tqg_queue[i].tgc_cnt > mincnt) 707 continue; 708 if (strict) { 709 LIST_FOREACH(n, 710 &qgroup->tqg_queue[i].tgc_tasks, gt_list) 711 if (n->gt_uniq == uniq) 712 break; 713 if (n != NULL) 714 continue; 715 } 716 mincnt = qgroup->tqg_queue[i].tgc_cnt; 717 idx = i; 718 } 719 } 720 if (idx == -1) 721 panic("%s: failed to pick a qid.", __func__); 722 723 return (idx); 724 } 725 726 /* 727 * smp_started is unusable since it is not set for UP kernels or even for 728 * SMP kernels when there is 1 CPU. This is usually handled by adding a 729 * (mp_ncpus == 1) test, but that would be broken here since we need to 730 * to synchronize with the SI_SUB_SMP ordering. Even in the pure SMP case 731 * smp_started only gives a fuzzy ordering relative to SI_SUB_SMP. 732 * 733 * So maintain our own flag. It must be set after all CPUs are started 734 * and before SI_SUB_SMP:SI_ORDER_ANY so that the SYSINIT for delayed 735 * adjustment is properly delayed. SI_ORDER_FOURTH is clearly before 736 * SI_ORDER_ANY and unclearly after the CPUs are started. It would be 737 * simpler for adjustment to pass a flag indicating if it is delayed. 738 */ 739 740 static int tqg_smp_started; 741 742 static void 743 tqg_record_smp_started(void *arg) 744 { 745 tqg_smp_started = 1; 746 } 747 748 SYSINIT(tqg_record_smp_started, SI_SUB_SMP, SI_ORDER_FOURTH, 749 tqg_record_smp_started, NULL); 750 751 void 752 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask, 753 void *uniq, device_t dev, struct resource *irq, const char *name) 754 { 755 int cpu, qid, error; 756 757 gtask->gt_uniq = uniq; 758 snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask"); 759 gtask->gt_dev = dev; 760 gtask->gt_irq = irq; 761 gtask->gt_cpu = -1; 762 mtx_lock(&qgroup->tqg_lock); 763 qid = taskqgroup_find(qgroup, uniq); 764 qgroup->tqg_queue[qid].tgc_cnt++; 765 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 766 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 767 if (dev != NULL && irq != NULL && tqg_smp_started) { 768 cpu = qgroup->tqg_queue[qid].tgc_cpu; 769 gtask->gt_cpu = cpu; 770 mtx_unlock(&qgroup->tqg_lock); 771 error = bus_bind_intr(dev, irq, cpu); 772 if (error) 773 printf("%s: binding interrupt failed for %s: %d\n", 774 __func__, gtask->gt_name, error); 775 } else 776 mtx_unlock(&qgroup->tqg_lock); 777 } 778 779 static void 780 taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask) 781 { 782 int qid, cpu, error; 783 784 mtx_lock(&qgroup->tqg_lock); 785 qid = taskqgroup_find(qgroup, gtask->gt_uniq); 786 cpu = qgroup->tqg_queue[qid].tgc_cpu; 787 if (gtask->gt_dev != NULL && gtask->gt_irq != NULL) { 788 mtx_unlock(&qgroup->tqg_lock); 789 error = bus_bind_intr(gtask->gt_dev, gtask->gt_irq, cpu); 790 mtx_lock(&qgroup->tqg_lock); 791 if (error) 792 printf("%s: binding interrupt failed for %s: %d\n", 793 __func__, gtask->gt_name, error); 794 795 } 796 qgroup->tqg_queue[qid].tgc_cnt++; 797 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 798 MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL); 799 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 800 mtx_unlock(&qgroup->tqg_lock); 801 } 802 803 int 804 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask, 805 void *uniq, int cpu, device_t dev, struct resource *irq, const char *name) 806 { 807 int i, qid, error; 808 809 qid = -1; 810 gtask->gt_uniq = uniq; 811 snprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask"); 812 gtask->gt_dev = dev; 813 gtask->gt_irq = irq; 814 gtask->gt_cpu = cpu; 815 mtx_lock(&qgroup->tqg_lock); 816 if (tqg_smp_started) { 817 for (i = 0; i < qgroup->tqg_cnt; i++) 818 if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 819 qid = i; 820 break; 821 } 822 if (qid == -1) { 823 mtx_unlock(&qgroup->tqg_lock); 824 printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu); 825 return (EINVAL); 826 } 827 } else 828 qid = 0; 829 qgroup->tqg_queue[qid].tgc_cnt++; 830 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 831 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 832 cpu = qgroup->tqg_queue[qid].tgc_cpu; 833 mtx_unlock(&qgroup->tqg_lock); 834 835 if (dev != NULL && irq != NULL && tqg_smp_started) { 836 error = bus_bind_intr(dev, irq, cpu); 837 if (error) 838 printf("%s: binding interrupt failed for %s: %d\n", 839 __func__, gtask->gt_name, error); 840 } 841 return (0); 842 } 843 844 static int 845 taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup, struct grouptask *gtask) 846 { 847 device_t dev; 848 struct resource *irq; 849 int cpu, error, i, qid; 850 851 qid = -1; 852 dev = gtask->gt_dev; 853 irq = gtask->gt_irq; 854 cpu = gtask->gt_cpu; 855 MPASS(tqg_smp_started); 856 mtx_lock(&qgroup->tqg_lock); 857 for (i = 0; i < qgroup->tqg_cnt; i++) 858 if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 859 qid = i; 860 break; 861 } 862 if (qid == -1) { 863 mtx_unlock(&qgroup->tqg_lock); 864 printf("%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu); 865 return (EINVAL); 866 } 867 qgroup->tqg_queue[qid].tgc_cnt++; 868 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 869 MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL); 870 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 871 mtx_unlock(&qgroup->tqg_lock); 872 873 if (dev != NULL && irq != NULL) { 874 error = bus_bind_intr(dev, irq, cpu); 875 if (error) 876 printf("%s: binding interrupt failed for %s: %d\n", 877 __func__, gtask->gt_name, error); 878 } 879 return (0); 880 } 881 882 void 883 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask) 884 { 885 int i; 886 887 grouptask_block(gtask); 888 mtx_lock(&qgroup->tqg_lock); 889 for (i = 0; i < qgroup->tqg_cnt; i++) 890 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue) 891 break; 892 if (i == qgroup->tqg_cnt) 893 panic("%s: task %s not in group", __func__, gtask->gt_name); 894 qgroup->tqg_queue[i].tgc_cnt--; 895 LIST_REMOVE(gtask, gt_list); 896 mtx_unlock(&qgroup->tqg_lock); 897 gtask->gt_taskqueue = NULL; 898 gtask->gt_task.ta_flags &= ~TASK_NOENQUEUE; 899 } 900 901 static void 902 taskqgroup_binder(void *ctx) 903 { 904 struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx; 905 #ifndef __HAIKU__ 906 cpuset_t mask; 907 int error; 908 909 CPU_ZERO(&mask); 910 CPU_SET(gtask->bt_cpuid, &mask); 911 error = cpuset_setthread(curthread->td_tid, &mask); 912 thread_lock(curthread); 913 sched_bind(curthread, gtask->bt_cpuid); 914 thread_unlock(curthread); 915 916 if (error) 917 printf("%s: binding curthread failed: %d\n", __func__, error); 918 #endif 919 free(gtask, M_DEVBUF); 920 } 921 922 static void 923 taskqgroup_bind(struct taskqgroup *qgroup) 924 { 925 struct taskq_bind_task *gtask; 926 int i; 927 928 /* 929 * Bind taskqueue threads to specific CPUs, if they have been assigned 930 * one. 931 */ 932 if (qgroup->tqg_cnt == 1) 933 return; 934 935 for (i = 0; i < qgroup->tqg_cnt; i++) { 936 gtask = malloc(sizeof (*gtask), M_DEVBUF, M_WAITOK); 937 GTASK_INIT(>ask->bt_task, 0, 0, taskqgroup_binder, gtask); 938 gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu; 939 grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq, 940 >ask->bt_task); 941 } 942 } 943 944 static void 945 taskqgroup_config_init(void *arg) 946 { 947 struct taskqgroup *qgroup = qgroup_config; 948 LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL); 949 950 LIST_SWAP(>ask_head, &qgroup->tqg_queue[0].tgc_tasks, 951 grouptask, gt_list); 952 qgroup->tqg_queue[0].tgc_cnt = 0; 953 taskqgroup_cpu_create(qgroup, 0, 0); 954 955 qgroup->tqg_cnt = 1; 956 qgroup->tqg_stride = 1; 957 } 958 959 SYSINIT(taskqgroup_config_init, SI_SUB_TASKQ, SI_ORDER_SECOND, 960 taskqgroup_config_init, NULL); 961 962 static int 963 _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) 964 { 965 LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL); 966 struct grouptask *gtask; 967 int i, k, old_cnt, old_cpu, cpu; 968 969 mtx_assert(&qgroup->tqg_lock, MA_OWNED); 970 971 if (cnt < 1 || cnt * stride > mp_ncpus || !tqg_smp_started) { 972 printf("%s: failed cnt: %d stride: %d " 973 "mp_ncpus: %d tqg_smp_started: %d\n", 974 __func__, cnt, stride, mp_ncpus, tqg_smp_started); 975 return (EINVAL); 976 } 977 if (qgroup->tqg_adjusting) { 978 printf("%s failed: adjusting\n", __func__); 979 return (EBUSY); 980 } 981 qgroup->tqg_adjusting = 1; 982 old_cnt = qgroup->tqg_cnt; 983 old_cpu = 0; 984 if (old_cnt < cnt) 985 old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu; 986 mtx_unlock(&qgroup->tqg_lock); 987 /* 988 * Set up queue for tasks added before boot. 989 */ 990 if (old_cnt == 0) { 991 LIST_SWAP(>ask_head, &qgroup->tqg_queue[0].tgc_tasks, 992 grouptask, gt_list); 993 qgroup->tqg_queue[0].tgc_cnt = 0; 994 } 995 996 /* 997 * If new taskq threads have been added. 998 */ 999 cpu = old_cpu; 1000 for (i = old_cnt; i < cnt; i++) { 1001 taskqgroup_cpu_create(qgroup, i, cpu); 1002 1003 for (k = 0; k < stride; k++) 1004 cpu = CPU_NEXT(cpu); 1005 } 1006 mtx_lock(&qgroup->tqg_lock); 1007 qgroup->tqg_cnt = cnt; 1008 qgroup->tqg_stride = stride; 1009 1010 /* 1011 * Adjust drivers to use new taskqs. 1012 */ 1013 for (i = 0; i < old_cnt; i++) { 1014 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) { 1015 LIST_REMOVE(gtask, gt_list); 1016 qgroup->tqg_queue[i].tgc_cnt--; 1017 LIST_INSERT_HEAD(>ask_head, gtask, gt_list); 1018 } 1019 } 1020 mtx_unlock(&qgroup->tqg_lock); 1021 1022 while ((gtask = LIST_FIRST(>ask_head))) { 1023 LIST_REMOVE(gtask, gt_list); 1024 if (gtask->gt_cpu == -1) 1025 taskqgroup_attach_deferred(qgroup, gtask); 1026 else if (taskqgroup_attach_cpu_deferred(qgroup, gtask)) 1027 taskqgroup_attach_deferred(qgroup, gtask); 1028 } 1029 1030 #ifdef INVARIANTS 1031 mtx_lock(&qgroup->tqg_lock); 1032 for (i = 0; i < qgroup->tqg_cnt; i++) { 1033 MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL); 1034 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) 1035 MPASS(gtask->gt_taskqueue != NULL); 1036 } 1037 mtx_unlock(&qgroup->tqg_lock); 1038 #endif 1039 /* 1040 * If taskq thread count has been reduced. 1041 */ 1042 for (i = cnt; i < old_cnt; i++) 1043 taskqgroup_cpu_remove(qgroup, i); 1044 1045 taskqgroup_bind(qgroup); 1046 1047 mtx_lock(&qgroup->tqg_lock); 1048 qgroup->tqg_adjusting = 0; 1049 1050 return (0); 1051 } 1052 1053 int 1054 taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) 1055 { 1056 int error; 1057 1058 mtx_lock(&qgroup->tqg_lock); 1059 error = _taskqgroup_adjust(qgroup, cnt, stride); 1060 mtx_unlock(&qgroup->tqg_lock); 1061 1062 return (error); 1063 } 1064 1065 struct taskqgroup * 1066 taskqgroup_create(const char *name) 1067 { 1068 struct taskqgroup *qgroup; 1069 1070 qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO); 1071 mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF); 1072 qgroup->tqg_name = name; 1073 LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks); 1074 1075 return (qgroup); 1076 } 1077 1078 void 1079 taskqgroup_destroy(struct taskqgroup *qgroup) 1080 { 1081 1082 } 1083 1084 void 1085 taskqgroup_config_gtask_init(void *ctx, struct grouptask *gtask, gtask_fn_t *fn, 1086 const char *name) 1087 { 1088 1089 GROUPTASK_INIT(gtask, 0, fn, ctx); 1090 taskqgroup_attach(qgroup_config, gtask, gtask, NULL, NULL, name); 1091 } 1092 1093 void 1094 taskqgroup_config_gtask_deinit(struct grouptask *gtask) 1095 { 1096 1097 taskqgroup_detach(qgroup_config, gtask); 1098 } 1099