1 /* 2 * Copyright 2009, Colin Günther, coling@gmx.de 3 * Copyright 2007, Hugo Santos. All Rights Reserved. 4 * Distributed under the terms of the MIT License. 5 * 6 * Authors: 7 * Hugo Santos, hugosantos@gmail.com 8 */ 9 10 11 #include "device.h" 12 13 #include <stdio.h> 14 15 #include <compat/sys/callout.h> 16 #include <compat/sys/taskqueue.h> 17 #include <compat/sys/haiku-module.h> 18 19 20 #define TQ_FLAGS_ACTIVE (1 << 0) 21 #define TQ_FLAGS_BLOCKED (1 << 1) 22 #define TQ_FLAGS_PENDING (1 << 2) 23 24 #define DT_CALLOUT_ARMED (1 << 0) 25 #define DT_DRAIN_IN_PROGRESS (1 << 1) 26 27 struct taskqueue { 28 char tq_name[TASKQUEUE_NAMELEN]; 29 struct mtx tq_mutex; 30 struct list tq_list; 31 taskqueue_enqueue_fn tq_enqueue; 32 void *tq_arg; 33 int tq_fast; 34 spinlock tq_spinlock; 35 sem_id tq_sem; 36 thread_id *tq_threads; 37 thread_id tq_thread_storage; 38 int tq_threadcount; 39 int tq_flags; 40 int tq_callouts; 41 }; 42 43 struct taskqueue *taskqueue_fast = NULL; 44 struct taskqueue *taskqueue_swi = NULL; 45 struct taskqueue *taskqueue_thread = NULL; 46 47 48 static struct taskqueue * 49 _taskqueue_create(const char *name, int mflags, int fast, 50 taskqueue_enqueue_fn enqueueFunction, void *context) 51 { 52 struct taskqueue *tq = malloc(sizeof(struct taskqueue)); 53 if (tq == NULL) 54 return NULL; 55 56 tq->tq_fast = fast; 57 58 if (fast) { 59 B_INITIALIZE_SPINLOCK(&tq->tq_spinlock); 60 } else { 61 mtx_init(&tq->tq_mutex, name, NULL, MTX_DEF); 62 } 63 64 strlcpy(tq->tq_name, name, sizeof(tq->tq_name)); 65 list_init_etc(&tq->tq_list, offsetof(struct task, ta_link)); 66 tq->tq_enqueue = enqueueFunction; 67 tq->tq_arg = context; 68 69 tq->tq_sem = -1; 70 tq->tq_threads = NULL; 71 tq->tq_threadcount = 0; 72 tq->tq_flags = TQ_FLAGS_ACTIVE; 73 tq->tq_callouts = 0; 74 75 return tq; 76 } 77 78 79 static void 80 tq_lock(struct taskqueue *taskQueue, cpu_status *status) 81 { 82 if (taskQueue->tq_fast) { 83 *status = disable_interrupts(); 84 acquire_spinlock(&taskQueue->tq_spinlock); 85 } else { 86 mtx_lock(&taskQueue->tq_mutex); 87 } 88 } 89 90 91 static void 92 tq_unlock(struct taskqueue *taskQueue, cpu_status status) 93 { 94 if (taskQueue->tq_fast) { 95 release_spinlock(&taskQueue->tq_spinlock); 96 restore_interrupts(status); 97 } else { 98 mtx_unlock(&taskQueue->tq_mutex); 99 } 100 } 101 102 103 struct taskqueue * 104 taskqueue_create(const char *name, int mflags, 105 taskqueue_enqueue_fn enqueueFunction, void *context) 106 { 107 return _taskqueue_create(name, mflags, 0, enqueueFunction, context); 108 } 109 110 111 static int32 112 tq_handle_thread(void *data) 113 { 114 struct taskqueue *tq = data; 115 cpu_status cpu_state; 116 struct task *t; 117 int pending; 118 sem_id sem; 119 120 /* just a synchronization point */ 121 tq_lock(tq, &cpu_state); 122 sem = tq->tq_sem; 123 tq_unlock(tq, cpu_state); 124 125 while (acquire_sem(sem) == B_NO_ERROR) { 126 tq_lock(tq, &cpu_state); 127 t = list_remove_head_item(&tq->tq_list); 128 tq_unlock(tq, cpu_state); 129 if (t == NULL) 130 continue; 131 pending = t->ta_pending; 132 t->ta_pending = 0; 133 134 t->ta_handler(t->ta_argument, pending); 135 } 136 137 return 0; 138 } 139 140 141 static int 142 _taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority, 143 const char *name) 144 { 145 struct taskqueue *tq = (*taskQueue); 146 int i, j; 147 148 if (count == 0) 149 return -1; 150 151 if (tq->tq_threads != NULL) 152 return -1; 153 154 if (count == 1) { 155 tq->tq_threads = &tq->tq_thread_storage; 156 } else { 157 tq->tq_threads = malloc(sizeof(thread_id) * count); 158 if (tq->tq_threads == NULL) 159 return B_NO_MEMORY; 160 } 161 162 tq->tq_sem = create_sem(0, tq->tq_name); 163 if (tq->tq_sem < B_OK) { 164 if (count > 1) 165 free(tq->tq_threads); 166 tq->tq_threads = NULL; 167 return tq->tq_sem; 168 } 169 170 for (i = 0; i < count; i++) { 171 tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name, 172 priority, tq); 173 if (tq->tq_threads[i] < B_OK) { 174 status_t status = tq->tq_threads[i]; 175 for (j = 0; j < i; j++) 176 kill_thread(tq->tq_threads[j]); 177 if (count > 1) 178 free(tq->tq_threads); 179 tq->tq_threads = NULL; 180 delete_sem(tq->tq_sem); 181 return status; 182 } 183 } 184 185 tq->tq_threadcount = count; 186 187 for (i = 0; i < count; i++) 188 resume_thread(tq->tq_threads[i]); 189 190 return 0; 191 } 192 193 194 int 195 taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority, 196 const char *format, ...) 197 { 198 /* we assume that start_threads is called in a sane place, and thus 199 * don't need to be locked. This is mostly due to the fact that if 200 * the TQ is 'fast', locking the TQ disables interrupts... and then 201 * we can't create semaphores, threads and bananas. */ 202 203 /* cpu_status state; */ 204 char name[64]; 205 int result; 206 va_list vl; 207 208 va_start(vl, format); 209 vsnprintf(name, sizeof(name), format, vl); 210 va_end(vl); 211 212 /*tq_lock(*tqp, &state);*/ 213 result = _taskqueue_start_threads(taskQueue, count, priority, name); 214 /*tq_unlock(*tqp, state);*/ 215 216 return result; 217 } 218 219 220 void 221 taskqueue_free(struct taskqueue *taskQueue) 222 { 223 if (taskQueue == NULL) { 224 printf("taskqueue_free called with NULL taskqueue\n"); 225 return; 226 } 227 228 /* lock and drain list? */ 229 taskQueue->tq_flags &= ~TQ_FLAGS_ACTIVE; 230 if (!taskQueue->tq_fast) 231 mtx_destroy(&taskQueue->tq_mutex); 232 if (taskQueue->tq_sem != -1) { 233 int i; 234 235 delete_sem(taskQueue->tq_sem); 236 237 for (i = 0; i < taskQueue->tq_threadcount; i++) { 238 status_t status; 239 wait_for_thread(taskQueue->tq_threads[i], &status); 240 } 241 242 if (taskQueue->tq_threadcount > 1) 243 free(taskQueue->tq_threads); 244 } 245 246 free(taskQueue); 247 } 248 249 250 void 251 taskqueue_drain(struct taskqueue *taskQueue, struct task *task) 252 { 253 cpu_status status; 254 255 if (taskQueue == NULL) { 256 printf("taskqueue_drain called with NULL taskqueue\n"); 257 return; 258 } 259 260 tq_lock(taskQueue, &status); 261 while (task->ta_pending != 0) { 262 tq_unlock(taskQueue, status); 263 snooze(0); 264 tq_lock(taskQueue, &status); 265 } 266 tq_unlock(taskQueue, status); 267 } 268 269 270 void 271 taskqueue_drain_timeout(struct taskqueue *queue, 272 struct timeout_task *timeout_task) 273 { 274 cpu_status status; 275 /* 276 * Set flag to prevent timer from re-starting during drain: 277 */ 278 tq_lock(queue, &status); 279 KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0, 280 ("Drain already in progress")); 281 timeout_task->f |= DT_DRAIN_IN_PROGRESS; 282 tq_unlock(queue, status); 283 284 callout_drain(&timeout_task->c); 285 taskqueue_drain(queue, &timeout_task->t); 286 287 /* 288 * Clear flag to allow timer to re-start: 289 */ 290 tq_lock(queue, &status); 291 timeout_task->f &= ~DT_DRAIN_IN_PROGRESS; 292 tq_unlock(queue, status); 293 } 294 295 296 static void 297 taskqueue_task_nop_fn(void* context, int pending) 298 { 299 } 300 301 302 void 303 taskqueue_drain_all(struct taskqueue *taskQueue) 304 { 305 struct task t_barrier; 306 307 if (taskQueue == NULL) { 308 printf("taskqueue_drain_all called with NULL taskqueue\n"); 309 return; 310 } 311 312 TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier); 313 taskqueue_enqueue(taskQueue, &t_barrier); 314 taskqueue_drain(taskQueue, &t_barrier); 315 } 316 317 318 static void 319 taskqueue_enqueue_locked(struct taskqueue *taskQueue, struct task *task, 320 cpu_status status) 321 { 322 /* we don't really support priorities */ 323 if (task->ta_pending) { 324 task->ta_pending++; 325 } else { 326 list_add_item(&taskQueue->tq_list, task); 327 task->ta_pending = 1; 328 if ((taskQueue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 329 taskQueue->tq_enqueue(taskQueue->tq_arg); 330 else 331 taskQueue->tq_flags |= TQ_FLAGS_PENDING; 332 } 333 tq_unlock(taskQueue, status); 334 } 335 336 337 int 338 taskqueue_enqueue(struct taskqueue *taskQueue, struct task *task) 339 { 340 cpu_status status; 341 342 tq_lock(taskQueue, &status); 343 taskqueue_enqueue_locked(taskQueue, task, status); 344 /* The lock is released inside. */ 345 346 return 0; 347 } 348 349 350 static void 351 taskqueue_timeout_func(void *arg) 352 { 353 struct taskqueue *queue; 354 struct timeout_task *timeout_task; 355 cpu_status status; 356 // dummy, as we should never get here on a spin taskqueue 357 358 timeout_task = arg; 359 queue = timeout_task->q; 360 KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout")); 361 timeout_task->f &= ~DT_CALLOUT_ARMED; 362 queue->tq_callouts--; 363 taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t, status); 364 /* The lock is released inside. */ 365 } 366 367 368 int 369 taskqueue_enqueue_timeout(struct taskqueue *queue, 370 struct timeout_task *ttask, int _ticks) 371 { 372 int res; 373 cpu_status status; 374 375 tq_lock(queue, &status); 376 KASSERT(ttask->q == NULL || ttask->q == queue, 377 ("Migrated queue")); 378 ttask->q = queue; 379 res = ttask->t.ta_pending; 380 if (ttask->f & DT_DRAIN_IN_PROGRESS) { 381 /* Do nothing */ 382 tq_unlock(queue, status); 383 res = -1; 384 } else if (_ticks == 0) { 385 tq_unlock(queue, status); 386 taskqueue_enqueue(queue, &ttask->t); 387 } else { 388 if ((ttask->f & DT_CALLOUT_ARMED) != 0) { 389 res++; 390 } else { 391 queue->tq_callouts++; 392 ttask->f |= DT_CALLOUT_ARMED; 393 if (_ticks < 0) 394 _ticks = -_ticks; /* Ignore overflow. */ 395 } 396 tq_unlock(queue, status); 397 if (_ticks > 0) { 398 callout_reset(&ttask->c, _ticks, 399 taskqueue_timeout_func, ttask); 400 } 401 } 402 return (res); 403 } 404 405 406 static int 407 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, 408 u_int *pendp) 409 { 410 if (task->ta_pending > 0) 411 list_remove_item(&queue->tq_list, task); 412 if (pendp != NULL) 413 *pendp = task->ta_pending; 414 task->ta_pending = 0; 415 return 0; 416 } 417 418 419 int 420 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) 421 { 422 int error; 423 cpu_status status; 424 425 tq_lock(queue, &status); 426 error = taskqueue_cancel_locked(queue, task, pendp); 427 tq_unlock(queue, status); 428 429 return (error); 430 } 431 432 433 int 434 taskqueue_cancel_timeout(struct taskqueue *queue, 435 struct timeout_task *timeout_task, u_int *pendp) 436 { 437 u_int pending, pending1; 438 int error; 439 cpu_status status; 440 441 tq_lock(queue, &status); 442 pending = !!(callout_stop(&timeout_task->c) > 0); 443 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); 444 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 445 timeout_task->f &= ~DT_CALLOUT_ARMED; 446 queue->tq_callouts--; 447 } 448 tq_unlock(queue, status); 449 450 if (pendp != NULL) 451 *pendp = pending + pending1; 452 return (error); 453 } 454 455 456 void 457 taskqueue_thread_enqueue(void *context) 458 { 459 struct taskqueue **tqp = context; 460 release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE); 461 } 462 463 464 int 465 taskqueue_enqueue_fast(struct taskqueue *taskQueue, struct task *task) 466 { 467 return taskqueue_enqueue(taskQueue, task); 468 } 469 470 471 struct taskqueue * 472 taskqueue_create_fast(const char *name, int mflags, 473 taskqueue_enqueue_fn enqueueFunction, void *context) 474 { 475 return _taskqueue_create(name, mflags, 1, enqueueFunction, context); 476 } 477 478 479 void 480 task_init(struct task *task, int prio, task_fn_t handler, void *context) 481 { 482 task->ta_priority = prio; 483 task->ta_handler = handler; 484 task->ta_argument = context; 485 task->ta_pending = 0; 486 } 487 488 489 void 490 timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, 491 int priority, task_fn_t func, void *context) 492 { 493 TASK_INIT(&timeout_task->t, priority, func, context); 494 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 495 CALLOUT_RETURNUNLOCKED); 496 timeout_task->q = queue; 497 timeout_task->f = 0; 498 } 499 500 501 status_t 502 init_taskqueues() 503 { 504 status_t status = B_NO_MEMORY; 505 506 if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) { 507 taskqueue_fast = taskqueue_create_fast("fast taskq", 0, 508 taskqueue_thread_enqueue, &taskqueue_fast); 509 if (taskqueue_fast == NULL) 510 return B_NO_MEMORY; 511 512 status = taskqueue_start_threads(&taskqueue_fast, 1, 513 B_REAL_TIME_PRIORITY, "fast taskq thread"); 514 if (status < B_OK) 515 goto err_1; 516 } 517 518 if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) { 519 taskqueue_swi = taskqueue_create_fast("swi taskq", 0, 520 taskqueue_thread_enqueue, &taskqueue_swi); 521 if (taskqueue_swi == NULL) { 522 status = B_NO_MEMORY; 523 goto err_1; 524 } 525 526 status = taskqueue_start_threads(&taskqueue_swi, 1, 527 B_REAL_TIME_PRIORITY, "swi taskq"); 528 if (status < B_OK) 529 goto err_2; 530 } 531 532 if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE)) { 533 taskqueue_thread = taskqueue_create_fast("thread taskq", 0, 534 taskqueue_thread_enqueue, &taskqueue_thread); 535 if (taskqueue_thread == NULL) { 536 status = B_NO_MEMORY; 537 goto err_2; 538 } 539 540 status = taskqueue_start_threads(&taskqueue_thread, 1, 541 B_REAL_TIME_PRIORITY, "swi taskq"); 542 if (status < B_OK) 543 goto err_3; 544 } 545 546 return B_OK; 547 548 err_3: 549 if (taskqueue_thread) 550 taskqueue_free(taskqueue_thread); 551 552 err_2: 553 if (taskqueue_swi) 554 taskqueue_free(taskqueue_swi); 555 556 err_1: 557 if (taskqueue_fast) 558 taskqueue_free(taskqueue_fast); 559 560 return status; 561 } 562 563 564 void 565 uninit_taskqueues() 566 { 567 if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE)) 568 taskqueue_free(taskqueue_thread); 569 570 if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) 571 taskqueue_free(taskqueue_swi); 572 573 if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) 574 taskqueue_free(taskqueue_fast); 575 } 576 577 578 void 579 taskqueue_block(struct taskqueue *taskQueue) 580 { 581 cpu_status status; 582 583 tq_lock(taskQueue, &status); 584 taskQueue->tq_flags |= TQ_FLAGS_BLOCKED; 585 tq_unlock(taskQueue, status); 586 } 587 588 589 void 590 taskqueue_unblock(struct taskqueue *taskQueue) 591 { 592 cpu_status status; 593 594 tq_lock(taskQueue, &status); 595 taskQueue->tq_flags &= ~TQ_FLAGS_BLOCKED; 596 if (taskQueue->tq_flags & TQ_FLAGS_PENDING) { 597 taskQueue->tq_flags &= ~TQ_FLAGS_PENDING; 598 taskQueue->tq_enqueue(taskQueue->tq_arg); 599 } 600 tq_unlock(taskQueue, status); 601 } 602