1 /* 2 * Copyright 2009, Colin Günther, coling@gmx.de 3 * Copyright 2007, Hugo Santos. All Rights Reserved. 4 * Distributed under the terms of the MIT License. 5 * 6 * Authors: 7 * Hugo Santos, hugosantos@gmail.com 8 */ 9 10 11 #include "device.h" 12 13 #include <stdio.h> 14 15 #include <compat/sys/callout.h> 16 #include <compat/sys/taskqueue.h> 17 #include <compat/sys/haiku-module.h> 18 19 20 #define TQ_FLAGS_ACTIVE (1 << 0) 21 #define TQ_FLAGS_BLOCKED (1 << 1) 22 #define TQ_FLAGS_PENDING (1 << 2) 23 24 #define DT_CALLOUT_ARMED (1 << 0) 25 #define DT_DRAIN_IN_PROGRESS (1 << 1) 26 27 struct taskqueue { 28 char tq_name[TASKQUEUE_NAMELEN]; 29 struct mtx tq_mutex; 30 struct list tq_list; 31 taskqueue_enqueue_fn tq_enqueue; 32 void *tq_arg; 33 int tq_fast; 34 sem_id tq_sem; 35 thread_id *tq_threads; 36 thread_id tq_thread_storage; 37 int tq_threadcount; 38 int tq_flags; 39 int tq_callouts; 40 }; 41 42 struct taskqueue *taskqueue_fast = NULL; 43 struct taskqueue *taskqueue_swi = NULL; 44 struct taskqueue *taskqueue_thread = NULL; 45 46 47 static struct taskqueue * 48 _taskqueue_create(const char *name, int mflags, int fast, 49 taskqueue_enqueue_fn enqueueFunction, void *context) 50 { 51 struct taskqueue *tq = malloc(sizeof(struct taskqueue)); 52 if (tq == NULL) 53 return NULL; 54 55 tq->tq_fast = fast; 56 57 if (fast) { 58 mtx_init(&tq->tq_mutex, name, NULL, MTX_SPIN); 59 } else { 60 mtx_init(&tq->tq_mutex, name, NULL, MTX_DEF); 61 } 62 63 strlcpy(tq->tq_name, name, sizeof(tq->tq_name)); 64 list_init_etc(&tq->tq_list, offsetof(struct task, ta_link)); 65 tq->tq_enqueue = enqueueFunction; 66 tq->tq_arg = context; 67 68 tq->tq_sem = -1; 69 tq->tq_threads = NULL; 70 tq->tq_threadcount = 0; 71 tq->tq_flags = TQ_FLAGS_ACTIVE; 72 tq->tq_callouts = 0; 73 74 return tq; 75 } 76 77 78 static void 79 tq_lock(struct taskqueue *taskQueue) 80 { 81 if (taskQueue->tq_fast) { 82 mtx_lock_spin(&taskQueue->tq_mutex); 83 } else { 84 mtx_lock(&taskQueue->tq_mutex); 85 } 86 } 87 88 89 static void 90 tq_unlock(struct taskqueue *taskQueue) 91 { 92 if (taskQueue->tq_fast) { 93 mtx_unlock_spin(&taskQueue->tq_mutex); 94 } else { 95 mtx_unlock(&taskQueue->tq_mutex); 96 } 97 } 98 99 100 struct taskqueue * 101 taskqueue_create(const char *name, int mflags, 102 taskqueue_enqueue_fn enqueueFunction, void *context) 103 { 104 return _taskqueue_create(name, mflags, 0, enqueueFunction, context); 105 } 106 107 108 static int32 109 tq_handle_thread(void *data) 110 { 111 struct taskqueue *tq = data; 112 struct task *t; 113 int pending; 114 sem_id sem; 115 116 /* just a synchronization point */ 117 tq_lock(tq); 118 sem = tq->tq_sem; 119 tq_unlock(tq); 120 121 while (acquire_sem(sem) == B_NO_ERROR) { 122 tq_lock(tq); 123 t = list_remove_head_item(&tq->tq_list); 124 tq_unlock(tq); 125 if (t == NULL) 126 continue; 127 pending = t->ta_pending; 128 t->ta_pending = 0; 129 130 t->ta_handler(t->ta_argument, pending); 131 } 132 133 return 0; 134 } 135 136 137 static int 138 _taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority, 139 const char *name) 140 { 141 struct taskqueue *tq = (*taskQueue); 142 int i, j; 143 144 if (count == 0) 145 return -1; 146 147 if (tq->tq_threads != NULL) 148 return -1; 149 150 if (count == 1) { 151 tq->tq_threads = &tq->tq_thread_storage; 152 } else { 153 tq->tq_threads = malloc(sizeof(thread_id) * count); 154 if (tq->tq_threads == NULL) 155 return B_NO_MEMORY; 156 } 157 158 tq->tq_sem = create_sem(0, tq->tq_name); 159 if (tq->tq_sem < B_OK) { 160 if (count > 1) 161 free(tq->tq_threads); 162 tq->tq_threads = NULL; 163 return tq->tq_sem; 164 } 165 166 for (i = 0; i < count; i++) { 167 tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name, 168 priority, tq); 169 if (tq->tq_threads[i] < B_OK) { 170 status_t status = tq->tq_threads[i]; 171 for (j = 0; j < i; j++) 172 kill_thread(tq->tq_threads[j]); 173 if (count > 1) 174 free(tq->tq_threads); 175 tq->tq_threads = NULL; 176 delete_sem(tq->tq_sem); 177 return status; 178 } 179 } 180 181 tq->tq_threadcount = count; 182 183 for (i = 0; i < count; i++) 184 resume_thread(tq->tq_threads[i]); 185 186 return 0; 187 } 188 189 190 int 191 taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority, 192 const char *format, ...) 193 { 194 /* we assume that start_threads is called in a sane place, and thus 195 * don't need to be locked. This is mostly due to the fact that if 196 * the TQ is 'fast', locking the TQ disables interrupts... and then 197 * we can't create semaphores, threads and bananas. */ 198 199 char name[64]; 200 int result; 201 va_list vl; 202 203 va_start(vl, format); 204 vsnprintf(name, sizeof(name), format, vl); 205 va_end(vl); 206 207 /*tq_lock(*tqp);*/ 208 result = _taskqueue_start_threads(taskQueue, count, priority, name); 209 /*tq_unlock(*tqp);*/ 210 211 return result; 212 } 213 214 215 void 216 taskqueue_free(struct taskqueue *taskQueue) 217 { 218 if (taskQueue == NULL) 219 return; 220 221 /* lock and drain list? */ 222 taskQueue->tq_flags &= ~TQ_FLAGS_ACTIVE; 223 if (!taskQueue->tq_fast) 224 mtx_destroy(&taskQueue->tq_mutex); 225 if (taskQueue->tq_sem != -1) { 226 int i; 227 228 delete_sem(taskQueue->tq_sem); 229 230 for (i = 0; i < taskQueue->tq_threadcount; i++) { 231 status_t status; 232 wait_for_thread(taskQueue->tq_threads[i], &status); 233 } 234 235 if (taskQueue->tq_threadcount > 1) 236 free(taskQueue->tq_threads); 237 } 238 239 free(taskQueue); 240 } 241 242 243 void 244 taskqueue_drain(struct taskqueue *taskQueue, struct task *task) 245 { 246 if (taskQueue == NULL) 247 return; 248 249 tq_lock(taskQueue); 250 while (task->ta_pending != 0) { 251 tq_unlock(taskQueue); 252 snooze(0); 253 tq_lock(taskQueue); 254 } 255 tq_unlock(taskQueue); 256 } 257 258 259 void 260 taskqueue_drain_timeout(struct taskqueue *queue, 261 struct timeout_task *timeout_task) 262 { 263 /* 264 * Set flag to prevent timer from re-starting during drain: 265 */ 266 tq_lock(queue); 267 KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0, 268 ("Drain already in progress")); 269 timeout_task->f |= DT_DRAIN_IN_PROGRESS; 270 tq_unlock(queue); 271 272 callout_drain(&timeout_task->c); 273 taskqueue_drain(queue, &timeout_task->t); 274 275 /* 276 * Clear flag to allow timer to re-start: 277 */ 278 tq_lock(queue); 279 timeout_task->f &= ~DT_DRAIN_IN_PROGRESS; 280 tq_unlock(queue); 281 } 282 283 284 static void 285 taskqueue_task_nop_fn(void* context, int pending) 286 { 287 } 288 289 290 void 291 taskqueue_drain_all(struct taskqueue *taskQueue) 292 { 293 struct task t_barrier; 294 295 if (taskQueue == NULL) { 296 printf("taskqueue_drain_all called with NULL taskqueue\n"); 297 return; 298 } 299 300 TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier); 301 taskqueue_enqueue(taskQueue, &t_barrier); 302 taskqueue_drain(taskQueue, &t_barrier); 303 } 304 305 306 static void 307 taskqueue_enqueue_locked(struct taskqueue *taskQueue, struct task *task) 308 { 309 /* we don't really support priorities */ 310 if (task->ta_pending) { 311 task->ta_pending++; 312 } else { 313 list_add_item(&taskQueue->tq_list, task); 314 task->ta_pending = 1; 315 if ((taskQueue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 316 taskQueue->tq_enqueue(taskQueue->tq_arg); 317 else 318 taskQueue->tq_flags |= TQ_FLAGS_PENDING; 319 } 320 tq_unlock(taskQueue); 321 } 322 323 324 int 325 taskqueue_enqueue(struct taskqueue *taskQueue, struct task *task) 326 { 327 tq_lock(taskQueue); 328 taskqueue_enqueue_locked(taskQueue, task); 329 /* The lock is released inside. */ 330 331 return 0; 332 } 333 334 335 static void 336 taskqueue_timeout_func(void *arg) 337 { 338 struct taskqueue *queue; 339 struct timeout_task *timeout_task; 340 341 timeout_task = arg; 342 queue = timeout_task->q; 343 KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, 344 ("stray timeout ('%s')", timeout_task->q->tq_name)); 345 timeout_task->f &= ~DT_CALLOUT_ARMED; 346 queue->tq_callouts--; 347 taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t); 348 /* The lock is released inside. */ 349 } 350 351 352 int 353 taskqueue_enqueue_timeout(struct taskqueue *queue, 354 struct timeout_task *ttask, int _ticks) 355 { 356 int res; 357 358 tq_lock(queue); 359 KASSERT(ttask->q == NULL || ttask->q == queue, 360 ("Migrated queue")); 361 ttask->q = queue; 362 res = ttask->t.ta_pending; 363 if (ttask->f & DT_DRAIN_IN_PROGRESS) { 364 /* Do nothing */ 365 tq_unlock(queue); 366 res = -1; 367 } else if (_ticks == 0) { 368 tq_unlock(queue); 369 taskqueue_enqueue(queue, &ttask->t); 370 } else { 371 if ((ttask->f & DT_CALLOUT_ARMED) != 0) { 372 res++; 373 } else { 374 queue->tq_callouts++; 375 ttask->f |= DT_CALLOUT_ARMED; 376 if (_ticks < 0) 377 _ticks = -_ticks; /* Ignore overflow. */ 378 } 379 tq_unlock(queue); 380 if (_ticks > 0) { 381 callout_reset(&ttask->c, _ticks, 382 taskqueue_timeout_func, ttask); 383 } 384 } 385 return (res); 386 } 387 388 389 static int 390 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, 391 u_int *pendp) 392 { 393 if (task->ta_pending > 0) 394 list_remove_item(&queue->tq_list, task); 395 if (pendp != NULL) 396 *pendp = task->ta_pending; 397 task->ta_pending = 0; 398 return 0; 399 } 400 401 402 int 403 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) 404 { 405 int error; 406 407 tq_lock(queue); 408 error = taskqueue_cancel_locked(queue, task, pendp); 409 tq_unlock(queue); 410 411 return (error); 412 } 413 414 415 int 416 taskqueue_cancel_timeout(struct taskqueue *queue, 417 struct timeout_task *timeout_task, u_int *pendp) 418 { 419 u_int pending, pending1; 420 int error; 421 422 tq_lock(queue); 423 pending = !!(callout_stop(&timeout_task->c) > 0); 424 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); 425 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 426 timeout_task->f &= ~DT_CALLOUT_ARMED; 427 queue->tq_callouts--; 428 } 429 tq_unlock(queue); 430 431 if (pendp != NULL) 432 *pendp = pending + pending1; 433 return (error); 434 } 435 436 437 void 438 taskqueue_block(struct taskqueue *taskQueue) 439 { 440 if (taskQueue == NULL) 441 return; 442 443 tq_lock(taskQueue); 444 taskQueue->tq_flags |= TQ_FLAGS_BLOCKED; 445 tq_unlock(taskQueue); 446 } 447 448 449 void 450 taskqueue_unblock(struct taskqueue *taskQueue) 451 { 452 if (taskQueue == NULL) 453 return; 454 455 tq_lock(taskQueue); 456 taskQueue->tq_flags &= ~TQ_FLAGS_BLOCKED; 457 if (taskQueue->tq_flags & TQ_FLAGS_PENDING) { 458 taskQueue->tq_flags &= ~TQ_FLAGS_PENDING; 459 taskQueue->tq_enqueue(taskQueue->tq_arg); 460 } 461 tq_unlock(taskQueue); 462 } 463 464 465 void 466 taskqueue_thread_enqueue(void *context) 467 { 468 struct taskqueue **tqp = context; 469 release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE); 470 } 471 472 473 int 474 taskqueue_enqueue_fast(struct taskqueue *taskQueue, struct task *task) 475 { 476 return taskqueue_enqueue(taskQueue, task); 477 } 478 479 480 struct taskqueue * 481 taskqueue_create_fast(const char *name, int mflags, 482 taskqueue_enqueue_fn enqueueFunction, void *context) 483 { 484 return _taskqueue_create(name, mflags, 1, enqueueFunction, context); 485 } 486 487 488 void 489 task_init(struct task *task, int prio, task_fn_t handler, void *context) 490 { 491 task->ta_priority = prio; 492 task->ta_handler = handler; 493 task->ta_argument = context; 494 task->ta_pending = 0; 495 } 496 497 498 void 499 timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, 500 int priority, task_fn_t func, void *context) 501 { 502 TASK_INIT(&timeout_task->t, priority, func, context); 503 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 504 CALLOUT_RETURNUNLOCKED); 505 timeout_task->q = queue; 506 timeout_task->f = 0; 507 } 508 509 510 status_t 511 init_taskqueues() 512 { 513 status_t status = B_NO_MEMORY; 514 515 if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) { 516 taskqueue_fast = taskqueue_create_fast("fast taskq", 0, 517 taskqueue_thread_enqueue, &taskqueue_fast); 518 if (taskqueue_fast == NULL) 519 return B_NO_MEMORY; 520 521 status = taskqueue_start_threads(&taskqueue_fast, 1, 522 B_REAL_TIME_PRIORITY, "fast taskq thread"); 523 if (status < B_OK) 524 goto err_1; 525 } 526 527 if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) { 528 taskqueue_swi = taskqueue_create_fast("swi taskq", 0, 529 taskqueue_thread_enqueue, &taskqueue_swi); 530 if (taskqueue_swi == NULL) { 531 status = B_NO_MEMORY; 532 goto err_1; 533 } 534 535 status = taskqueue_start_threads(&taskqueue_swi, 1, 536 B_REAL_TIME_PRIORITY, "swi taskq"); 537 if (status < B_OK) 538 goto err_2; 539 } 540 541 if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE)) { 542 taskqueue_thread = taskqueue_create_fast("thread taskq", 0, 543 taskqueue_thread_enqueue, &taskqueue_thread); 544 if (taskqueue_thread == NULL) { 545 status = B_NO_MEMORY; 546 goto err_2; 547 } 548 549 status = taskqueue_start_threads(&taskqueue_thread, 1, 550 B_REAL_TIME_PRIORITY, "swi taskq"); 551 if (status < B_OK) 552 goto err_3; 553 } 554 555 return B_OK; 556 557 err_3: 558 if (taskqueue_thread) 559 taskqueue_free(taskqueue_thread); 560 561 err_2: 562 if (taskqueue_swi) 563 taskqueue_free(taskqueue_swi); 564 565 err_1: 566 if (taskqueue_fast) 567 taskqueue_free(taskqueue_fast); 568 569 return status; 570 } 571 572 573 void 574 uninit_taskqueues() 575 { 576 if (HAIKU_DRIVER_REQUIRES(FBSD_THREAD_TASKQUEUE)) 577 taskqueue_free(taskqueue_thread); 578 579 if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) 580 taskqueue_free(taskqueue_swi); 581 582 if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) 583 taskqueue_free(taskqueue_fast); 584 } 585