|
Ruby
1.9.3p448 (2013-06-27 revision 41675)
|
00001 /********************************************************************** 00002 00003 thread.c - 00004 00005 $Author: usa $ 00006 00007 Copyright (C) 2004-2007 Koichi Sasada 00008 00009 **********************************************************************/ 00010 00011 /* 00012 YARV Thread Design 00013 00014 model 1: Userlevel Thread 00015 Same as traditional ruby thread. 00016 00017 model 2: Native Thread with Global VM lock 00018 Using pthread (or Windows thread) and Ruby threads run concurrent. 00019 00020 model 3: Native Thread with fine grain lock 00021 Using pthread and Ruby threads run concurrent or parallel. 00022 00023 ------------------------------------------------------------------------ 00024 00025 model 2: 00026 A thread has mutex (GVL: Global VM Lock or Giant VM Lock) can run. 00027 When thread scheduling, running thread release GVL. If running thread 00028 try blocking operation, this thread must release GVL and another 00029 thread can continue this flow. After blocking operation, thread 00030 must check interrupt (RUBY_VM_CHECK_INTS). 00031 00032 Every VM can run parallel. 00033 00034 Ruby threads are scheduled by OS thread scheduler. 00035 00036 ------------------------------------------------------------------------ 00037 00038 model 3: 00039 Every threads run concurrent or parallel and to access shared object 00040 exclusive access control is needed. For example, to access String 00041 object or Array object, fine grain lock must be locked every time. 00042 */ 00043 00044 00045 /* 00046 * FD_SET, FD_CLR and FD_ISSET have a small sanity check when using glibc 00047 * 2.15 or later and set _FORTIFY_SOURCE > 0. 00048 * However, the implementation is wrong. Even though Linux's select(2) 00049 * support large fd size (>FD_SETSIZE), it wrongly assume fd is always 00050 * less than FD_SETSIZE (i.e. 1024). And then when enabling HAVE_RB_FD_INIT, 00051 * it doesn't work correctly and makes program abort. 
Therefore we need to 00052 * disable FORTY_SOURCE until glibc fixes it. 00053 */ 00054 #undef _FORTIFY_SOURCE 00055 #undef __USE_FORTIFY_LEVEL 00056 #define __USE_FORTIFY_LEVEL 0 00057 00058 /* for model 2 */ 00059 00060 #include "eval_intern.h" 00061 #include "gc.h" 00062 #include "internal.h" 00063 #include "ruby/io.h" 00064 00065 #ifndef USE_NATIVE_THREAD_PRIORITY 00066 #define USE_NATIVE_THREAD_PRIORITY 0 00067 #define RUBY_THREAD_PRIORITY_MAX 3 00068 #define RUBY_THREAD_PRIORITY_MIN -3 00069 #endif 00070 00071 #ifndef THREAD_DEBUG 00072 #define THREAD_DEBUG 0 00073 #endif 00074 00075 VALUE rb_cMutex; 00076 VALUE rb_cBarrier; 00077 00078 static void sleep_timeval(rb_thread_t *th, struct timeval time); 00079 static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec); 00080 static void sleep_forever(rb_thread_t *th, int nodeadlock); 00081 static double timeofday(void); 00082 static int rb_threadptr_dead(rb_thread_t *th); 00083 00084 static void rb_check_deadlock(rb_vm_t *vm); 00085 00086 #define eKillSignal INT2FIX(0) 00087 #define eTerminateSignal INT2FIX(1) 00088 static volatile int system_working = 1; 00089 00090 #define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream] 00091 00092 inline static void 00093 st_delete_wrap(st_table *table, st_data_t key) 00094 { 00095 st_delete(table, &key, 0); 00096 } 00097 00098 /********************************************************************************/ 00099 00100 #define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION 00101 00102 struct rb_blocking_region_buffer { 00103 enum rb_thread_status prev_status; 00104 struct rb_unblock_callback oldubf; 00105 }; 00106 00107 static void set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 00108 struct rb_unblock_callback *old); 00109 static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old); 00110 00111 static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer 
*region); 00112 00113 #define RB_GC_SAVE_MACHINE_CONTEXT(th) \ 00114 do { \ 00115 rb_gc_save_machine_context(th); \ 00116 SET_MACHINE_STACK_END(&(th)->machine_stack_end); \ 00117 } while (0) 00118 00119 #define GVL_UNLOCK_BEGIN() do { \ 00120 rb_thread_t *_th_stored = GET_THREAD(); \ 00121 RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \ 00122 gvl_release(_th_stored->vm); 00123 00124 #define GVL_UNLOCK_END() \ 00125 gvl_acquire(_th_stored->vm, _th_stored); \ 00126 rb_thread_set_current(_th_stored); \ 00127 } while(0) 00128 00129 #define blocking_region_begin(th, region, func, arg) \ 00130 do { \ 00131 (region)->prev_status = (th)->status; \ 00132 set_unblock_function((th), (func), (arg), &(region)->oldubf); \ 00133 (th)->blocking_region_buffer = (region); \ 00134 (th)->status = THREAD_STOPPED; \ 00135 thread_debug("enter blocking region (%p)\n", (void *)(th)); \ 00136 RB_GC_SAVE_MACHINE_CONTEXT(th); \ 00137 gvl_release((th)->vm); \ 00138 } while (0) 00139 00140 #define BLOCKING_REGION(exec, ubf, ubfarg) do { \ 00141 rb_thread_t *__th = GET_THREAD(); \ 00142 struct rb_blocking_region_buffer __region; \ 00143 blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \ 00144 exec; \ 00145 blocking_region_end(__th, &__region); \ 00146 RUBY_VM_CHECK_INTS(); \ 00147 } while(0) 00148 00149 #if THREAD_DEBUG 00150 #ifdef HAVE_VA_ARGS_MACRO 00151 void rb_thread_debug(const char *file, int line, const char *fmt, ...); 00152 #define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__) 00153 #define POSITION_FORMAT "%s:%d:" 00154 #define POSITION_ARGS ,file, line 00155 #else 00156 void rb_thread_debug(const char *fmt, ...); 00157 #define thread_debug rb_thread_debug 00158 #define POSITION_FORMAT 00159 #define POSITION_ARGS 00160 #endif 00161 00162 # if THREAD_DEBUG < 0 00163 static int rb_thread_debug_enabled; 00164 00165 /* 00166 * call-seq: 00167 * Thread.DEBUG -> num 00168 * 00169 * Returns the thread debug level. 
Available only if compiled with 00170 * THREAD_DEBUG=-1. 00171 */ 00172 00173 static VALUE 00174 rb_thread_s_debug(void) 00175 { 00176 return INT2NUM(rb_thread_debug_enabled); 00177 } 00178 00179 /* 00180 * call-seq: 00181 * Thread.DEBUG = num 00182 * 00183 * Sets the thread debug level. Available only if compiled with 00184 * THREAD_DEBUG=-1. 00185 */ 00186 00187 static VALUE 00188 rb_thread_s_debug_set(VALUE self, VALUE val) 00189 { 00190 rb_thread_debug_enabled = RTEST(val) ? NUM2INT(val) : 0; 00191 return val; 00192 } 00193 # else 00194 # define rb_thread_debug_enabled THREAD_DEBUG 00195 # endif 00196 #else 00197 #define thread_debug if(0)printf 00198 #endif 00199 00200 #ifndef __ia64 00201 #define thread_start_func_2(th, st, rst) thread_start_func_2(th, st) 00202 #endif 00203 NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, 00204 VALUE *register_stack_start)); 00205 static void timer_thread_function(void *); 00206 00207 #if defined(_WIN32) 00208 #include "thread_win32.c" 00209 00210 #define DEBUG_OUT() \ 00211 WaitForSingleObject(&debug_mutex, INFINITE); \ 00212 printf(POSITION_FORMAT"%p - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \ 00213 fflush(stdout); \ 00214 ReleaseMutex(&debug_mutex); 00215 00216 #elif defined(HAVE_PTHREAD_H) 00217 #include "thread_pthread.c" 00218 00219 #define DEBUG_OUT() \ 00220 pthread_mutex_lock(&debug_mutex); \ 00221 printf(POSITION_FORMAT"%#"PRIxVALUE" - %s" POSITION_ARGS, (VALUE)pthread_self(), buf); \ 00222 fflush(stdout); \ 00223 pthread_mutex_unlock(&debug_mutex); 00224 00225 #else 00226 #error "unsupported thread type" 00227 #endif 00228 00229 #if THREAD_DEBUG 00230 static int debug_mutex_initialized = 1; 00231 static rb_thread_lock_t debug_mutex; 00232 00233 void 00234 rb_thread_debug( 00235 #ifdef HAVE_VA_ARGS_MACRO 00236 const char *file, int line, 00237 #endif 00238 const char *fmt, ...) 
00239 { 00240 va_list args; 00241 char buf[BUFSIZ]; 00242 00243 if (!rb_thread_debug_enabled) return; 00244 00245 if (debug_mutex_initialized == 1) { 00246 debug_mutex_initialized = 0; 00247 native_mutex_initialize(&debug_mutex); 00248 } 00249 00250 va_start(args, fmt); 00251 vsnprintf(buf, BUFSIZ, fmt, args); 00252 va_end(args); 00253 00254 DEBUG_OUT(); 00255 } 00256 #endif 00257 00258 void 00259 rb_vm_gvl_destroy(rb_vm_t *vm) 00260 { 00261 gvl_release(vm); 00262 gvl_destroy(vm); 00263 } 00264 00265 void 00266 rb_thread_lock_unlock(rb_thread_lock_t *lock) 00267 { 00268 native_mutex_unlock(lock); 00269 } 00270 00271 void 00272 rb_thread_lock_destroy(rb_thread_lock_t *lock) 00273 { 00274 native_mutex_destroy(lock); 00275 } 00276 00277 static void 00278 set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 00279 struct rb_unblock_callback *old) 00280 { 00281 check_ints: 00282 RUBY_VM_CHECK_INTS(); /* check signal or so */ 00283 native_mutex_lock(&th->interrupt_lock); 00284 if (th->interrupt_flag) { 00285 native_mutex_unlock(&th->interrupt_lock); 00286 goto check_ints; 00287 } 00288 else { 00289 if (old) *old = th->unblock; 00290 th->unblock.func = func; 00291 th->unblock.arg = arg; 00292 } 00293 native_mutex_unlock(&th->interrupt_lock); 00294 } 00295 00296 static void 00297 reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old) 00298 { 00299 native_mutex_lock(&th->interrupt_lock); 00300 th->unblock = *old; 00301 native_mutex_unlock(&th->interrupt_lock); 00302 } 00303 00304 void 00305 rb_threadptr_interrupt(rb_thread_t *th) 00306 { 00307 native_mutex_lock(&th->interrupt_lock); 00308 RUBY_VM_SET_INTERRUPT(th); 00309 if (th->unblock.func) { 00310 (th->unblock.func)(th->unblock.arg); 00311 } 00312 else { 00313 /* none */ 00314 } 00315 native_mutex_unlock(&th->interrupt_lock); 00316 } 00317 00318 00319 static int 00320 terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread) 00321 { 00322 VALUE thval = key; 00323 
rb_thread_t *th; 00324 GetThreadPtr(thval, th); 00325 00326 if (th != main_thread) { 00327 thread_debug("terminate_i: %p\n", (void *)th); 00328 rb_threadptr_interrupt(th); 00329 th->thrown_errinfo = eTerminateSignal; 00330 th->status = THREAD_TO_KILL; 00331 } 00332 else { 00333 thread_debug("terminate_i: main thread (%p)\n", (void *)th); 00334 } 00335 return ST_CONTINUE; 00336 } 00337 00338 typedef struct rb_mutex_struct 00339 { 00340 rb_thread_lock_t lock; 00341 rb_thread_cond_t cond; 00342 struct rb_thread_struct volatile *th; 00343 int cond_waiting; 00344 struct rb_mutex_struct *next_mutex; 00345 } rb_mutex_t; 00346 00347 static void rb_mutex_abandon_all(rb_mutex_t *mutexes); 00348 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); 00349 00350 void 00351 rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) 00352 { 00353 const char *err; 00354 rb_mutex_t *mutex; 00355 rb_mutex_t *mutexes = th->keeping_mutexes; 00356 00357 while (mutexes) { 00358 mutex = mutexes; 00359 /* rb_warn("mutex #<%p> remains to be locked by terminated thread", 00360 mutexes); */ 00361 mutexes = mutex->next_mutex; 00362 err = rb_mutex_unlock_th(mutex, th); 00363 if (err) rb_bug("invalid keeping_mutexes: %s", err); 00364 } 00365 } 00366 00367 void 00368 rb_thread_terminate_all(void) 00369 { 00370 rb_thread_t *th = GET_THREAD(); /* main thread */ 00371 rb_vm_t *vm = th->vm; 00372 00373 if (vm->main_thread != th) { 00374 rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)", 00375 (void *)vm->main_thread, (void *)th); 00376 } 00377 00378 /* unlock all locking mutexes */ 00379 rb_threadptr_unlock_all_locking_mutexes(th); 00380 00381 thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th); 00382 st_foreach(vm->living_threads, terminate_i, (st_data_t)th); 00383 vm->inhibit_thread_creation = 1; 00384 00385 while (!rb_thread_alone()) { 00386 PUSH_TAG(); 00387 if (EXEC_TAG() == 0) { 00388 rb_thread_schedule(); 00389 } 00390 else { 
00391 /* ignore exception */ 00392 } 00393 POP_TAG(); 00394 } 00395 } 00396 00397 static void 00398 thread_cleanup_func_before_exec(void *th_ptr) 00399 { 00400 rb_thread_t *th = th_ptr; 00401 th->status = THREAD_KILLED; 00402 th->machine_stack_start = th->machine_stack_end = 0; 00403 #ifdef __ia64 00404 th->machine_register_stack_start = th->machine_register_stack_end = 0; 00405 #endif 00406 } 00407 00408 static void 00409 thread_cleanup_func(void *th_ptr, int atfork) 00410 { 00411 rb_thread_t *th = th_ptr; 00412 00413 th->locking_mutex = Qfalse; 00414 thread_cleanup_func_before_exec(th_ptr); 00415 00416 /* 00417 * Unfortunately, we can't release native threading resource at fork 00418 * because libc may have unstable locking state therefore touching 00419 * a threading resource may cause a deadlock. 00420 */ 00421 if (atfork) 00422 return; 00423 00424 native_mutex_destroy(&th->interrupt_lock); 00425 native_thread_destroy(th); 00426 } 00427 00428 static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); 00429 00430 void 00431 ruby_thread_init_stack(rb_thread_t *th) 00432 { 00433 native_thread_init_stack(th); 00434 } 00435 00436 static int 00437 thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start) 00438 { 00439 int state; 00440 VALUE args = th->first_args; 00441 rb_proc_t *proc; 00442 rb_thread_t *join_th; 00443 rb_thread_t *main_th; 00444 VALUE errinfo = Qnil; 00445 # ifdef USE_SIGALTSTACK 00446 void rb_register_sigaltstack(rb_thread_t *th); 00447 00448 rb_register_sigaltstack(th); 00449 # endif 00450 00451 ruby_thread_set_native(th); 00452 00453 th->machine_stack_start = stack_start; 00454 #ifdef __ia64 00455 th->machine_register_stack_start = register_stack_start; 00456 #endif 00457 thread_debug("thread start: %p\n", (void *)th); 00458 00459 gvl_acquire(th->vm, th); 00460 { 00461 thread_debug("thread start (get lock): %p\n", (void *)th); 00462 rb_thread_set_current(th); 00463 00464 TH_PUSH_TAG(th); 00465 if ((state = 
EXEC_TAG()) == 0) { 00466 SAVE_ROOT_JMPBUF(th, { 00467 if (!th->first_func) { 00468 GetProcPtr(th->first_proc, proc); 00469 th->errinfo = Qnil; 00470 th->local_lfp = proc->block.lfp; 00471 th->local_svar = Qnil; 00472 th->value = rb_vm_invoke_proc(th, proc, proc->block.self, 00473 (int)RARRAY_LEN(args), RARRAY_PTR(args), 0); 00474 } 00475 else { 00476 th->value = (*th->first_func)((void *)args); 00477 } 00478 }); 00479 } 00480 else { 00481 errinfo = th->errinfo; 00482 if (NIL_P(errinfo)) errinfo = rb_errinfo(); 00483 if (state == TAG_FATAL) { 00484 /* fatal error within this thread, need to stop whole script */ 00485 } 00486 else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { 00487 if (th->safe_level >= 4) { 00488 th->errinfo = rb_exc_new3(rb_eSecurityError, 00489 rb_sprintf("Insecure exit at level %d", th->safe_level)); 00490 errinfo = Qnil; 00491 } 00492 } 00493 else if (th->safe_level < 4 && 00494 (th->vm->thread_abort_on_exception || 00495 th->abort_on_exception || RTEST(ruby_debug))) { 00496 /* exit on main_thread */ 00497 } 00498 else { 00499 errinfo = Qnil; 00500 } 00501 th->value = Qnil; 00502 } 00503 00504 th->status = THREAD_KILLED; 00505 thread_debug("thread end: %p\n", (void *)th); 00506 00507 main_th = th->vm->main_thread; 00508 if (th != main_th) { 00509 if (TYPE(errinfo) == T_OBJECT) { 00510 /* treat with normal error object */ 00511 rb_threadptr_raise(main_th, 1, &errinfo); 00512 } 00513 } 00514 TH_POP_TAG(); 00515 00516 /* locking_mutex must be Qfalse */ 00517 if (th->locking_mutex != Qfalse) { 00518 rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")", 00519 (void *)th, th->locking_mutex); 00520 } 00521 00522 /* delete self other than main thread from living_threads */ 00523 if (th != main_th) { 00524 st_delete_wrap(th->vm->living_threads, th->self); 00525 } 00526 00527 /* wake up joining threads */ 00528 join_th = th->join_list_head; 00529 while (join_th) { 00530 if (join_th == main_th) errinfo = Qnil; 00531 
rb_threadptr_interrupt(join_th); 00532 switch (join_th->status) { 00533 case THREAD_STOPPED: case THREAD_STOPPED_FOREVER: 00534 join_th->status = THREAD_RUNNABLE; 00535 default: break; 00536 } 00537 join_th = join_th->join_list_next; 00538 } 00539 00540 rb_threadptr_unlock_all_locking_mutexes(th); 00541 if (th != main_th) rb_check_deadlock(th->vm); 00542 00543 if (!th->root_fiber) { 00544 rb_thread_recycle_stack_release(th->stack); 00545 th->stack = 0; 00546 } 00547 } 00548 if (th->vm->main_thread == th) { 00549 ruby_cleanup(state); 00550 } 00551 else { 00552 thread_cleanup_func(th, FALSE); 00553 gvl_release(th->vm); 00554 } 00555 00556 return 0; 00557 } 00558 00559 static VALUE 00560 thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) 00561 { 00562 rb_thread_t *th; 00563 int err; 00564 00565 if (OBJ_FROZEN(GET_THREAD()->thgroup)) { 00566 rb_raise(rb_eThreadError, 00567 "can't start a new thread (frozen ThreadGroup)"); 00568 } 00569 GetThreadPtr(thval, th); 00570 00571 /* setup thread environment */ 00572 th->first_func = fn; 00573 th->first_proc = fn ? 
Qfalse : rb_block_proc(); 00574 th->first_args = args; /* GC: shouldn't put before above line */ 00575 00576 th->priority = GET_THREAD()->priority; 00577 th->thgroup = GET_THREAD()->thgroup; 00578 00579 native_mutex_initialize(&th->interrupt_lock); 00580 if (GET_VM()->event_hooks != NULL) 00581 th->event_flags |= RUBY_EVENT_VM; 00582 00583 /* kick thread */ 00584 st_insert(th->vm->living_threads, thval, (st_data_t) th->thread_id); 00585 err = native_thread_create(th); 00586 if (err) { 00587 st_delete_wrap(th->vm->living_threads, th->self); 00588 th->status = THREAD_KILLED; 00589 rb_raise(rb_eThreadError, "can't create Thread (%d)", err); 00590 } 00591 return thval; 00592 } 00593 00594 /* :nodoc: */ 00595 static VALUE 00596 thread_s_new(int argc, VALUE *argv, VALUE klass) 00597 { 00598 rb_thread_t *th; 00599 VALUE thread = rb_thread_alloc(klass); 00600 00601 if (GET_VM()->inhibit_thread_creation) 00602 rb_raise(rb_eThreadError, "can't alloc thread"); 00603 00604 rb_obj_call_init(thread, argc, argv); 00605 GetThreadPtr(thread, th); 00606 if (!th->first_args) { 00607 rb_raise(rb_eThreadError, "uninitialized thread - check `%s#initialize'", 00608 rb_class2name(klass)); 00609 } 00610 return thread; 00611 } 00612 00613 /* 00614 * call-seq: 00615 * Thread.start([args]*) {|args| block } -> thread 00616 * Thread.fork([args]*) {|args| block } -> thread 00617 * 00618 * Basically the same as <code>Thread::new</code>. However, if class 00619 * <code>Thread</code> is subclassed, then calling <code>start</code> in that 00620 * subclass will not invoke the subclass's <code>initialize</code> method. 
00621 */ 00622 00623 static VALUE 00624 thread_start(VALUE klass, VALUE args) 00625 { 00626 return thread_create_core(rb_thread_alloc(klass), args, 0); 00627 } 00628 00629 /* :nodoc: */ 00630 static VALUE 00631 thread_initialize(VALUE thread, VALUE args) 00632 { 00633 rb_thread_t *th; 00634 if (!rb_block_given_p()) { 00635 rb_raise(rb_eThreadError, "must be called with a block"); 00636 } 00637 GetThreadPtr(thread, th); 00638 if (th->first_args) { 00639 VALUE proc = th->first_proc, line, loc; 00640 const char *file; 00641 if (!proc || !RTEST(loc = rb_proc_location(proc))) { 00642 rb_raise(rb_eThreadError, "already initialized thread"); 00643 } 00644 file = RSTRING_PTR(RARRAY_PTR(loc)[0]); 00645 if (NIL_P(line = RARRAY_PTR(loc)[1])) { 00646 rb_raise(rb_eThreadError, "already initialized thread - %s", 00647 file); 00648 } 00649 rb_raise(rb_eThreadError, "already initialized thread - %s:%d", 00650 file, NUM2INT(line)); 00651 } 00652 return thread_create_core(thread, args, 0); 00653 } 00654 00655 VALUE 00656 rb_thread_create(VALUE (*fn)(ANYARGS), void *arg) 00657 { 00658 return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn); 00659 } 00660 00661 00662 /* +infty, for this purpose */ 00663 #define DELAY_INFTY 1E30 00664 00665 struct join_arg { 00666 rb_thread_t *target, *waiting; 00667 double limit; 00668 int forever; 00669 }; 00670 00671 static VALUE 00672 remove_from_join_list(VALUE arg) 00673 { 00674 struct join_arg *p = (struct join_arg *)arg; 00675 rb_thread_t *target_th = p->target, *th = p->waiting; 00676 00677 if (target_th->status != THREAD_KILLED) { 00678 rb_thread_t **pth = &target_th->join_list_head; 00679 00680 while (*pth) { 00681 if (*pth == th) { 00682 *pth = th->join_list_next; 00683 break; 00684 } 00685 pth = &(*pth)->join_list_next; 00686 } 00687 } 00688 00689 return Qnil; 00690 } 00691 00692 static VALUE 00693 thread_join_sleep(VALUE arg) 00694 { 00695 struct join_arg *p = (struct join_arg *)arg; 00696 rb_thread_t *target_th = 
p->target, *th = p->waiting; 00697 double now, limit = p->limit; 00698 00699 while (target_th->status != THREAD_KILLED) { 00700 if (p->forever) { 00701 sleep_forever(th, 1); 00702 } 00703 else { 00704 now = timeofday(); 00705 if (now > limit) { 00706 thread_debug("thread_join: timeout (thid: %p)\n", 00707 (void *)target_th->thread_id); 00708 return Qfalse; 00709 } 00710 sleep_wait_for_interrupt(th, limit - now); 00711 } 00712 thread_debug("thread_join: interrupted (thid: %p)\n", 00713 (void *)target_th->thread_id); 00714 } 00715 return Qtrue; 00716 } 00717 00718 static VALUE 00719 thread_join(rb_thread_t *target_th, double delay) 00720 { 00721 rb_thread_t *th = GET_THREAD(); 00722 struct join_arg arg; 00723 00724 arg.target = target_th; 00725 arg.waiting = th; 00726 arg.limit = timeofday() + delay; 00727 arg.forever = delay == DELAY_INFTY; 00728 00729 thread_debug("thread_join (thid: %p)\n", (void *)target_th->thread_id); 00730 00731 if (target_th->status != THREAD_KILLED) { 00732 th->join_list_next = target_th->join_list_head; 00733 target_th->join_list_head = th; 00734 if (!rb_ensure(thread_join_sleep, (VALUE)&arg, 00735 remove_from_join_list, (VALUE)&arg)) { 00736 return Qnil; 00737 } 00738 } 00739 00740 thread_debug("thread_join: success (thid: %p)\n", 00741 (void *)target_th->thread_id); 00742 00743 if (target_th->errinfo != Qnil) { 00744 VALUE err = target_th->errinfo; 00745 00746 if (FIXNUM_P(err)) { 00747 /* */ 00748 } 00749 else if (TYPE(target_th->errinfo) == T_NODE) { 00750 rb_exc_raise(rb_vm_make_jump_tag_but_local_jump( 00751 GET_THROWOBJ_STATE(err), GET_THROWOBJ_VAL(err))); 00752 } 00753 else { 00754 /* normal exception */ 00755 rb_exc_raise(err); 00756 } 00757 } 00758 return target_th->self; 00759 } 00760 00761 /* 00762 * call-seq: 00763 * thr.join -> thr 00764 * thr.join(limit) -> thr 00765 * 00766 * The calling thread will suspend execution and run <i>thr</i>. Does not 00767 * return until <i>thr</i> exits or until <i>limit</i> seconds have passed. 
If 00768 * the time limit expires, <code>nil</code> will be returned, otherwise 00769 * <i>thr</i> is returned. 00770 * 00771 * Any threads not joined will be killed when the main program exits. If 00772 * <i>thr</i> had previously raised an exception and the 00773 * <code>abort_on_exception</code> and <code>$DEBUG</code> flags are not set 00774 * (so the exception has not yet been processed) it will be processed at this 00775 * time. 00776 * 00777 * a = Thread.new { print "a"; sleep(10); print "b"; print "c" } 00778 * x = Thread.new { print "x"; Thread.pass; print "y"; print "z" } 00779 * x.join # Let x thread finish, a will be killed on exit. 00780 * 00781 * <em>produces:</em> 00782 * 00783 * axyz 00784 * 00785 * The following example illustrates the <i>limit</i> parameter. 00786 * 00787 * y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }} 00788 * puts "Waiting" until y.join(0.15) 00789 * 00790 * <em>produces:</em> 00791 * 00792 * tick... 00793 * Waiting 00794 * tick... 00795 * Waitingtick... 00796 * 00797 * 00798 * tick... 00799 */ 00800 00801 static VALUE 00802 thread_join_m(int argc, VALUE *argv, VALUE self) 00803 { 00804 rb_thread_t *target_th; 00805 double delay = DELAY_INFTY; 00806 VALUE limit; 00807 00808 GetThreadPtr(self, target_th); 00809 00810 rb_scan_args(argc, argv, "01", &limit); 00811 if (!NIL_P(limit)) { 00812 delay = rb_num2dbl(limit); 00813 } 00814 00815 return thread_join(target_th, delay); 00816 } 00817 00818 /* 00819 * call-seq: 00820 * thr.value -> obj 00821 * 00822 * Waits for <i>thr</i> to complete (via <code>Thread#join</code>) and returns 00823 * its value. 
00824 * 00825 * a = Thread.new { 2 + 2 } 00826 * a.value #=> 4 00827 */ 00828 00829 static VALUE 00830 thread_value(VALUE self) 00831 { 00832 rb_thread_t *th; 00833 GetThreadPtr(self, th); 00834 thread_join(th, DELAY_INFTY); 00835 return th->value; 00836 } 00837 00838 /* 00839 * Thread Scheduling 00840 */ 00841 00842 static struct timeval 00843 double2timeval(double d) 00844 { 00845 struct timeval time; 00846 00847 time.tv_sec = (int)d; 00848 time.tv_usec = (int)((d - (int)d) * 1e6); 00849 if (time.tv_usec < 0) { 00850 time.tv_usec += (int)1e6; 00851 time.tv_sec -= 1; 00852 } 00853 return time; 00854 } 00855 00856 static void 00857 sleep_forever(rb_thread_t *th, int deadlockable) 00858 { 00859 enum rb_thread_status prev_status = th->status; 00860 enum rb_thread_status status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED; 00861 00862 th->status = status; 00863 do { 00864 if (deadlockable) { 00865 th->vm->sleeper++; 00866 rb_check_deadlock(th->vm); 00867 } 00868 native_sleep(th, 0); 00869 if (deadlockable) { 00870 th->vm->sleeper--; 00871 } 00872 RUBY_VM_CHECK_INTS(); 00873 } while (th->status == status); 00874 th->status = prev_status; 00875 } 00876 00877 static void 00878 getclockofday(struct timeval *tp) 00879 { 00880 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 00881 struct timespec ts; 00882 00883 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) { 00884 tp->tv_sec = ts.tv_sec; 00885 tp->tv_usec = ts.tv_nsec / 1000; 00886 } else 00887 #endif 00888 { 00889 gettimeofday(tp, NULL); 00890 } 00891 } 00892 00893 static void 00894 sleep_timeval(rb_thread_t *th, struct timeval tv) 00895 { 00896 struct timeval to, tvn; 00897 enum rb_thread_status prev_status = th->status; 00898 00899 getclockofday(&to); 00900 to.tv_sec += tv.tv_sec; 00901 if ((to.tv_usec += tv.tv_usec) >= 1000000) { 00902 to.tv_sec++; 00903 to.tv_usec -= 1000000; 00904 } 00905 00906 th->status = THREAD_STOPPED; 00907 do { 00908 native_sleep(th, &tv); 00909 RUBY_VM_CHECK_INTS(); 
00910 getclockofday(&tvn); 00911 if (to.tv_sec < tvn.tv_sec) break; 00912 if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; 00913 thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n", 00914 (long)to.tv_sec, (long)to.tv_usec, 00915 (long)tvn.tv_sec, (long)tvn.tv_usec); 00916 tv.tv_sec = to.tv_sec - tvn.tv_sec; 00917 if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) { 00918 --tv.tv_sec; 00919 tv.tv_usec += 1000000; 00920 } 00921 } while (th->status == THREAD_STOPPED); 00922 th->status = prev_status; 00923 } 00924 00925 void 00926 rb_thread_sleep_forever(void) 00927 { 00928 thread_debug("rb_thread_sleep_forever\n"); 00929 sleep_forever(GET_THREAD(), 0); 00930 } 00931 00932 static void 00933 rb_thread_sleep_deadly(void) 00934 { 00935 thread_debug("rb_thread_sleep_deadly\n"); 00936 sleep_forever(GET_THREAD(), 1); 00937 } 00938 00939 static double 00940 timeofday(void) 00941 { 00942 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 00943 struct timespec tp; 00944 00945 if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) { 00946 return (double)tp.tv_sec + (double)tp.tv_nsec * 1e-9; 00947 } else 00948 #endif 00949 { 00950 struct timeval tv; 00951 gettimeofday(&tv, NULL); 00952 return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6; 00953 } 00954 } 00955 00956 static void 00957 sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec) 00958 { 00959 sleep_timeval(th, double2timeval(sleepsec)); 00960 } 00961 00962 static void 00963 sleep_for_polling(rb_thread_t *th) 00964 { 00965 struct timeval time; 00966 time.tv_sec = 0; 00967 time.tv_usec = 100 * 1000; /* 0.1 sec */ 00968 sleep_timeval(th, time); 00969 } 00970 00971 void 00972 rb_thread_wait_for(struct timeval time) 00973 { 00974 rb_thread_t *th = GET_THREAD(); 00975 sleep_timeval(th, time); 00976 } 00977 00978 void 00979 rb_thread_polling(void) 00980 { 00981 RUBY_VM_CHECK_INTS(); 00982 if (!rb_thread_alone()) { 00983 rb_thread_t *th = GET_THREAD(); 00984 sleep_for_polling(th); 00985 } 00986 } 00987 00988 
/* 00989 * CAUTION: This function causes thread switching. 00990 * rb_thread_check_ints() check ruby's interrupts. 00991 * some interrupt needs thread switching/invoke handlers, 00992 * and so on. 00993 */ 00994 00995 void 00996 rb_thread_check_ints(void) 00997 { 00998 RUBY_VM_CHECK_INTS(); 00999 } 01000 01001 /* 01002 * Hidden API for tcl/tk wrapper. 01003 * There is no guarantee to perpetuate it. 01004 */ 01005 int 01006 rb_thread_check_trap_pending(void) 01007 { 01008 return rb_signal_buff_size() != 0; 01009 } 01010 01011 /* This function can be called in blocking region. */ 01012 int 01013 rb_thread_interrupted(VALUE thval) 01014 { 01015 rb_thread_t *th; 01016 GetThreadPtr(thval, th); 01017 return RUBY_VM_INTERRUPTED(th); 01018 } 01019 01020 void 01021 rb_thread_sleep(int sec) 01022 { 01023 rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); 01024 } 01025 01026 static void rb_threadptr_execute_interrupts_common(rb_thread_t *); 01027 01028 static void 01029 rb_thread_schedule_limits(unsigned long limits_us) 01030 { 01031 thread_debug("rb_thread_schedule\n"); 01032 if (!rb_thread_alone()) { 01033 rb_thread_t *th = GET_THREAD(); 01034 01035 if (th->running_time_us >= limits_us) { 01036 thread_debug("rb_thread_schedule/switch start\n"); 01037 RB_GC_SAVE_MACHINE_CONTEXT(th); 01038 gvl_yield(th->vm, th); 01039 rb_thread_set_current(th); 01040 thread_debug("rb_thread_schedule/switch done\n"); 01041 } 01042 } 01043 } 01044 01045 void 01046 rb_thread_schedule(void) 01047 { 01048 rb_thread_schedule_limits(0); 01049 01050 if (UNLIKELY(GET_THREAD()->interrupt_flag)) { 01051 rb_threadptr_execute_interrupts_common(GET_THREAD()); 01052 } 01053 } 01054 01055 /* blocking region */ 01056 01057 static inline void 01058 blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region) 01059 { 01060 gvl_acquire(th->vm, th); 01061 rb_thread_set_current(th); 01062 thread_debug("leave blocking region (%p)\n", (void *)th); 01063 remove_signal_thread_list(th); 01064 
th->blocking_region_buffer = 0; 01065 reset_unblock_function(th, ®ion->oldubf); 01066 if (th->status == THREAD_STOPPED) { 01067 th->status = region->prev_status; 01068 } 01069 } 01070 01071 struct rb_blocking_region_buffer * 01072 rb_thread_blocking_region_begin(void) 01073 { 01074 rb_thread_t *th = GET_THREAD(); 01075 struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer); 01076 blocking_region_begin(th, region, ubf_select, th); 01077 return region; 01078 } 01079 01080 void 01081 rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) 01082 { 01083 int saved_errno = errno; 01084 rb_thread_t *th = GET_THREAD(); 01085 blocking_region_end(th, region); 01086 xfree(region); 01087 RUBY_VM_CHECK_INTS(); 01088 errno = saved_errno; 01089 } 01090 01091 /* 01092 * rb_thread_blocking_region - permit concurrent/parallel execution. 01093 * 01094 * This function does: 01095 * (1) release GVL. 01096 * Other Ruby threads may run in parallel. 01097 * (2) call func with data1. 01098 * (3) acquire GVL. 01099 * Other Ruby threads can not run in parallel any more. 01100 * 01101 * If another thread interrupts this thread (Thread#kill, signal delivery, 01102 * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means 01103 * "un-blocking function"). `ubf()' should interrupt `func()' execution. 01104 * 01105 * There are built-in ubfs and you can specify these ubfs. 01106 * However, we can not guarantee our built-in ubfs interrupt 01107 * your `func()' correctly. Be careful to use rb_thread_blocking_region(). 01108 * 01109 * * RUBY_UBF_IO: ubf for IO operation 01110 * * RUBY_UBF_PROCESS: ubf for process operation 01111 * 01112 * NOTE: You can not execute most of Ruby C API and touch Ruby 01113 * objects in `func()' and `ubf()', including raising an 01114 * exception, because current thread doesn't acquire GVL 01115 * (cause synchronization problem). If you need to do it, 01116 * read source code of C APIs and confirm by yourself. 
01117 * 01118 * NOTE: In short, this API is difficult to use safely. I recommend you 01119 * use other ways if you have. We lack experiences to use this API. 01120 * Please report your problem related on it. 01121 * 01122 * Safe C API: 01123 * * rb_thread_interrupted() - check interrupt flag 01124 * * ruby_xalloc(), ruby_xrealloc(), ruby_xfree() - 01125 * if they called without GVL, acquire GVL automatically. 01126 */ 01127 VALUE 01128 rb_thread_blocking_region( 01129 rb_blocking_function_t *func, void *data1, 01130 rb_unblock_function_t *ubf, void *data2) 01131 { 01132 VALUE val; 01133 rb_thread_t *th = GET_THREAD(); 01134 int saved_errno = 0; 01135 01136 th->waiting_fd = -1; 01137 if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { 01138 ubf = ubf_select; 01139 data2 = th; 01140 } 01141 01142 BLOCKING_REGION({ 01143 val = func(data1); 01144 saved_errno = errno; 01145 }, ubf, data2); 01146 errno = saved_errno; 01147 01148 return val; 01149 } 01150 01151 VALUE 01152 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) 01153 { 01154 VALUE val; 01155 rb_thread_t *th = GET_THREAD(); 01156 int saved_errno = 0; 01157 01158 th->waiting_fd = fd; 01159 BLOCKING_REGION({ 01160 val = func(data1); 01161 saved_errno = errno; 01162 }, ubf_select, th); 01163 th->waiting_fd = -1; 01164 errno = saved_errno; 01165 01166 return val; 01167 } 01168 01169 /* alias of rb_thread_blocking_region() */ 01170 01171 VALUE 01172 rb_thread_call_without_gvl( 01173 rb_blocking_function_t *func, void *data1, 01174 rb_unblock_function_t *ubf, void *data2) 01175 { 01176 return rb_thread_blocking_region(func, data1, ubf, data2); 01177 } 01178 01179 /* 01180 * rb_thread_call_with_gvl - re-enter into Ruby world while releasing GVL. 01181 * 01182 *** 01183 *** This API is EXPERIMENTAL! 01184 *** We do not guarantee that this API remains in ruby 1.9.2 or later. 
01185 *** 01186 * 01187 * While releasing GVL using rb_thread_blocking_region() or 01188 * rb_thread_call_without_gvl(), you can not access Ruby values or invoke methods. 01189 * If you need to access it, you must use this function rb_thread_call_with_gvl(). 01190 * 01191 * This function rb_thread_call_with_gvl() does: 01192 * (1) acquire GVL. 01193 * (2) call passed function `func'. 01194 * (3) release GVL. 01195 * (4) return a value which is returned at (2). 01196 * 01197 * NOTE: You should not return Ruby object at (2) because such Object 01198 * will not marked. 01199 * 01200 * NOTE: If an exception is raised in `func', this function "DOES NOT" 01201 * protect (catch) the exception. If you have any resources 01202 * which should free before throwing exception, you need use 01203 * rb_protect() in `func' and return a value which represents 01204 * exception is raised. 01205 * 01206 * NOTE: This functions should not be called by a thread which 01207 * is not created as Ruby thread (created by Thread.new or so). 01208 * In other words, this function *DOES NOT* associate 01209 * NON-Ruby thread to Ruby thread. 01210 */ 01211 void * 01212 rb_thread_call_with_gvl(void *(*func)(void *), void *data1) 01213 { 01214 rb_thread_t *th = ruby_thread_from_native(); 01215 struct rb_blocking_region_buffer *brb; 01216 struct rb_unblock_callback prev_unblock; 01217 void *r; 01218 01219 if (th == 0) { 01220 /* Error is occurred, but we can't use rb_bug() 01221 * because this thread is not Ruby's thread. 01222 * What should we do? 
01223 */ 01224 01225 fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n"); 01226 exit(EXIT_FAILURE); 01227 } 01228 01229 brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer; 01230 prev_unblock = th->unblock; 01231 01232 if (brb == 0) { 01233 rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL."); 01234 } 01235 01236 blocking_region_end(th, brb); 01237 /* enter to Ruby world: You can access Ruby values, methods and so on. */ 01238 r = (*func)(data1); 01239 /* leave from Ruby world: You can not access Ruby values, etc. */ 01240 blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg); 01241 return r; 01242 } 01243 01244 /* 01245 * ruby_thread_has_gvl_p - check if current native thread has GVL. 01246 * 01247 *** 01248 *** This API is EXPERIMENTAL! 01249 *** We do not guarantee that this API remains in ruby 1.9.2 or later. 01250 *** 01251 */ 01252 01253 int 01254 ruby_thread_has_gvl_p(void) 01255 { 01256 rb_thread_t *th = ruby_thread_from_native(); 01257 01258 if (th && th->blocking_region_buffer == 0) { 01259 return 1; 01260 } 01261 else { 01262 return 0; 01263 } 01264 } 01265 01266 /* 01267 * call-seq: 01268 * Thread.pass -> nil 01269 * 01270 * Give the thread scheduler a hint to pass execution to another thread. 01271 * A running thread may or may not switch, it depends on OS and processor. 
01272 */ 01273 01274 static VALUE 01275 thread_s_pass(VALUE klass) 01276 { 01277 rb_thread_schedule(); 01278 return Qnil; 01279 } 01280 01281 /* 01282 * 01283 */ 01284 01285 static void 01286 rb_threadptr_execute_interrupts_common(rb_thread_t *th) 01287 { 01288 rb_atomic_t interrupt; 01289 01290 if (th->raised_flag) return; 01291 01292 while ((interrupt = ATOMIC_EXCHANGE(th->interrupt_flag, 0)) != 0) { 01293 enum rb_thread_status status = th->status; 01294 int timer_interrupt = interrupt & 0x01; 01295 int finalizer_interrupt = interrupt & 0x04; 01296 int sig; 01297 01298 th->status = THREAD_RUNNABLE; 01299 01300 /* signal handling */ 01301 if (th == th->vm->main_thread) { 01302 while ((sig = rb_get_next_signal()) != 0) { 01303 rb_signal_exec(th, sig); 01304 } 01305 } 01306 01307 /* exception from another thread */ 01308 if (th->thrown_errinfo) { 01309 VALUE err = th->thrown_errinfo; 01310 th->thrown_errinfo = 0; 01311 thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); 01312 01313 if (err == eKillSignal || err == eTerminateSignal) { 01314 th->errinfo = INT2FIX(TAG_FATAL); 01315 TH_JUMP_TAG(th, TAG_FATAL); 01316 } 01317 else { 01318 rb_exc_raise(err); 01319 } 01320 } 01321 th->status = status; 01322 01323 if (finalizer_interrupt) { 01324 rb_gc_finalize_deferred(); 01325 } 01326 01327 if (timer_interrupt) { 01328 unsigned long limits_us = 250 * 1000; 01329 01330 if (th->priority > 0) 01331 limits_us <<= th->priority; 01332 else 01333 limits_us >>= -th->priority; 01334 01335 if (status == THREAD_RUNNABLE) 01336 th->running_time_us += TIME_QUANTUM_USEC; 01337 01338 EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0); 01339 01340 rb_thread_schedule_limits(limits_us); 01341 } 01342 } 01343 } 01344 01345 void 01346 rb_threadptr_execute_interrupts(rb_thread_t *th) 01347 { 01348 rb_threadptr_execute_interrupts_common(th); 01349 } 01350 01351 void 01352 rb_thread_execute_interrupts(VALUE thval) 01353 { 01354 rb_thread_t *th; 01355 GetThreadPtr(thval, 
th); 01356 rb_threadptr_execute_interrupts_common(th); 01357 } 01358 01359 void 01360 rb_gc_mark_threads(void) 01361 { 01362 rb_bug("deprecated function rb_gc_mark_threads is called"); 01363 } 01364 01365 /*****************************************************/ 01366 01367 static void 01368 rb_threadptr_ready(rb_thread_t *th) 01369 { 01370 rb_threadptr_interrupt(th); 01371 } 01372 01373 static VALUE 01374 rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) 01375 { 01376 VALUE exc; 01377 01378 again: 01379 if (rb_threadptr_dead(th)) { 01380 return Qnil; 01381 } 01382 01383 if (th->thrown_errinfo != 0 || th->raised_flag) { 01384 rb_thread_schedule(); 01385 goto again; 01386 } 01387 01388 exc = rb_make_exception(argc, argv); 01389 th->thrown_errinfo = exc; 01390 rb_threadptr_ready(th); 01391 return Qnil; 01392 } 01393 01394 void 01395 rb_threadptr_signal_raise(rb_thread_t *th, int sig) 01396 { 01397 VALUE argv[2]; 01398 01399 argv[0] = rb_eSignal; 01400 argv[1] = INT2FIX(sig); 01401 rb_threadptr_raise(th->vm->main_thread, 2, argv); 01402 } 01403 01404 void 01405 rb_threadptr_signal_exit(rb_thread_t *th) 01406 { 01407 VALUE argv[2]; 01408 01409 argv[0] = rb_eSystemExit; 01410 argv[1] = rb_str_new2("exit"); 01411 rb_threadptr_raise(th->vm->main_thread, 2, argv); 01412 } 01413 01414 #if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK) 01415 #define USE_SIGALTSTACK 01416 #endif 01417 01418 void 01419 ruby_thread_stack_overflow(rb_thread_t *th) 01420 { 01421 th->raised_flag = 0; 01422 #ifdef USE_SIGALTSTACK 01423 rb_exc_raise(sysstack_error); 01424 #else 01425 th->errinfo = sysstack_error; 01426 TH_JUMP_TAG(th, TAG_RAISE); 01427 #endif 01428 } 01429 01430 int 01431 rb_threadptr_set_raised(rb_thread_t *th) 01432 { 01433 if (th->raised_flag & RAISED_EXCEPTION) { 01434 return 1; 01435 } 01436 th->raised_flag |= RAISED_EXCEPTION; 01437 return 0; 01438 } 01439 01440 int 01441 rb_threadptr_reset_raised(rb_thread_t *th) 01442 { 01443 if 
(!(th->raised_flag & RAISED_EXCEPTION)) { 01444 return 0; 01445 } 01446 th->raised_flag &= ~RAISED_EXCEPTION; 01447 return 1; 01448 } 01449 01450 #define THREAD_IO_WAITING_P(th) ( \ 01451 ((th)->status == THREAD_STOPPED || \ 01452 (th)->status == THREAD_STOPPED_FOREVER) && \ 01453 (th)->blocking_region_buffer && \ 01454 (th)->unblock.func == ubf_select && \ 01455 1) 01456 01457 static int 01458 thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data) 01459 { 01460 int fd = (int)data; 01461 rb_thread_t *th; 01462 GetThreadPtr((VALUE)key, th); 01463 01464 if (THREAD_IO_WAITING_P(th)) { 01465 native_mutex_lock(&th->interrupt_lock); 01466 if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) { 01467 th->thrown_errinfo = th->vm->special_exceptions[ruby_error_closed_stream]; 01468 RUBY_VM_SET_INTERRUPT(th); 01469 (th->unblock.func)(th->unblock.arg); 01470 } 01471 native_mutex_unlock(&th->interrupt_lock); 01472 } 01473 return ST_CONTINUE; 01474 } 01475 01476 void 01477 rb_thread_fd_close(int fd) 01478 { 01479 st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd); 01480 } 01481 01482 /* 01483 * call-seq: 01484 * thr.raise 01485 * thr.raise(string) 01486 * thr.raise(exception [, string [, array]]) 01487 * 01488 * Raises an exception (see <code>Kernel::raise</code>) from <i>thr</i>. The 01489 * caller does not have to be <i>thr</i>. 
01490 * 01491 * Thread.abort_on_exception = true 01492 * a = Thread.new { sleep(200) } 01493 * a.raise("Gotcha") 01494 * 01495 * <em>produces:</em> 01496 * 01497 * prog.rb:3: Gotcha (RuntimeError) 01498 * from prog.rb:2:in `initialize' 01499 * from prog.rb:2:in `new' 01500 * from prog.rb:2 01501 */ 01502 01503 static VALUE 01504 thread_raise_m(int argc, VALUE *argv, VALUE self) 01505 { 01506 rb_thread_t *th; 01507 GetThreadPtr(self, th); 01508 rb_threadptr_raise(th, argc, argv); 01509 return Qnil; 01510 } 01511 01512 01513 /* 01514 * call-seq: 01515 * thr.exit -> thr or nil 01516 * thr.kill -> thr or nil 01517 * thr.terminate -> thr or nil 01518 * 01519 * Terminates <i>thr</i> and schedules another thread to be run. If this thread 01520 * is already marked to be killed, <code>exit</code> returns the 01521 * <code>Thread</code>. If this is the main thread, or the last thread, exits 01522 * the process. 01523 */ 01524 01525 VALUE 01526 rb_thread_kill(VALUE thread) 01527 { 01528 rb_thread_t *th; 01529 01530 GetThreadPtr(thread, th); 01531 01532 if (th != GET_THREAD() && th->safe_level < 4) { 01533 rb_secure(4); 01534 } 01535 if (th->status == THREAD_TO_KILL || th->status == THREAD_KILLED) { 01536 return thread; 01537 } 01538 if (th == th->vm->main_thread) { 01539 rb_exit(EXIT_SUCCESS); 01540 } 01541 01542 thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id); 01543 01544 rb_threadptr_interrupt(th); 01545 th->thrown_errinfo = eKillSignal; 01546 th->status = THREAD_TO_KILL; 01547 01548 return thread; 01549 } 01550 01551 01552 /* 01553 * call-seq: 01554 * Thread.kill(thread) -> thread 01555 * 01556 * Causes the given <em>thread</em> to exit (see <code>Thread::exit</code>). 01557 * 01558 * count = 0 01559 * a = Thread.new { loop { count += 1 } } 01560 * sleep(0.1) #=> 0 01561 * Thread.kill(a) #=> #<Thread:0x401b3d30 dead> 01562 * count #=> 93947 01563 * a.alive? 
#=> false 01564 */ 01565 01566 static VALUE 01567 rb_thread_s_kill(VALUE obj, VALUE th) 01568 { 01569 return rb_thread_kill(th); 01570 } 01571 01572 01573 /* 01574 * call-seq: 01575 * Thread.exit -> thread 01576 * 01577 * Terminates the currently running thread and schedules another thread to be 01578 * run. If this thread is already marked to be killed, <code>exit</code> 01579 * returns the <code>Thread</code>. If this is the main thread, or the last 01580 * thread, exit the process. 01581 */ 01582 01583 static VALUE 01584 rb_thread_exit(void) 01585 { 01586 return rb_thread_kill(GET_THREAD()->self); 01587 } 01588 01589 01590 /* 01591 * call-seq: 01592 * thr.wakeup -> thr 01593 * 01594 * Marks <i>thr</i> as eligible for scheduling (it may still remain blocked on 01595 * I/O, however). Does not invoke the scheduler (see <code>Thread#run</code>). 01596 * 01597 * c = Thread.new { Thread.stop; puts "hey!" } 01598 * sleep 0.1 while c.status!='sleep' 01599 * c.wakeup 01600 * c.join 01601 * 01602 * <em>produces:</em> 01603 * 01604 * hey! 01605 */ 01606 01607 VALUE 01608 rb_thread_wakeup(VALUE thread) 01609 { 01610 if (!RTEST(rb_thread_wakeup_alive(thread))) { 01611 rb_raise(rb_eThreadError, "killed thread"); 01612 } 01613 return thread; 01614 } 01615 01616 VALUE 01617 rb_thread_wakeup_alive(VALUE thread) 01618 { 01619 rb_thread_t *th; 01620 GetThreadPtr(thread, th); 01621 01622 if (th->status == THREAD_KILLED) { 01623 return Qnil; 01624 } 01625 rb_threadptr_ready(th); 01626 if (th->status != THREAD_TO_KILL) { 01627 th->status = THREAD_RUNNABLE; 01628 } 01629 return thread; 01630 } 01631 01632 01633 /* 01634 * call-seq: 01635 * thr.run -> thr 01636 * 01637 * Wakes up <i>thr</i>, making it eligible for scheduling. 
01638 * 01639 * a = Thread.new { puts "a"; Thread.stop; puts "c" } 01640 * sleep 0.1 while a.status!='sleep' 01641 * puts "Got here" 01642 * a.run 01643 * a.join 01644 * 01645 * <em>produces:</em> 01646 * 01647 * a 01648 * Got here 01649 * c 01650 */ 01651 01652 VALUE 01653 rb_thread_run(VALUE thread) 01654 { 01655 rb_thread_wakeup(thread); 01656 rb_thread_schedule(); 01657 return thread; 01658 } 01659 01660 01661 /* 01662 * call-seq: 01663 * Thread.stop -> nil 01664 * 01665 * Stops execution of the current thread, putting it into a ``sleep'' state, 01666 * and schedules execution of another thread. 01667 * 01668 * a = Thread.new { print "a"; Thread.stop; print "c" } 01669 * sleep 0.1 while a.status!='sleep' 01670 * print "b" 01671 * a.run 01672 * a.join 01673 * 01674 * <em>produces:</em> 01675 * 01676 * abc 01677 */ 01678 01679 VALUE 01680 rb_thread_stop(void) 01681 { 01682 if (rb_thread_alone()) { 01683 rb_raise(rb_eThreadError, 01684 "stopping only thread\n\tnote: use sleep to stop forever"); 01685 } 01686 rb_thread_sleep_deadly(); 01687 return Qnil; 01688 } 01689 01690 static int 01691 thread_list_i(st_data_t key, st_data_t val, void *data) 01692 { 01693 VALUE ary = (VALUE)data; 01694 rb_thread_t *th; 01695 GetThreadPtr((VALUE)key, th); 01696 01697 switch (th->status) { 01698 case THREAD_RUNNABLE: 01699 case THREAD_STOPPED: 01700 case THREAD_STOPPED_FOREVER: 01701 case THREAD_TO_KILL: 01702 rb_ary_push(ary, th->self); 01703 default: 01704 break; 01705 } 01706 return ST_CONTINUE; 01707 } 01708 01709 /********************************************************************/ 01710 01711 /* 01712 * call-seq: 01713 * Thread.list -> array 01714 * 01715 * Returns an array of <code>Thread</code> objects for all threads that are 01716 * either runnable or stopped. 
01717 * 01718 * Thread.new { sleep(200) } 01719 * Thread.new { 1000000.times {|i| i*i } } 01720 * Thread.new { Thread.stop } 01721 * Thread.list.each {|t| p t} 01722 * 01723 * <em>produces:</em> 01724 * 01725 * #<Thread:0x401b3e84 sleep> 01726 * #<Thread:0x401b3f38 run> 01727 * #<Thread:0x401b3fb0 sleep> 01728 * #<Thread:0x401bdf4c run> 01729 */ 01730 01731 VALUE 01732 rb_thread_list(void) 01733 { 01734 VALUE ary = rb_ary_new(); 01735 st_foreach(GET_THREAD()->vm->living_threads, thread_list_i, ary); 01736 return ary; 01737 } 01738 01739 VALUE 01740 rb_thread_current(void) 01741 { 01742 return GET_THREAD()->self; 01743 } 01744 01745 /* 01746 * call-seq: 01747 * Thread.current -> thread 01748 * 01749 * Returns the currently executing thread. 01750 * 01751 * Thread.current #=> #<Thread:0x401bdf4c run> 01752 */ 01753 01754 static VALUE 01755 thread_s_current(VALUE klass) 01756 { 01757 return rb_thread_current(); 01758 } 01759 01760 VALUE 01761 rb_thread_main(void) 01762 { 01763 return GET_THREAD()->vm->main_thread->self; 01764 } 01765 01766 /* 01767 * call-seq: 01768 * Thread.main -> thread 01769 * 01770 * Returns the main thread. 01771 */ 01772 01773 static VALUE 01774 rb_thread_s_main(VALUE klass) 01775 { 01776 return rb_thread_main(); 01777 } 01778 01779 01780 /* 01781 * call-seq: 01782 * Thread.abort_on_exception -> true or false 01783 * 01784 * Returns the status of the global ``abort on exception'' condition. The 01785 * default is <code>false</code>. When set to <code>true</code>, or if the 01786 * global <code>$DEBUG</code> flag is <code>true</code> (perhaps because the 01787 * command line option <code>-d</code> was specified) all threads will abort 01788 * (the process will <code>exit(0)</code>) if an exception is raised in any 01789 * thread. See also <code>Thread::abort_on_exception=</code>. 01790 */ 01791 01792 static VALUE 01793 rb_thread_s_abort_exc(void) 01794 { 01795 return GET_THREAD()->vm->thread_abort_on_exception ? 
Qtrue : Qfalse; 01796 } 01797 01798 01799 /* 01800 * call-seq: 01801 * Thread.abort_on_exception= boolean -> true or false 01802 * 01803 * When set to <code>true</code>, all threads will abort if an exception is 01804 * raised. Returns the new state. 01805 * 01806 * Thread.abort_on_exception = true 01807 * t1 = Thread.new do 01808 * puts "In new thread" 01809 * raise "Exception from thread" 01810 * end 01811 * sleep(1) 01812 * puts "not reached" 01813 * 01814 * <em>produces:</em> 01815 * 01816 * In new thread 01817 * prog.rb:4: Exception from thread (RuntimeError) 01818 * from prog.rb:2:in `initialize' 01819 * from prog.rb:2:in `new' 01820 * from prog.rb:2 01821 */ 01822 01823 static VALUE 01824 rb_thread_s_abort_exc_set(VALUE self, VALUE val) 01825 { 01826 rb_secure(4); 01827 GET_THREAD()->vm->thread_abort_on_exception = RTEST(val); 01828 return val; 01829 } 01830 01831 01832 /* 01833 * call-seq: 01834 * thr.abort_on_exception -> true or false 01835 * 01836 * Returns the status of the thread-local ``abort on exception'' condition for 01837 * <i>thr</i>. The default is <code>false</code>. See also 01838 * <code>Thread::abort_on_exception=</code>. 01839 */ 01840 01841 static VALUE 01842 rb_thread_abort_exc(VALUE thread) 01843 { 01844 rb_thread_t *th; 01845 GetThreadPtr(thread, th); 01846 return th->abort_on_exception ? Qtrue : Qfalse; 01847 } 01848 01849 01850 /* 01851 * call-seq: 01852 * thr.abort_on_exception= boolean -> true or false 01853 * 01854 * When set to <code>true</code>, causes all threads (including the main 01855 * program) to abort if an exception is raised in <i>thr</i>. The process will 01856 * effectively <code>exit(0)</code>. 
01857 */ 01858 01859 static VALUE 01860 rb_thread_abort_exc_set(VALUE thread, VALUE val) 01861 { 01862 rb_thread_t *th; 01863 rb_secure(4); 01864 01865 GetThreadPtr(thread, th); 01866 th->abort_on_exception = RTEST(val); 01867 return val; 01868 } 01869 01870 01871 /* 01872 * call-seq: 01873 * thr.group -> thgrp or nil 01874 * 01875 * Returns the <code>ThreadGroup</code> which contains <i>thr</i>, or nil if 01876 * the thread is not a member of any group. 01877 * 01878 * Thread.main.group #=> #<ThreadGroup:0x4029d914> 01879 */ 01880 01881 VALUE 01882 rb_thread_group(VALUE thread) 01883 { 01884 rb_thread_t *th; 01885 VALUE group; 01886 GetThreadPtr(thread, th); 01887 group = th->thgroup; 01888 01889 if (!group) { 01890 group = Qnil; 01891 } 01892 return group; 01893 } 01894 01895 static const char * 01896 thread_status_name(enum rb_thread_status status) 01897 { 01898 switch (status) { 01899 case THREAD_RUNNABLE: 01900 return "run"; 01901 case THREAD_STOPPED: 01902 case THREAD_STOPPED_FOREVER: 01903 return "sleep"; 01904 case THREAD_TO_KILL: 01905 return "aborting"; 01906 case THREAD_KILLED: 01907 return "dead"; 01908 default: 01909 return "unknown"; 01910 } 01911 } 01912 01913 static int 01914 rb_threadptr_dead(rb_thread_t *th) 01915 { 01916 return th->status == THREAD_KILLED; 01917 } 01918 01919 01920 /* 01921 * call-seq: 01922 * thr.status -> string, false or nil 01923 * 01924 * Returns the status of <i>thr</i>: ``<code>sleep</code>'' if <i>thr</i> is 01925 * sleeping or waiting on I/O, ``<code>run</code>'' if <i>thr</i> is executing, 01926 * ``<code>aborting</code>'' if <i>thr</i> is aborting, <code>false</code> if 01927 * <i>thr</i> terminated normally, and <code>nil</code> if <i>thr</i> 01928 * terminated with an exception. 
01929 * 01930 * a = Thread.new { raise("die now") } 01931 * b = Thread.new { Thread.stop } 01932 * c = Thread.new { Thread.exit } 01933 * d = Thread.new { sleep } 01934 * d.kill #=> #<Thread:0x401b3678 aborting> 01935 * a.status #=> nil 01936 * b.status #=> "sleep" 01937 * c.status #=> false 01938 * d.status #=> "aborting" 01939 * Thread.current.status #=> "run" 01940 */ 01941 01942 static VALUE 01943 rb_thread_status(VALUE thread) 01944 { 01945 rb_thread_t *th; 01946 GetThreadPtr(thread, th); 01947 01948 if (rb_threadptr_dead(th)) { 01949 if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo) 01950 /* TODO */ ) { 01951 return Qnil; 01952 } 01953 return Qfalse; 01954 } 01955 return rb_str_new2(thread_status_name(th->status)); 01956 } 01957 01958 01959 /* 01960 * call-seq: 01961 * thr.alive? -> true or false 01962 * 01963 * Returns <code>true</code> if <i>thr</i> is running or sleeping. 01964 * 01965 * thr = Thread.new { } 01966 * thr.join #=> #<Thread:0x401b3fb0 dead> 01967 * Thread.current.alive? #=> true 01968 * thr.alive? #=> false 01969 */ 01970 01971 static VALUE 01972 rb_thread_alive_p(VALUE thread) 01973 { 01974 rb_thread_t *th; 01975 GetThreadPtr(thread, th); 01976 01977 if (rb_threadptr_dead(th)) 01978 return Qfalse; 01979 return Qtrue; 01980 } 01981 01982 /* 01983 * call-seq: 01984 * thr.stop? -> true or false 01985 * 01986 * Returns <code>true</code> if <i>thr</i> is dead or sleeping. 01987 * 01988 * a = Thread.new { Thread.stop } 01989 * b = Thread.current 01990 * a.stop? #=> true 01991 * b.stop? #=> false 01992 */ 01993 01994 static VALUE 01995 rb_thread_stop_p(VALUE thread) 01996 { 01997 rb_thread_t *th; 01998 GetThreadPtr(thread, th); 01999 02000 if (rb_threadptr_dead(th)) 02001 return Qtrue; 02002 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) 02003 return Qtrue; 02004 return Qfalse; 02005 } 02006 02007 /* 02008 * call-seq: 02009 * thr.safe_level -> integer 02010 * 02011 * Returns the safe level in effect for <i>thr</i>. 
Setting thread-local safe 02012 * levels can help when implementing sandboxes which run insecure code. 02013 * 02014 * thr = Thread.new { $SAFE = 3; sleep } 02015 * Thread.current.safe_level #=> 0 02016 * thr.safe_level #=> 3 02017 */ 02018 02019 static VALUE 02020 rb_thread_safe_level(VALUE thread) 02021 { 02022 rb_thread_t *th; 02023 GetThreadPtr(thread, th); 02024 02025 return INT2NUM(th->safe_level); 02026 } 02027 02028 /* 02029 * call-seq: 02030 * thr.inspect -> string 02031 * 02032 * Dump the name, id, and status of _thr_ to a string. 02033 */ 02034 02035 static VALUE 02036 rb_thread_inspect(VALUE thread) 02037 { 02038 const char *cname = rb_obj_classname(thread); 02039 rb_thread_t *th; 02040 const char *status; 02041 VALUE str; 02042 02043 GetThreadPtr(thread, th); 02044 status = thread_status_name(th->status); 02045 str = rb_sprintf("#<%s:%p %s>", cname, (void *)thread, status); 02046 OBJ_INFECT(str, thread); 02047 02048 return str; 02049 } 02050 02051 VALUE 02052 rb_thread_local_aref(VALUE thread, ID id) 02053 { 02054 rb_thread_t *th; 02055 st_data_t val; 02056 02057 GetThreadPtr(thread, th); 02058 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02059 rb_raise(rb_eSecurityError, "Insecure: thread locals"); 02060 } 02061 if (!th->local_storage) { 02062 return Qnil; 02063 } 02064 if (st_lookup(th->local_storage, id, &val)) { 02065 return (VALUE)val; 02066 } 02067 return Qnil; 02068 } 02069 02070 /* 02071 * call-seq: 02072 * thr[sym] -> obj or nil 02073 * 02074 * Attribute Reference---Returns the value of a thread-local variable, using 02075 * either a symbol or a string name. If the specified variable does not exist, 02076 * returns <code>nil</code>. 
02077 * 02078 * [ 02079 * Thread.new { Thread.current["name"] = "A" }, 02080 * Thread.new { Thread.current[:name] = "B" }, 02081 * Thread.new { Thread.current["name"] = "C" } 02082 * ].each do |th| 02083 * th.join 02084 * puts "#{th.inspect}: #{th[:name]}" 02085 * end 02086 * 02087 * <em>produces:</em> 02088 * 02089 * #<Thread:0x00000002a54220 dead>: A 02090 * #<Thread:0x00000002a541a8 dead>: B 02091 * #<Thread:0x00000002a54130 dead>: C 02092 */ 02093 02094 static VALUE 02095 rb_thread_aref(VALUE thread, VALUE id) 02096 { 02097 return rb_thread_local_aref(thread, rb_to_id(id)); 02098 } 02099 02100 VALUE 02101 rb_thread_local_aset(VALUE thread, ID id, VALUE val) 02102 { 02103 rb_thread_t *th; 02104 GetThreadPtr(thread, th); 02105 02106 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02107 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals"); 02108 } 02109 if (OBJ_FROZEN(thread)) { 02110 rb_error_frozen("thread locals"); 02111 } 02112 if (!th->local_storage) { 02113 th->local_storage = st_init_numtable(); 02114 } 02115 if (NIL_P(val)) { 02116 st_delete_wrap(th->local_storage, id); 02117 return Qnil; 02118 } 02119 st_insert(th->local_storage, id, val); 02120 return val; 02121 } 02122 02123 /* 02124 * call-seq: 02125 * thr[sym] = obj -> obj 02126 * 02127 * Attribute Assignment---Sets or creates the value of a thread-local variable, 02128 * using either a symbol or a string. See also <code>Thread#[]</code>. 02129 */ 02130 02131 static VALUE 02132 rb_thread_aset(VALUE self, VALUE id, VALUE val) 02133 { 02134 return rb_thread_local_aset(self, rb_to_id(id), val); 02135 } 02136 02137 /* 02138 * call-seq: 02139 * thr.key?(sym) -> true or false 02140 * 02141 * Returns <code>true</code> if the given string (or symbol) exists as a 02142 * thread-local variable. 
02143 * 02144 * me = Thread.current 02145 * me[:oliver] = "a" 02146 * me.key?(:oliver) #=> true 02147 * me.key?(:stanley) #=> false 02148 */ 02149 02150 static VALUE 02151 rb_thread_key_p(VALUE self, VALUE key) 02152 { 02153 rb_thread_t *th; 02154 ID id = rb_to_id(key); 02155 02156 GetThreadPtr(self, th); 02157 02158 if (!th->local_storage) { 02159 return Qfalse; 02160 } 02161 if (st_lookup(th->local_storage, id, 0)) { 02162 return Qtrue; 02163 } 02164 return Qfalse; 02165 } 02166 02167 static int 02168 thread_keys_i(ID key, VALUE value, VALUE ary) 02169 { 02170 rb_ary_push(ary, ID2SYM(key)); 02171 return ST_CONTINUE; 02172 } 02173 02174 static int 02175 vm_living_thread_num(rb_vm_t *vm) 02176 { 02177 return (int)vm->living_threads->num_entries; 02178 } 02179 02180 int 02181 rb_thread_alone(void) 02182 { 02183 int num = 1; 02184 if (GET_THREAD()->vm->living_threads) { 02185 num = vm_living_thread_num(GET_THREAD()->vm); 02186 thread_debug("rb_thread_alone: %d\n", num); 02187 } 02188 return num == 1; 02189 } 02190 02191 /* 02192 * call-seq: 02193 * thr.keys -> array 02194 * 02195 * Returns an an array of the names of the thread-local variables (as Symbols). 02196 * 02197 * thr = Thread.new do 02198 * Thread.current[:cat] = 'meow' 02199 * Thread.current["dog"] = 'woof' 02200 * end 02201 * thr.join #=> #<Thread:0x401b3f10 dead> 02202 * thr.keys #=> [:dog, :cat] 02203 */ 02204 02205 static VALUE 02206 rb_thread_keys(VALUE self) 02207 { 02208 rb_thread_t *th; 02209 VALUE ary = rb_ary_new(); 02210 GetThreadPtr(self, th); 02211 02212 if (th->local_storage) { 02213 st_foreach(th->local_storage, thread_keys_i, ary); 02214 } 02215 return ary; 02216 } 02217 02218 /* 02219 * call-seq: 02220 * thr.priority -> integer 02221 * 02222 * Returns the priority of <i>thr</i>. 
Default is inherited from the 02223 * current thread which creating the new thread, or zero for the 02224 * initial main thread; higher-priority thread will run more frequently 02225 * than lower-priority threads (but lower-priority threads can also run). 02226 * 02227 * This is just hint for Ruby thread scheduler. It may be ignored on some 02228 * platform. 02229 * 02230 * Thread.current.priority #=> 0 02231 */ 02232 02233 static VALUE 02234 rb_thread_priority(VALUE thread) 02235 { 02236 rb_thread_t *th; 02237 GetThreadPtr(thread, th); 02238 return INT2NUM(th->priority); 02239 } 02240 02241 02242 /* 02243 * call-seq: 02244 * thr.priority= integer -> thr 02245 * 02246 * Sets the priority of <i>thr</i> to <i>integer</i>. Higher-priority threads 02247 * will run more frequently than lower-priority threads (but lower-priority 02248 * threads can also run). 02249 * 02250 * This is just hint for Ruby thread scheduler. It may be ignored on some 02251 * platform. 02252 * 02253 * count1 = count2 = 0 02254 * a = Thread.new do 02255 * loop { count1 += 1 } 02256 * end 02257 * a.priority = -1 02258 * 02259 * b = Thread.new do 02260 * loop { count2 += 1 } 02261 * end 02262 * b.priority = -2 02263 * sleep 1 #=> 1 02264 * count1 #=> 622504 02265 * count2 #=> 5832 02266 */ 02267 02268 static VALUE 02269 rb_thread_priority_set(VALUE thread, VALUE prio) 02270 { 02271 rb_thread_t *th; 02272 int priority; 02273 GetThreadPtr(thread, th); 02274 02275 rb_secure(4); 02276 02277 #if USE_NATIVE_THREAD_PRIORITY 02278 th->priority = NUM2INT(prio); 02279 native_thread_apply_priority(th); 02280 #else 02281 priority = NUM2INT(prio); 02282 if (priority > RUBY_THREAD_PRIORITY_MAX) { 02283 priority = RUBY_THREAD_PRIORITY_MAX; 02284 } 02285 else if (priority < RUBY_THREAD_PRIORITY_MIN) { 02286 priority = RUBY_THREAD_PRIORITY_MIN; 02287 } 02288 th->priority = priority; 02289 #endif 02290 return INT2NUM(th->priority); 02291 } 02292 02293 /* for IO */ 02294 02295 #if defined(NFDBITS) && 
defined(HAVE_RB_FD_INIT) 02296 02297 /* 02298 * several Unix platforms support file descriptors bigger than FD_SETSIZE 02299 * in select(2) system call. 02300 * 02301 * - Linux 2.2.12 (?) 02302 * - NetBSD 1.2 (src/sys/kern/sys_generic.c:1.25) 02303 * select(2) documents how to allocate fd_set dynamically. 02304 * http://netbsd.gw.com/cgi-bin/man-cgi?select++NetBSD-4.0 02305 * - FreeBSD 2.2 (src/sys/kern/sys_generic.c:1.19) 02306 * - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4) 02307 * select(2) documents how to allocate fd_set dynamically. 02308 * http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4 02309 * - HP-UX documents how to allocate fd_set dynamically. 02310 * http://docs.hp.com/en/B2355-60105/select.2.html 02311 * - Solaris 8 has select_large_fdset 02312 * 02313 * When fd_set is not big enough to hold big file descriptors, 02314 * it should be allocated dynamically. 02315 * Note that this assumes fd_set is structured as bitmap. 02316 * 02317 * rb_fd_init allocates the memory. 02318 * rb_fd_term free the memory. 02319 * rb_fd_set may re-allocates bitmap. 02320 * 02321 * So rb_fd_set doesn't reject file descriptors bigger than FD_SETSIZE. 
02322 */ 02323 02324 void 02325 rb_fd_init(rb_fdset_t *fds) 02326 { 02327 fds->maxfd = 0; 02328 fds->fdset = ALLOC(fd_set); 02329 FD_ZERO(fds->fdset); 02330 } 02331 02332 void 02333 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 02334 { 02335 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 02336 02337 if (size < sizeof(fd_set)) 02338 size = sizeof(fd_set); 02339 dst->maxfd = src->maxfd; 02340 dst->fdset = xmalloc(size); 02341 memcpy(dst->fdset, src->fdset, size); 02342 } 02343 02344 void 02345 rb_fd_term(rb_fdset_t *fds) 02346 { 02347 if (fds->fdset) xfree(fds->fdset); 02348 fds->maxfd = 0; 02349 fds->fdset = 0; 02350 } 02351 02352 void 02353 rb_fd_zero(rb_fdset_t *fds) 02354 { 02355 if (fds->fdset) 02356 MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS)); 02357 } 02358 02359 static void 02360 rb_fd_resize(int n, rb_fdset_t *fds) 02361 { 02362 size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask); 02363 size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask); 02364 02365 if (m < sizeof(fd_set)) m = sizeof(fd_set); 02366 if (o < sizeof(fd_set)) o = sizeof(fd_set); 02367 02368 if (m > o) { 02369 fds->fdset = xrealloc(fds->fdset, m); 02370 memset((char *)fds->fdset + o, 0, m - o); 02371 } 02372 if (n >= fds->maxfd) fds->maxfd = n + 1; 02373 } 02374 02375 void 02376 rb_fd_set(int n, rb_fdset_t *fds) 02377 { 02378 rb_fd_resize(n, fds); 02379 FD_SET(n, fds->fdset); 02380 } 02381 02382 void 02383 rb_fd_clr(int n, rb_fdset_t *fds) 02384 { 02385 if (n >= fds->maxfd) return; 02386 FD_CLR(n, fds->fdset); 02387 } 02388 02389 int 02390 rb_fd_isset(int n, const rb_fdset_t *fds) 02391 { 02392 if (n >= fds->maxfd) return 0; 02393 return FD_ISSET(n, fds->fdset) != 0; /* "!= 0" avoids FreeBSD PR 91421 */ 02394 } 02395 02396 void 02397 rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max) 02398 { 02399 size_t size = howmany(max, NFDBITS) * sizeof(fd_mask); 02400 02401 if (size < sizeof(fd_set)) size = sizeof(fd_set); 02402 dst->maxfd = max; 02403 
dst->fdset = xrealloc(dst->fdset, size); 02404 memcpy(dst->fdset, src, size); 02405 } 02406 02407 static void 02408 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 02409 { 02410 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 02411 02412 if (size > sizeof(fd_set)) { 02413 rb_raise(rb_eArgError, "too large fdsets"); 02414 } 02415 memcpy(dst, rb_fd_ptr(src), sizeof(fd_set)); 02416 } 02417 02418 void 02419 rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src) 02420 { 02421 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 02422 02423 if (size < sizeof(fd_set)) 02424 size = sizeof(fd_set); 02425 dst->maxfd = src->maxfd; 02426 dst->fdset = xrealloc(dst->fdset, size); 02427 memcpy(dst->fdset, src->fdset, size); 02428 } 02429 02430 int 02431 rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout) 02432 { 02433 fd_set *r = NULL, *w = NULL, *e = NULL; 02434 if (readfds) { 02435 rb_fd_resize(n - 1, readfds); 02436 r = rb_fd_ptr(readfds); 02437 } 02438 if (writefds) { 02439 rb_fd_resize(n - 1, writefds); 02440 w = rb_fd_ptr(writefds); 02441 } 02442 if (exceptfds) { 02443 rb_fd_resize(n - 1, exceptfds); 02444 e = rb_fd_ptr(exceptfds); 02445 } 02446 return select(n, r, w, e, timeout); 02447 } 02448 02449 #undef FD_ZERO 02450 #undef FD_SET 02451 #undef FD_CLR 02452 #undef FD_ISSET 02453 02454 #define FD_ZERO(f) rb_fd_zero(f) 02455 #define FD_SET(i, f) rb_fd_set((i), (f)) 02456 #define FD_CLR(i, f) rb_fd_clr((i), (f)) 02457 #define FD_ISSET(i, f) rb_fd_isset((i), (f)) 02458 02459 #elif defined(_WIN32) 02460 02461 void 02462 rb_fd_init(rb_fdset_t *set) 02463 { 02464 set->capa = FD_SETSIZE; 02465 set->fdset = ALLOC(fd_set); 02466 FD_ZERO(set->fdset); 02467 } 02468 02469 void 02470 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 02471 { 02472 rb_fd_init(dst); 02473 rb_fd_dup(dst, src); 02474 } 02475 02476 static void 02477 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 02478 { 02479 int max = rb_fd_max(src); 
02480 02481 /* we assume src is the result of select() with dst, so dst should be 02482 * larger or equal than src. */ 02483 if (max > FD_SETSIZE || max > dst->fd_count) { 02484 rb_raise(rb_eArgError, "too large fdsets"); 02485 } 02486 02487 memcpy(dst->fd_array, src->fdset->fd_array, max); 02488 dst->fd_count = max; 02489 } 02490 02491 void 02492 rb_fd_term(rb_fdset_t *set) 02493 { 02494 xfree(set->fdset); 02495 set->fdset = NULL; 02496 set->capa = 0; 02497 } 02498 02499 void 02500 rb_fd_set(int fd, rb_fdset_t *set) 02501 { 02502 unsigned int i; 02503 SOCKET s = rb_w32_get_osfhandle(fd); 02504 02505 for (i = 0; i < set->fdset->fd_count; i++) { 02506 if (set->fdset->fd_array[i] == s) { 02507 return; 02508 } 02509 } 02510 if (set->fdset->fd_count >= (unsigned)set->capa) { 02511 set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE; 02512 set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa); 02513 } 02514 set->fdset->fd_array[set->fdset->fd_count++] = s; 02515 } 02516 02517 #undef FD_ZERO 02518 #undef FD_SET 02519 #undef FD_CLR 02520 #undef FD_ISSET 02521 02522 #define FD_ZERO(f) rb_fd_zero(f) 02523 #define FD_SET(i, f) rb_fd_set((i), (f)) 02524 #define FD_CLR(i, f) rb_fd_clr((i), (f)) 02525 #define FD_ISSET(i, f) rb_fd_isset((i), (f)) 02526 02527 #else 02528 #define rb_fd_rcopy(d, s) (*(d) = *(s)) 02529 #endif 02530 02531 #if defined(__CYGWIN__) 02532 static long 02533 cmp_tv(const struct timeval *a, const struct timeval *b) 02534 { 02535 long d = (a->tv_sec - b->tv_sec); 02536 return (d != 0) ? 
d : (a->tv_usec - b->tv_usec); 02537 } 02538 02539 static int 02540 subtract_tv(struct timeval *rest, const struct timeval *wait) 02541 { 02542 if (rest->tv_sec < wait->tv_sec) { 02543 return 0; 02544 } 02545 while (rest->tv_usec < wait->tv_usec) { 02546 if (rest->tv_sec <= wait->tv_sec) { 02547 return 0; 02548 } 02549 rest->tv_sec -= 1; 02550 rest->tv_usec += 1000 * 1000; 02551 } 02552 rest->tv_sec -= wait->tv_sec; 02553 rest->tv_usec -= wait->tv_usec; 02554 return rest->tv_sec != 0 || rest->tv_usec != 0; 02555 } 02556 #endif 02557 02558 static int 02559 do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, 02560 struct timeval *timeout) 02561 { 02562 int result, lerrno; 02563 rb_fdset_t UNINITIALIZED_VAR(orig_read); 02564 rb_fdset_t UNINITIALIZED_VAR(orig_write); 02565 rb_fdset_t UNINITIALIZED_VAR(orig_except); 02566 double limit = 0; 02567 struct timeval wait_rest; 02568 # if defined(__CYGWIN__) 02569 struct timeval start_time; 02570 # endif 02571 02572 if (timeout) { 02573 # if defined(__CYGWIN__) 02574 gettimeofday(&start_time, NULL); 02575 limit = (double)start_time.tv_sec + (double)start_time.tv_usec*1e-6; 02576 # else 02577 limit = timeofday(); 02578 # endif 02579 limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6; 02580 wait_rest = *timeout; 02581 timeout = &wait_rest; 02582 } 02583 02584 if (read) 02585 rb_fd_init_copy(&orig_read, read); 02586 if (write) 02587 rb_fd_init_copy(&orig_write, write); 02588 if (except) 02589 rb_fd_init_copy(&orig_except, except); 02590 02591 retry: 02592 lerrno = 0; 02593 02594 #if defined(__CYGWIN__) 02595 { 02596 int finish = 0; 02597 /* polling duration: 100ms */ 02598 struct timeval wait_100ms, *wait; 02599 wait_100ms.tv_sec = 0; 02600 wait_100ms.tv_usec = 100 * 1000; /* 100 ms */ 02601 02602 do { 02603 wait = (timeout == 0 || cmp_tv(&wait_100ms, timeout) < 0) ? 
&wait_100ms : timeout; 02604 BLOCKING_REGION({ 02605 do { 02606 result = rb_fd_select(n, read, write, except, wait); 02607 if (result < 0) lerrno = errno; 02608 if (result != 0) break; 02609 02610 if (read) 02611 rb_fd_dup(read, &orig_read); 02612 if (write) 02613 rb_fd_dup(write, &orig_write); 02614 if (except) 02615 rb_fd_dup(except, &orig_except); 02616 if (timeout) { 02617 struct timeval elapsed; 02618 gettimeofday(&elapsed, NULL); 02619 subtract_tv(&elapsed, &start_time); 02620 gettimeofday(&start_time, NULL); 02621 if (!subtract_tv(timeout, &elapsed)) { 02622 finish = 1; 02623 break; 02624 } 02625 if (cmp_tv(&wait_100ms, timeout) > 0) wait = timeout; 02626 } 02627 } while (__th->interrupt_flag == 0); 02628 }, 0, 0); 02629 } while (result == 0 && !finish); 02630 } 02631 #elif defined(_WIN32) 02632 { 02633 rb_thread_t *th = GET_THREAD(); 02634 BLOCKING_REGION({ 02635 result = native_fd_select(n, read, write, except, timeout, th); 02636 if (result < 0) lerrno = errno; 02637 }, ubf_select, th); 02638 } 02639 #else 02640 BLOCKING_REGION({ 02641 result = rb_fd_select(n, read, write, except, timeout); 02642 if (result < 0) lerrno = errno; 02643 }, ubf_select, GET_THREAD()); 02644 #endif 02645 02646 errno = lerrno; 02647 02648 if (result < 0) { 02649 switch (errno) { 02650 case EINTR: 02651 #ifdef ERESTART 02652 case ERESTART: 02653 #endif 02654 if (read) 02655 rb_fd_dup(read, &orig_read); 02656 if (write) 02657 rb_fd_dup(write, &orig_write); 02658 if (except) 02659 rb_fd_dup(except, &orig_except); 02660 02661 if (timeout) { 02662 double d = limit - timeofday(); 02663 02664 wait_rest.tv_sec = (unsigned int)d; 02665 wait_rest.tv_usec = (int)((d-(double)wait_rest.tv_sec)*1e6); 02666 if (wait_rest.tv_sec < 0) wait_rest.tv_sec = 0; 02667 if (wait_rest.tv_usec < 0) wait_rest.tv_usec = 0; 02668 } 02669 02670 goto retry; 02671 default: 02672 break; 02673 } 02674 } 02675 02676 if (read) 02677 rb_fd_term(&orig_read); 02678 if (write) 02679 rb_fd_term(&orig_write); 02680 if 
(except) 02681 rb_fd_term(&orig_except); 02682 02683 return result; 02684 } 02685 02686 static void 02687 rb_thread_wait_fd_rw(int fd, int read) 02688 { 02689 int result = 0; 02690 int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT; 02691 02692 thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write"); 02693 02694 if (fd < 0) { 02695 rb_raise(rb_eIOError, "closed stream"); 02696 } 02697 if (rb_thread_alone()) return; 02698 while (result <= 0) { 02699 result = rb_wait_for_single_fd(fd, events, NULL); 02700 02701 if (result < 0) { 02702 rb_sys_fail(0); 02703 } 02704 } 02705 02706 thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write"); 02707 } 02708 02709 void 02710 rb_thread_wait_fd(int fd) 02711 { 02712 rb_thread_wait_fd_rw(fd, 1); 02713 } 02714 02715 int 02716 rb_thread_fd_writable(int fd) 02717 { 02718 rb_thread_wait_fd_rw(fd, 0); 02719 return TRUE; 02720 } 02721 02722 int 02723 rb_thread_select(int max, fd_set * read, fd_set * write, fd_set * except, 02724 struct timeval *timeout) 02725 { 02726 rb_fdset_t fdsets[3]; 02727 rb_fdset_t *rfds = NULL; 02728 rb_fdset_t *wfds = NULL; 02729 rb_fdset_t *efds = NULL; 02730 int retval; 02731 02732 if (read) { 02733 rfds = &fdsets[0]; 02734 rb_fd_init(rfds); 02735 rb_fd_copy(rfds, read, max); 02736 } 02737 if (write) { 02738 wfds = &fdsets[1]; 02739 rb_fd_init(wfds); 02740 rb_fd_copy(wfds, write, max); 02741 } 02742 if (except) { 02743 efds = &fdsets[2]; 02744 rb_fd_init(efds); 02745 rb_fd_copy(efds, except, max); 02746 } 02747 02748 retval = rb_thread_fd_select(max, rfds, wfds, efds, timeout); 02749 02750 if (rfds) { 02751 rb_fd_rcopy(read, rfds); 02752 rb_fd_term(rfds); 02753 } 02754 if (wfds) { 02755 rb_fd_rcopy(write, wfds); 02756 rb_fd_term(wfds); 02757 } 02758 if (efds) { 02759 rb_fd_rcopy(except, efds); 02760 rb_fd_term(efds); 02761 } 02762 02763 return retval; 02764 } 02765 02766 int 02767 rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * 
except, 02768 struct timeval *timeout) 02769 { 02770 if (!read && !write && !except) { 02771 if (!timeout) { 02772 rb_thread_sleep_forever(); 02773 return 0; 02774 } 02775 rb_thread_wait_for(*timeout); 02776 return 0; 02777 } 02778 02779 if (read) { 02780 rb_fd_resize(max - 1, read); 02781 } 02782 if (write) { 02783 rb_fd_resize(max - 1, write); 02784 } 02785 if (except) { 02786 rb_fd_resize(max - 1, except); 02787 } 02788 return do_select(max, read, write, except, timeout); 02789 } 02790 02791 /* 02792 * poll() is supported by many OSes, but so far Linux is the only 02793 * one we know of that supports using poll() in all places select() 02794 * would work. 02795 */ 02796 #if defined(HAVE_POLL) && defined(linux) 02797 # define USE_POLL 02798 #endif 02799 02800 #ifdef USE_POLL 02801 02802 /* The same with linux kernel. TODO: make platform independent definition. */ 02803 #define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR) 02804 #define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR) 02805 #define POLLEX_SET (POLLPRI) 02806 02807 #define TIMET_MAX (~(time_t)0 <= 0 ? (time_t)((~(unsigned_time_t)0) >> 1) : (time_t)(~(unsigned_time_t)0)) 02808 #define TIMET_MIN (~(time_t)0 <= 0 ? 
(time_t)(((unsigned_time_t)1) << (sizeof(time_t) * CHAR_BIT - 1)) : (time_t)0) 02809 02810 #ifndef HAVE_PPOLL 02811 /* TODO: don't ignore sigmask */ 02812 int ppoll(struct pollfd *fds, nfds_t nfds, 02813 const struct timespec *ts, const sigset_t *sigmask) 02814 { 02815 int timeout_ms; 02816 02817 if (ts) { 02818 int tmp, tmp2; 02819 02820 if (ts->tv_sec > TIMET_MAX/1000) 02821 timeout_ms = -1; 02822 else { 02823 tmp = ts->tv_sec * 1000; 02824 tmp2 = ts->tv_nsec / (1000 * 1000); 02825 if (TIMET_MAX - tmp < tmp2) 02826 timeout_ms = -1; 02827 else 02828 timeout_ms = tmp + tmp2; 02829 } 02830 } else 02831 timeout_ms = -1; 02832 02833 return poll(fds, nfds, timeout_ms); 02834 } 02835 #endif 02836 02837 /* 02838 * returns a mask of events 02839 */ 02840 int 02841 rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 02842 { 02843 struct pollfd fds; 02844 int result, lerrno; 02845 double limit = 0; 02846 struct timespec ts; 02847 struct timespec *timeout = NULL; 02848 02849 if (tv) { 02850 ts.tv_sec = tv->tv_sec; 02851 ts.tv_nsec = tv->tv_usec * 1000; 02852 limit = timeofday(); 02853 limit += (double)tv->tv_sec + (double)tv->tv_usec * 1e-6; 02854 timeout = &ts; 02855 } 02856 02857 fds.fd = fd; 02858 fds.events = (short)events; 02859 02860 retry: 02861 lerrno = 0; 02862 BLOCKING_REGION({ 02863 result = ppoll(&fds, 1, timeout, NULL); 02864 if (result < 0) lerrno = errno; 02865 }, ubf_select, GET_THREAD()); 02866 02867 if (result < 0) { 02868 errno = lerrno; 02869 switch (errno) { 02870 case EINTR: 02871 #ifdef ERESTART 02872 case ERESTART: 02873 #endif 02874 if (timeout) { 02875 double d = limit - timeofday(); 02876 02877 ts.tv_sec = (long)d; 02878 ts.tv_nsec = (long)((d - (double)ts.tv_sec) * 1e9); 02879 if (ts.tv_sec < 0) 02880 ts.tv_sec = 0; 02881 if (ts.tv_nsec < 0) 02882 ts.tv_nsec = 0; 02883 } 02884 goto retry; 02885 } 02886 return -1; 02887 } 02888 02889 if (fds.revents & POLLNVAL) { 02890 errno = EBADF; 02891 return -1; 02892 } 02893 02894 /* 02895 * POLLIN, 
POLLOUT have a different meanings from select(2)'s read/write bit. 02896 * Therefore we need fix it up. 02897 */ 02898 result = 0; 02899 if (fds.revents & POLLIN_SET) 02900 result |= RB_WAITFD_IN; 02901 if (fds.revents & POLLOUT_SET) 02902 result |= RB_WAITFD_OUT; 02903 if (fds.revents & POLLEX_SET) 02904 result |= RB_WAITFD_PRI; 02905 02906 return result; 02907 } 02908 #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ 02909 static rb_fdset_t *init_set_fd(int fd, rb_fdset_t *fds) 02910 { 02911 rb_fd_init(fds); 02912 rb_fd_set(fd, fds); 02913 02914 return fds; 02915 } 02916 02917 struct select_args { 02918 union { 02919 int fd; 02920 int error; 02921 } as; 02922 rb_fdset_t *read; 02923 rb_fdset_t *write; 02924 rb_fdset_t *except; 02925 struct timeval *tv; 02926 }; 02927 02928 static VALUE 02929 select_single(VALUE ptr) 02930 { 02931 struct select_args *args = (struct select_args *)ptr; 02932 int r; 02933 02934 r = rb_thread_fd_select(args->as.fd + 1, 02935 args->read, args->write, args->except, args->tv); 02936 if (r == -1) 02937 args->as.error = errno; 02938 if (r > 0) { 02939 r = 0; 02940 if (args->read && rb_fd_isset(args->as.fd, args->read)) 02941 r |= RB_WAITFD_IN; 02942 if (args->write && rb_fd_isset(args->as.fd, args->write)) 02943 r |= RB_WAITFD_OUT; 02944 if (args->except && rb_fd_isset(args->as.fd, args->except)) 02945 r |= RB_WAITFD_PRI; 02946 } 02947 return (VALUE)r; 02948 } 02949 02950 static VALUE 02951 select_single_cleanup(VALUE ptr) 02952 { 02953 struct select_args *args = (struct select_args *)ptr; 02954 02955 if (args->read) rb_fd_term(args->read); 02956 if (args->write) rb_fd_term(args->write); 02957 if (args->except) rb_fd_term(args->except); 02958 02959 return (VALUE)-1; 02960 } 02961 02962 int 02963 rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 02964 { 02965 rb_fdset_t rfds, wfds, efds; 02966 struct select_args args; 02967 int r; 02968 VALUE ptr = (VALUE)&args; 02969 02970 args.as.fd = fd; 02971 args.read = 
(events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; 02972 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; 02973 args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; 02974 args.tv = tv; 02975 02976 r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); 02977 if (r == -1) 02978 errno = args.as.error; 02979 02980 return r; 02981 } 02982 #endif /* ! USE_POLL */ 02983 02984 /* 02985 * for GC 02986 */ 02987 02988 #ifdef USE_CONSERVATIVE_STACK_END 02989 void 02990 rb_gc_set_stack_end(VALUE **stack_end_p) 02991 { 02992 VALUE stack_end; 02993 *stack_end_p = &stack_end; 02994 } 02995 #endif 02996 02997 void 02998 rb_gc_save_machine_context(rb_thread_t *th) 02999 { 03000 FLUSH_REGISTER_WINDOWS; 03001 #ifdef __ia64 03002 th->machine_register_stack_end = rb_ia64_bsp(); 03003 #endif 03004 setjmp(th->machine_regs); 03005 } 03006 03007 /* 03008 * 03009 */ 03010 03011 void 03012 rb_threadptr_check_signal(rb_thread_t *mth) 03013 { 03014 /* mth must be main_thread */ 03015 if (rb_signal_buff_size() > 0) { 03016 /* wakeup main thread */ 03017 rb_threadptr_interrupt(mth); 03018 } 03019 } 03020 03021 static void 03022 timer_thread_function(void *arg) 03023 { 03024 rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */ 03025 03026 /* for time slice */ 03027 RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread); 03028 03029 /* check signal */ 03030 rb_threadptr_check_signal(vm->main_thread); 03031 03032 #if 0 03033 /* prove profiler */ 03034 if (vm->prove_profile.enable) { 03035 rb_thread_t *th = vm->running_thread; 03036 03037 if (vm->during_gc) { 03038 /* GC prove profiling */ 03039 } 03040 } 03041 #endif 03042 } 03043 03044 void 03045 rb_thread_stop_timer_thread(int close_anyway) 03046 { 03047 if (timer_thread_id && native_stop_timer_thread(close_anyway)) { 03048 native_reset_timer_thread(); 03049 } 03050 } 03051 03052 void 03053 rb_thread_reset_timer_thread(void) 03054 { 03055 native_reset_timer_thread(); 03056 } 03057 03058 void 
03059 rb_thread_start_timer_thread(void) 03060 { 03061 system_working = 1; 03062 rb_thread_create_timer_thread(); 03063 } 03064 03065 static int 03066 clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy) 03067 { 03068 int i; 03069 VALUE lines = (VALUE)val; 03070 03071 for (i = 0; i < RARRAY_LEN(lines); i++) { 03072 if (RARRAY_PTR(lines)[i] != Qnil) { 03073 RARRAY_PTR(lines)[i] = INT2FIX(0); 03074 } 03075 } 03076 return ST_CONTINUE; 03077 } 03078 03079 static void 03080 clear_coverage(void) 03081 { 03082 VALUE coverages = rb_get_coverages(); 03083 if (RTEST(coverages)) { 03084 st_foreach(RHASH_TBL(coverages), clear_coverage_i, 0); 03085 } 03086 } 03087 03088 static void 03089 rb_thread_atfork_internal(int (*atfork)(st_data_t, st_data_t, st_data_t)) 03090 { 03091 rb_thread_t *th = GET_THREAD(); 03092 rb_vm_t *vm = th->vm; 03093 VALUE thval = th->self; 03094 vm->main_thread = th; 03095 03096 gvl_atfork(th->vm); 03097 st_foreach(vm->living_threads, atfork, (st_data_t)th); 03098 st_clear(vm->living_threads); 03099 st_insert(vm->living_threads, thval, (st_data_t)th->thread_id); 03100 vm->sleeper = 0; 03101 clear_coverage(); 03102 } 03103 03104 static int 03105 terminate_atfork_i(st_data_t key, st_data_t val, st_data_t current_th) 03106 { 03107 VALUE thval = key; 03108 rb_thread_t *th; 03109 GetThreadPtr(thval, th); 03110 03111 if (th != (rb_thread_t *)current_th) { 03112 if (th->keeping_mutexes) { 03113 rb_mutex_abandon_all(th->keeping_mutexes); 03114 } 03115 th->keeping_mutexes = NULL; 03116 thread_cleanup_func(th, TRUE); 03117 } 03118 return ST_CONTINUE; 03119 } 03120 03121 void 03122 rb_thread_atfork(void) 03123 { 03124 rb_thread_atfork_internal(terminate_atfork_i); 03125 GET_THREAD()->join_list_head = 0; 03126 03127 /* We don't want reproduce CVE-2003-0900. 
*/ 03128 rb_reset_random_seed(); 03129 } 03130 03131 static int 03132 terminate_atfork_before_exec_i(st_data_t key, st_data_t val, st_data_t current_th) 03133 { 03134 VALUE thval = key; 03135 rb_thread_t *th; 03136 GetThreadPtr(thval, th); 03137 03138 if (th != (rb_thread_t *)current_th) { 03139 thread_cleanup_func_before_exec(th); 03140 } 03141 return ST_CONTINUE; 03142 } 03143 03144 void 03145 rb_thread_atfork_before_exec(void) 03146 { 03147 rb_thread_atfork_internal(terminate_atfork_before_exec_i); 03148 } 03149 03150 struct thgroup { 03151 int enclosed; 03152 VALUE group; 03153 }; 03154 03155 static size_t 03156 thgroup_memsize(const void *ptr) 03157 { 03158 return ptr ? sizeof(struct thgroup) : 0; 03159 } 03160 03161 static const rb_data_type_t thgroup_data_type = { 03162 "thgroup", 03163 {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,}, 03164 }; 03165 03166 /* 03167 * Document-class: ThreadGroup 03168 * 03169 * <code>ThreadGroup</code> provides a means of keeping track of a number of 03170 * threads as a group. A <code>Thread</code> can belong to only one 03171 * <code>ThreadGroup</code> at a time; adding a thread to a new group will 03172 * remove it from any previous group. 03173 * 03174 * Newly created threads belong to the same group as the thread from which they 03175 * were created. 
03176 */ 03177 03178 static VALUE 03179 thgroup_s_alloc(VALUE klass) 03180 { 03181 VALUE group; 03182 struct thgroup *data; 03183 03184 group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data); 03185 data->enclosed = 0; 03186 data->group = group; 03187 03188 return group; 03189 } 03190 03191 struct thgroup_list_params { 03192 VALUE ary; 03193 VALUE group; 03194 }; 03195 03196 static int 03197 thgroup_list_i(st_data_t key, st_data_t val, st_data_t data) 03198 { 03199 VALUE thread = (VALUE)key; 03200 VALUE ary = ((struct thgroup_list_params *)data)->ary; 03201 VALUE group = ((struct thgroup_list_params *)data)->group; 03202 rb_thread_t *th; 03203 GetThreadPtr(thread, th); 03204 03205 if (th->thgroup == group) { 03206 rb_ary_push(ary, thread); 03207 } 03208 return ST_CONTINUE; 03209 } 03210 03211 /* 03212 * call-seq: 03213 * thgrp.list -> array 03214 * 03215 * Returns an array of all existing <code>Thread</code> objects that belong to 03216 * this group. 03217 * 03218 * ThreadGroup::Default.list #=> [#<Thread:0x401bdf4c run>] 03219 */ 03220 03221 static VALUE 03222 thgroup_list(VALUE group) 03223 { 03224 VALUE ary = rb_ary_new(); 03225 struct thgroup_list_params param; 03226 03227 param.ary = ary; 03228 param.group = group; 03229 st_foreach(GET_THREAD()->vm->living_threads, thgroup_list_i, (st_data_t) & param); 03230 return ary; 03231 } 03232 03233 03234 /* 03235 * call-seq: 03236 * thgrp.enclose -> thgrp 03237 * 03238 * Prevents threads from being added to or removed from the receiving 03239 * <code>ThreadGroup</code>. New threads can still be started in an enclosed 03240 * <code>ThreadGroup</code>. 
03241 * 03242 * ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914> 03243 * thr = Thread::new { Thread.stop } #=> #<Thread:0x402a7210 sleep> 03244 * tg = ThreadGroup::new #=> #<ThreadGroup:0x402752d4> 03245 * tg.add thr 03246 * 03247 * <em>produces:</em> 03248 * 03249 * ThreadError: can't move from the enclosed thread group 03250 */ 03251 03252 static VALUE 03253 thgroup_enclose(VALUE group) 03254 { 03255 struct thgroup *data; 03256 03257 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03258 data->enclosed = 1; 03259 03260 return group; 03261 } 03262 03263 03264 /* 03265 * call-seq: 03266 * thgrp.enclosed? -> true or false 03267 * 03268 * Returns <code>true</code> if <em>thgrp</em> is enclosed. See also 03269 * ThreadGroup#enclose. 03270 */ 03271 03272 static VALUE 03273 thgroup_enclosed_p(VALUE group) 03274 { 03275 struct thgroup *data; 03276 03277 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03278 if (data->enclosed) 03279 return Qtrue; 03280 return Qfalse; 03281 } 03282 03283 03284 /* 03285 * call-seq: 03286 * thgrp.add(thread) -> thgrp 03287 * 03288 * Adds the given <em>thread</em> to this group, removing it from any other 03289 * group to which it may have previously belonged. 
03290 * 03291 * puts "Initial group is #{ThreadGroup::Default.list}" 03292 * tg = ThreadGroup.new 03293 * t1 = Thread.new { sleep } 03294 * t2 = Thread.new { sleep } 03295 * puts "t1 is #{t1}" 03296 * puts "t2 is #{t2}" 03297 * tg.add(t1) 03298 * puts "Initial group now #{ThreadGroup::Default.list}" 03299 * puts "tg group now #{tg.list}" 03300 * 03301 * <em>produces:</em> 03302 * 03303 * Initial group is #<Thread:0x401bdf4c> 03304 * t1 is #<Thread:0x401b3c90> 03305 * t2 is #<Thread:0x401b3c18> 03306 * Initial group now #<Thread:0x401b3c18>#<Thread:0x401bdf4c> 03307 * tg group now #<Thread:0x401b3c90> 03308 */ 03309 03310 static VALUE 03311 thgroup_add(VALUE group, VALUE thread) 03312 { 03313 rb_thread_t *th; 03314 struct thgroup *data; 03315 03316 rb_secure(4); 03317 GetThreadPtr(thread, th); 03318 03319 if (OBJ_FROZEN(group)) { 03320 rb_raise(rb_eThreadError, "can't move to the frozen thread group"); 03321 } 03322 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03323 if (data->enclosed) { 03324 rb_raise(rb_eThreadError, "can't move to the enclosed thread group"); 03325 } 03326 03327 if (!th->thgroup) { 03328 return Qnil; 03329 } 03330 03331 if (OBJ_FROZEN(th->thgroup)) { 03332 rb_raise(rb_eThreadError, "can't move from the frozen thread group"); 03333 } 03334 TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data); 03335 if (data->enclosed) { 03336 rb_raise(rb_eThreadError, 03337 "can't move from the enclosed thread group"); 03338 } 03339 03340 th->thgroup = group; 03341 return group; 03342 } 03343 03344 03345 /* 03346 * Document-class: Mutex 03347 * 03348 * Mutex implements a simple semaphore that can be used to coordinate access to 03349 * shared data from multiple concurrent threads. 
03350 * 03351 * Example: 03352 * 03353 * require 'thread' 03354 * semaphore = Mutex.new 03355 * 03356 * a = Thread.new { 03357 * semaphore.synchronize { 03358 * # access shared resource 03359 * } 03360 * } 03361 * 03362 * b = Thread.new { 03363 * semaphore.synchronize { 03364 * # access shared resource 03365 * } 03366 * } 03367 * 03368 */ 03369 03370 #define GetMutexPtr(obj, tobj) \ 03371 TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj)) 03372 03373 static const char *rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); 03374 03375 #define mutex_mark NULL 03376 03377 static void 03378 mutex_free(void *ptr) 03379 { 03380 if (ptr) { 03381 rb_mutex_t *mutex = ptr; 03382 if (mutex->th) { 03383 /* rb_warn("free locked mutex"); */ 03384 const char *err = rb_mutex_unlock_th(mutex, mutex->th); 03385 if (err) rb_bug("%s", err); 03386 } 03387 native_mutex_destroy(&mutex->lock); 03388 native_cond_destroy(&mutex->cond); 03389 } 03390 ruby_xfree(ptr); 03391 } 03392 03393 static size_t 03394 mutex_memsize(const void *ptr) 03395 { 03396 return ptr ? 
sizeof(rb_mutex_t) : 0; 03397 } 03398 03399 static const rb_data_type_t mutex_data_type = { 03400 "mutex", 03401 {mutex_mark, mutex_free, mutex_memsize,}, 03402 }; 03403 03404 VALUE 03405 rb_obj_is_mutex(VALUE obj) 03406 { 03407 if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) { 03408 return Qtrue; 03409 } 03410 else { 03411 return Qfalse; 03412 } 03413 } 03414 03415 static VALUE 03416 mutex_alloc(VALUE klass) 03417 { 03418 VALUE volatile obj; 03419 rb_mutex_t *mutex; 03420 03421 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); 03422 native_mutex_initialize(&mutex->lock); 03423 native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC); 03424 return obj; 03425 } 03426 03427 /* 03428 * call-seq: 03429 * Mutex.new -> mutex 03430 * 03431 * Creates a new Mutex 03432 */ 03433 static VALUE 03434 mutex_initialize(VALUE self) 03435 { 03436 return self; 03437 } 03438 03439 VALUE 03440 rb_mutex_new(void) 03441 { 03442 return mutex_alloc(rb_cMutex); 03443 } 03444 03445 /* 03446 * call-seq: 03447 * mutex.locked? -> true or false 03448 * 03449 * Returns +true+ if this lock is currently held by some thread. 03450 */ 03451 VALUE 03452 rb_mutex_locked_p(VALUE self) 03453 { 03454 rb_mutex_t *mutex; 03455 GetMutexPtr(self, mutex); 03456 return mutex->th ? Qtrue : Qfalse; 03457 } 03458 03459 static void 03460 mutex_locked(rb_thread_t *th, VALUE self) 03461 { 03462 rb_mutex_t *mutex; 03463 GetMutexPtr(self, mutex); 03464 03465 if (th->keeping_mutexes) { 03466 mutex->next_mutex = th->keeping_mutexes; 03467 } 03468 th->keeping_mutexes = mutex; 03469 } 03470 03471 /* 03472 * call-seq: 03473 * mutex.try_lock -> true or false 03474 * 03475 * Attempts to obtain the lock and returns immediately. Returns +true+ if the 03476 * lock was granted. 
 */
VALUE
rb_mutex_trylock(VALUE self)
{
    rb_mutex_t *mutex;
    VALUE locked = Qfalse;
    GetMutexPtr(self, mutex);

    /* the owner field is only inspected and changed under the native lock */
    native_mutex_lock(&mutex->lock);
    if (mutex->th == 0) {
        mutex->th = GET_THREAD();
        locked = Qtrue;

        /* record the mutex in the current thread's keeping_mutexes list */
        mutex_locked(GET_THREAD(), self);
    }
    native_mutex_unlock(&mutex->lock);

    return locked;
}

/*
 * Wait on mutex->cond until the mutex can be taken for th.
 *
 * The condition waits use mutex->lock, so the caller is expected to hold it.
 * With a non-zero timeout_ms each wait is a timed wait of that length.
 *
 * Returns 0 when th became the owner, 1 when th was interrupted, and 2 when
 * the timed wait expired (ETIMEDOUT) without the mutex becoming free.
 */
static int
lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms)
{
    int interrupted = 0;
    int err = 0;

    /* cond_waiting lets unlockers and lock_interrupt skip needless wakeups */
    mutex->cond_waiting++;
    for (;;) {
        if (!mutex->th) {
            mutex->th = th;
            break;
        }
        if (RUBY_VM_INTERRUPTED(th)) {
            interrupted = 1;
            break;
        }
        /* checked only after the owner test, so a free mutex wins over a timeout */
        if (err == ETIMEDOUT) {
            interrupted = 2;
            break;
        }

        if (timeout_ms) {
            struct timespec timeout_rel;
            struct timespec timeout;

            timeout_rel.tv_sec = 0;
            timeout_rel.tv_nsec = timeout_ms * 1000 * 1000;
            timeout = native_cond_timeout(&mutex->cond, timeout_rel);
            err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout);
        }
        else {
            native_cond_wait(&mutex->cond, &mutex->lock);
            err = 0;
        }
    }
    mutex->cond_waiting--;

    return interrupted;
}

/*
 * Unblock function installed while a thread waits in lock_func(): wakes all
 * waiters on the mutex's condition variable (if any) so they re-check their
 * interrupt state.
 */
static void
lock_interrupt(void *ptr)
{
    rb_mutex_t *mutex = (rb_mutex_t *)ptr;
    native_mutex_lock(&mutex->lock);
    if (mutex->cond_waiting > 0)
        native_cond_broadcast(&mutex->cond);
    native_mutex_unlock(&mutex->lock);
}

/*
 * At maximum, only one thread can use cond_timedwait and watch deadlock
 * periodically. Multiple polling thread (i.e. concurrent deadlock check)
 * introduces new race conditions.
[Bug #6278] [ruby-core:44275]
 */
rb_thread_t *patrol_thread = NULL;

/*
 * call-seq:
 *    mutex.lock  -> self
 *
 * Attempts to grab the lock and waits if it isn't available.
 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
 */
VALUE
rb_mutex_lock(VALUE self)
{

    /* fast path: an uncontended mutex is taken by rb_mutex_trylock() */
    if (rb_mutex_trylock(self) == Qfalse) {
        rb_mutex_t *mutex;
        rb_thread_t *th = GET_THREAD();
        GetMutexPtr(self, mutex);

        if (mutex->th == GET_THREAD()) {
            rb_raise(rb_eThreadError, "deadlock; recursive locking");
        }

        /* loop until this thread is recorded as the owner */
        while (mutex->th != th) {
            int interrupted;
            enum rb_thread_status prev_status = th->status;
            int timeout_ms = 0;
            struct rb_unblock_callback oldubf;

            /* lock_interrupt wakes this thread up when it gets interrupted */
            set_unblock_function(th, lock_interrupt, mutex, &oldubf);
            th->status = THREAD_STOPPED_FOREVER;
            th->locking_mutex = self;

            native_mutex_lock(&mutex->lock);
            th->vm->sleeper++;
            /*
             * Carefully! while some contended threads are in lock_func(),
             * vm->sleeper is unstable value. we have to avoid both deadlock
             * and busy loop.
             */
            if ((vm_living_thread_num(th->vm) == th->vm->sleeper) &&
                !patrol_thread) {
                /* become the single polling thread: wake up every 100ms */
                timeout_ms = 100;
                patrol_thread = th;
            }

            /* wait for the mutex with the GVL released */
            GVL_UNLOCK_BEGIN();
            interrupted = lock_func(th, mutex, timeout_ms);
            native_mutex_unlock(&mutex->lock);
            GVL_UNLOCK_END();

            if (patrol_thread == th)
                patrol_thread = NULL;

            reset_unblock_function(th, &oldubf);

            th->locking_mutex = Qfalse;
            /* timed out while the mutex is still held: run the deadlock check */
            if (mutex->th && interrupted == 2) {
                rb_check_deadlock(th->vm);
            }
            if (th->status == THREAD_STOPPED_FOREVER) {
                th->status = prev_status;
            }
            th->vm->sleeper--;

            if (mutex->th == th) mutex_locked(th, self);

            if (interrupted) {
                RUBY_VM_CHECK_INTS();
            }
        }
    }
    return self;
}

/*
 * Release mutex on behalf of th.
 *
 * Returns NULL on success, otherwise a static message explaining why the
 * mutex could not be unlocked (not locked at all, or locked by another
 * thread).  On success one waiter is signalled if any is waiting, and the
 * mutex is unlinked from th->keeping_mutexes.
 */
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th)
{
    const char *err = NULL;
    rb_mutex_t *th_mutex;

    native_mutex_lock(&mutex->lock);

    if (mutex->th == 0) {
        err = "Attempt to unlock a mutex which is not locked";
    }
    else if (mutex->th != th) {
        err = "Attempt to unlock a mutex which is locked by another thread";
    }
    else {
        mutex->th = 0;
        if (mutex->cond_waiting > 0)
            native_cond_signal(&mutex->cond);
    }

    native_mutex_unlock(&mutex->lock);

    if (!err) {
        /* unlink mutex from the singly linked keeping_mutexes list */
        th_mutex = th->keeping_mutexes;
        if (th_mutex == mutex) {
            th->keeping_mutexes = mutex->next_mutex;
        }
        else {
            /* NOTE(review): the walk has no NULL check, so it relies on
             * mutex being present in the list -- confirm that invariant */
            while (1) {
                rb_mutex_t *tmp_mutex;
                tmp_mutex = th_mutex->next_mutex;
                if (tmp_mutex == mutex) {
                    th_mutex->next_mutex = tmp_mutex->next_mutex;
                    break;
                }
                th_mutex = tmp_mutex;
            }
        }
        mutex->next_mutex = NULL;
    }

    return err;
}

/*
 * call-seq:
 *    mutex.unlock    -> self
 *
 * Releases the lock.
03675 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread. 03676 */ 03677 VALUE 03678 rb_mutex_unlock(VALUE self) 03679 { 03680 const char *err; 03681 rb_mutex_t *mutex; 03682 GetMutexPtr(self, mutex); 03683 03684 err = rb_mutex_unlock_th(mutex, GET_THREAD()); 03685 if (err) rb_raise(rb_eThreadError, "%s", err); 03686 03687 return self; 03688 } 03689 03690 static void 03691 rb_mutex_abandon_all(rb_mutex_t *mutexes) 03692 { 03693 rb_mutex_t *mutex; 03694 03695 while (mutexes) { 03696 mutex = mutexes; 03697 mutexes = mutex->next_mutex; 03698 mutex->th = 0; 03699 mutex->next_mutex = 0; 03700 } 03701 } 03702 03703 static VALUE 03704 rb_mutex_sleep_forever(VALUE time) 03705 { 03706 rb_thread_sleep_deadly(); 03707 return Qnil; 03708 } 03709 03710 static VALUE 03711 rb_mutex_wait_for(VALUE time) 03712 { 03713 const struct timeval *t = (struct timeval *)time; 03714 rb_thread_wait_for(*t); 03715 return Qnil; 03716 } 03717 03718 VALUE 03719 rb_mutex_sleep(VALUE self, VALUE timeout) 03720 { 03721 time_t beg, end; 03722 struct timeval t; 03723 03724 if (!NIL_P(timeout)) { 03725 t = rb_time_interval(timeout); 03726 } 03727 rb_mutex_unlock(self); 03728 beg = time(0); 03729 if (NIL_P(timeout)) { 03730 rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self); 03731 } 03732 else { 03733 rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self); 03734 } 03735 end = time(0) - beg; 03736 return INT2FIX(end); 03737 } 03738 03739 /* 03740 * call-seq: 03741 * mutex.sleep(timeout = nil) -> number 03742 * 03743 * Releases the lock and sleeps +timeout+ seconds if it is given and 03744 * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by 03745 * the current thread. 03746 */ 03747 static VALUE 03748 mutex_sleep(int argc, VALUE *argv, VALUE self) 03749 { 03750 VALUE timeout; 03751 03752 rb_scan_args(argc, argv, "01", &timeout); 03753 return rb_mutex_sleep(self, timeout); 03754 } 03755 03756 /* 03757 * call-seq: 03758 * mutex.synchronize { ... 
} -> result of the block 03759 * 03760 * Obtains a lock, runs the block, and releases the lock when the block 03761 * completes. See the example under +Mutex+. 03762 */ 03763 03764 VALUE 03765 rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg) 03766 { 03767 rb_mutex_lock(mutex); 03768 return rb_ensure(func, arg, rb_mutex_unlock, mutex); 03769 } 03770 03771 /* 03772 * Document-class: Barrier 03773 */ 03774 static void 03775 barrier_mark(void *ptr) 03776 { 03777 rb_gc_mark((VALUE)ptr); 03778 } 03779 03780 static const rb_data_type_t barrier_data_type = { 03781 "barrier", 03782 {barrier_mark, 0, 0,}, 03783 }; 03784 03785 static VALUE 03786 barrier_alloc(VALUE klass) 03787 { 03788 return TypedData_Wrap_Struct(klass, &barrier_data_type, (void *)mutex_alloc(0)); 03789 } 03790 03791 #define GetBarrierPtr(obj) ((VALUE)rb_check_typeddata((obj), &barrier_data_type)) 03792 03793 VALUE 03794 rb_barrier_new(void) 03795 { 03796 VALUE barrier = barrier_alloc(rb_cBarrier); 03797 rb_mutex_lock((VALUE)DATA_PTR(barrier)); 03798 return barrier; 03799 } 03800 03801 VALUE 03802 rb_barrier_wait(VALUE self) 03803 { 03804 VALUE mutex = GetBarrierPtr(self); 03805 rb_mutex_t *m; 03806 03807 if (!mutex) return Qfalse; 03808 GetMutexPtr(mutex, m); 03809 if (m->th == GET_THREAD()) return Qfalse; 03810 rb_mutex_lock(mutex); 03811 if (DATA_PTR(self)) return Qtrue; 03812 rb_mutex_unlock(mutex); 03813 return Qfalse; 03814 } 03815 03816 VALUE 03817 rb_barrier_release(VALUE self) 03818 { 03819 return rb_mutex_unlock(GetBarrierPtr(self)); 03820 } 03821 03822 VALUE 03823 rb_barrier_destroy(VALUE self) 03824 { 03825 VALUE mutex = GetBarrierPtr(self); 03826 DATA_PTR(self) = 0; 03827 return rb_mutex_unlock(mutex); 03828 } 03829 03830 /* variables for recursive traversals */ 03831 static ID recursive_key; 03832 03833 /* 03834 * Returns the current "recursive list" used to detect recursion. 
03835 * This list is a hash table, unique for the current thread and for 03836 * the current __callee__. 03837 */ 03838 03839 static VALUE 03840 recursive_list_access(void) 03841 { 03842 volatile VALUE hash = rb_thread_local_aref(rb_thread_current(), recursive_key); 03843 VALUE sym = ID2SYM(rb_frame_this_func()); 03844 VALUE list; 03845 if (NIL_P(hash) || TYPE(hash) != T_HASH) { 03846 hash = rb_hash_new(); 03847 OBJ_UNTRUST(hash); 03848 rb_thread_local_aset(rb_thread_current(), recursive_key, hash); 03849 list = Qnil; 03850 } 03851 else { 03852 list = rb_hash_aref(hash, sym); 03853 } 03854 if (NIL_P(list) || TYPE(list) != T_HASH) { 03855 list = rb_hash_new(); 03856 OBJ_UNTRUST(list); 03857 rb_hash_aset(hash, sym, list); 03858 } 03859 return list; 03860 } 03861 03862 /* 03863 * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already 03864 * in the recursion list. 03865 * Assumes the recursion list is valid. 03866 */ 03867 03868 static VALUE 03869 recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id) 03870 { 03871 #if SIZEOF_LONG == SIZEOF_VOIDP 03872 #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other)) 03873 #elif SIZEOF_LONG_LONG == SIZEOF_VOIDP 03874 #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \ 03875 rb_big_eql((obj_id), (other)) : ((obj_id) == (other))) 03876 #endif 03877 03878 VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef); 03879 if (pair_list == Qundef) 03880 return Qfalse; 03881 if (paired_obj_id) { 03882 if (TYPE(pair_list) != T_HASH) { 03883 if (!OBJ_ID_EQL(paired_obj_id, pair_list)) 03884 return Qfalse; 03885 } 03886 else { 03887 if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id))) 03888 return Qfalse; 03889 } 03890 } 03891 return Qtrue; 03892 } 03893 03894 /* 03895 * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list. 03896 * For a single obj_id, it sets list[obj_id] to Qtrue. 
03897 * For a pair, it sets list[obj_id] to paired_obj_id if possible, 03898 * otherwise list[obj_id] becomes a hash like: 03899 * {paired_obj_id_1 => true, paired_obj_id_2 => true, ... } 03900 * Assumes the recursion list is valid. 03901 */ 03902 03903 static void 03904 recursive_push(VALUE list, VALUE obj, VALUE paired_obj) 03905 { 03906 VALUE pair_list; 03907 03908 if (!paired_obj) { 03909 rb_hash_aset(list, obj, Qtrue); 03910 } 03911 else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) { 03912 rb_hash_aset(list, obj, paired_obj); 03913 } 03914 else { 03915 if (TYPE(pair_list) != T_HASH){ 03916 VALUE other_paired_obj = pair_list; 03917 pair_list = rb_hash_new(); 03918 OBJ_UNTRUST(pair_list); 03919 rb_hash_aset(pair_list, other_paired_obj, Qtrue); 03920 rb_hash_aset(list, obj, pair_list); 03921 } 03922 rb_hash_aset(pair_list, paired_obj, Qtrue); 03923 } 03924 } 03925 03926 /* 03927 * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list. 03928 * For a pair, if list[obj_id] is a hash, then paired_obj_id is 03929 * removed from the hash and no attempt is made to simplify 03930 * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id 03931 * Assumes the recursion list is valid. 
*/

static void
recursive_pop(VALUE list, VALUE obj, VALUE paired_obj)
{
    if (paired_obj) {
        VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
        if (pair_list == Qundef) {
            /* Popping a pair that was never pushed: report which method and
             * thread the inconsistent list belongs to. */
            VALUE symname = rb_inspect(ID2SYM(rb_frame_this_func()));
            VALUE thrname = rb_inspect(rb_thread_current());
            rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list for %s in %s",
                     StringValuePtr(symname), StringValuePtr(thrname));
        }
        if (TYPE(pair_list) == T_HASH) {
            rb_hash_delete(pair_list, paired_obj);
            if (!RHASH_EMPTY_P(pair_list)) {
                return; /* keep the hash until it is empty */
            }
        }
    }
    /* Unpaired entry, single stored partner, or now-empty hash. */
    rb_hash_delete(list, obj);
}

/* Arguments bundled for exec_recursive_i(). */
struct exec_recursive_params {
    VALUE (*func) (VALUE, VALUE, int); /* user callback */
    VALUE list;                        /* recursion list for the current method */
    VALUE obj;                         /* object passed to func */
    VALUE objid;                       /* key pushed on the list for obj */
    VALUE pairid;                      /* key of the paired object, or 0 */
    VALUE arg;                         /* extra argument passed to func */
};

/*
 * Pushes <objid, pairid> on the recursion list, calls
 * p->func(p->obj, p->arg, FALSE) and pops the entry again, also when the
 * call is left through a non-zero tag state (which is then re-raised with
 * JUMP_TAG).  +tag+ is not used here.
 */
static VALUE
exec_recursive_i(VALUE tag, struct exec_recursive_params *p)
{
    VALUE result = Qundef;
    int state;

    recursive_push(p->list, p->objid, p->pairid);
    PUSH_TAG();
    if ((state = EXEC_TAG()) == 0) {
        result = (*p->func)(p->obj, p->arg, FALSE);
    }
    POP_TAG();
    /* The pop must happen before any pending jump is propagated. */
    recursive_pop(p->list, p->objid, p->pairid);
    if (state)
        JUMP_TAG(state);
    return result;
}

/*
 * Calls func(obj, arg, recursive), where recursive is non-zero if the
 * current method is called recursively on obj, or on the pair <obj, pairid>
 * If outer is 0, then the innermost func will be called with recursive set
 * to Qtrue, otherwise the outermost func will be called. In the latter case,
 * all inner func are short-circuited by throw.
 * Implementation details: the value thrown is the recursive list which is
 * proper to the current method and unlikely to be caught anywhere else.
 * list[recursive_key] is used as a flag for the outermost call.
*/

static VALUE
exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer)
{
    VALUE result = Qundef;
    struct exec_recursive_params p;
    int outermost;
    p.list = recursive_list_access();
    p.objid = rb_obj_id(obj);
    p.obj = obj;
    p.pairid = pairid;
    p.arg = arg;
    /* In "outer" mode this is the outermost call unless the flag entry
     * (keyed by the recursive_key symbol) is already on the list. */
    outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0);

    if (recursive_check(p.list, p.objid, pairid)) {
        /* Recursion detected. */
        if (outer && !outermost) {
            /* Hand control to the outermost call by throwing the list. */
            rb_throw_obj(p.list, p.list);
        }
        return (*func)(obj, arg, TRUE);
    }
    else {
        p.func = func;

        if (outermost) {
            /* Set the flag entry, run under a catch for the list, and clear
             * the flag again afterwards. */
            recursive_push(p.list, ID2SYM(recursive_key), 0);
            result = rb_catch_obj(p.list, exec_recursive_i, (VALUE)&p);
            recursive_pop(p.list, ID2SYM(recursive_key), 0);
            /* Getting the list itself back means an inner call threw. */
            if (result == p.list) {
                result = (*func)(obj, arg, TRUE);
            }
        }
        else {
            result = exec_recursive_i(0, &p);
        }
    }
    /* Volatile read of p with no other effect.
     * NOTE(review): presumably keeps p (and the VALUEs in it) alive on the
     * stack up to this point -- confirm intent. */
    *(volatile struct exec_recursive_params *)&p;
    return result;
}

/*
 * Calls func(obj, arg, recursive), where recursive is non-zero if the
 * current method is called recursively on obj
 */

VALUE
rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
{
    /* No paired object, inner-most semantics. */
    return exec_recursive(func, obj, 0, arg, 0);
}

/*
 * Calls func(obj, arg, recursive), where recursive is non-zero if the
 * current method is called recursively on the ordered pair <obj, paired_obj>
 */

VALUE
rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
{
    /* The pair is identified by the object id of paired_obj. */
    return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0);
}

/*
 * If recursion is detected on the current method and obj, the outermost
 * func will be called with (obj, arg, Qtrue).
All inner func will be 04056 * short-circuited using throw. 04057 */ 04058 04059 VALUE 04060 rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) 04061 { 04062 return exec_recursive(func, obj, 0, arg, 1); 04063 } 04064 04065 /* tracer */ 04066 #define RUBY_EVENT_REMOVED 0x1000000 04067 04068 enum { 04069 EVENT_RUNNING_NOTHING, 04070 EVENT_RUNNING_TRACE = 1, 04071 EVENT_RUNNING_THREAD = 2, 04072 EVENT_RUNNING_VM = 4, 04073 EVENT_RUNNING_EVENT_MASK = EVENT_RUNNING_VM|EVENT_RUNNING_THREAD 04074 }; 04075 04076 static VALUE thread_suppress_tracing(rb_thread_t *th, int ev, VALUE (*func)(VALUE, int), VALUE arg, int always, int pop_p); 04077 04078 struct event_call_args { 04079 rb_thread_t *th; 04080 VALUE klass; 04081 VALUE self; 04082 VALUE proc; 04083 ID id; 04084 rb_event_flag_t event; 04085 }; 04086 04087 static rb_event_hook_t * 04088 alloc_event_hook(rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04089 { 04090 rb_event_hook_t *hook = ALLOC(rb_event_hook_t); 04091 hook->func = func; 04092 hook->flag = events; 04093 hook->data = data; 04094 return hook; 04095 } 04096 04097 static void 04098 thread_reset_event_flags(rb_thread_t *th) 04099 { 04100 rb_event_hook_t *hook = th->event_hooks; 04101 rb_event_flag_t flag = th->event_flags & RUBY_EVENT_VM; 04102 04103 while (hook) { 04104 if (!(flag & RUBY_EVENT_REMOVED)) 04105 flag |= hook->flag; 04106 hook = hook->next; 04107 } 04108 th->event_flags = flag; 04109 } 04110 04111 static void 04112 rb_threadptr_add_event_hook(rb_thread_t *th, 04113 rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04114 { 04115 rb_event_hook_t *hook = alloc_event_hook(func, events, data); 04116 hook->next = th->event_hooks; 04117 th->event_hooks = hook; 04118 thread_reset_event_flags(th); 04119 } 04120 04121 static rb_thread_t * 04122 thval2thread_t(VALUE thval) 04123 { 04124 rb_thread_t *th; 04125 GetThreadPtr(thval, th); 04126 return th; 04127 } 04128 04129 void 04130 
rb_thread_add_event_hook(VALUE thval, 04131 rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04132 { 04133 rb_threadptr_add_event_hook(thval2thread_t(thval), func, events, data); 04134 } 04135 04136 static int 04137 set_threads_event_flags_i(st_data_t key, st_data_t val, st_data_t flag) 04138 { 04139 VALUE thval = key; 04140 rb_thread_t *th; 04141 GetThreadPtr(thval, th); 04142 04143 if (flag) { 04144 th->event_flags |= RUBY_EVENT_VM; 04145 } 04146 else { 04147 th->event_flags &= (~RUBY_EVENT_VM); 04148 } 04149 return ST_CONTINUE; 04150 } 04151 04152 static void 04153 set_threads_event_flags(int flag) 04154 { 04155 st_foreach(GET_VM()->living_threads, set_threads_event_flags_i, (st_data_t) flag); 04156 } 04157 04158 static inline int 04159 exec_event_hooks(const rb_event_hook_t *hook, rb_event_flag_t flag, VALUE self, ID id, VALUE klass) 04160 { 04161 int removed = 0; 04162 for (; hook; hook = hook->next) { 04163 if (hook->flag & RUBY_EVENT_REMOVED) { 04164 removed++; 04165 continue; 04166 } 04167 if (flag & hook->flag) { 04168 (*hook->func)(flag, hook->data, self, id, klass); 04169 } 04170 } 04171 return removed; 04172 } 04173 04174 static int remove_defered_event_hook(rb_event_hook_t **root); 04175 04176 static VALUE 04177 thread_exec_event_hooks(VALUE args, int running) 04178 { 04179 struct event_call_args *argp = (struct event_call_args *)args; 04180 rb_thread_t *th = argp->th; 04181 rb_event_flag_t flag = argp->event; 04182 VALUE self = argp->self; 04183 ID id = argp->id; 04184 VALUE klass = argp->klass; 04185 const rb_event_flag_t wait_event = th->event_flags; 04186 int removed; 04187 04188 if (self == rb_mRubyVMFrozenCore) return 0; 04189 04190 if ((wait_event & flag) && !(running & EVENT_RUNNING_THREAD)) { 04191 th->tracing |= EVENT_RUNNING_THREAD; 04192 removed = exec_event_hooks(th->event_hooks, flag, self, id, klass); 04193 th->tracing &= ~EVENT_RUNNING_THREAD; 04194 if (removed) { 04195 remove_defered_event_hook(&th->event_hooks); 04196 } 
04197 } 04198 if (wait_event & RUBY_EVENT_VM) { 04199 if (th->vm->event_hooks == NULL) { 04200 th->event_flags &= (~RUBY_EVENT_VM); 04201 } 04202 else if (!(running & EVENT_RUNNING_VM)) { 04203 th->tracing |= EVENT_RUNNING_VM; 04204 removed = exec_event_hooks(th->vm->event_hooks, flag, self, id, klass); 04205 th->tracing &= ~EVENT_RUNNING_VM; 04206 if (removed) { 04207 remove_defered_event_hook(&th->vm->event_hooks); 04208 } 04209 } 04210 } 04211 return 0; 04212 } 04213 04214 void 04215 rb_threadptr_exec_event_hooks(rb_thread_t *th, rb_event_flag_t flag, VALUE self, ID id, VALUE klass, int pop_p) 04216 { 04217 const VALUE errinfo = th->errinfo; 04218 struct event_call_args args; 04219 args.th = th; 04220 args.event = flag; 04221 args.self = self; 04222 args.id = id; 04223 args.klass = klass; 04224 args.proc = 0; 04225 thread_suppress_tracing(th, EVENT_RUNNING_EVENT_MASK, thread_exec_event_hooks, (VALUE)&args, FALSE, pop_p); 04226 th->errinfo = errinfo; 04227 } 04228 04229 void 04230 rb_add_event_hook(rb_event_hook_func_t func, rb_event_flag_t events, VALUE data) 04231 { 04232 rb_event_hook_t *hook = alloc_event_hook(func, events, data); 04233 rb_vm_t *vm = GET_VM(); 04234 04235 hook->next = vm->event_hooks; 04236 vm->event_hooks = hook; 04237 04238 set_threads_event_flags(1); 04239 } 04240 04241 static int 04242 defer_remove_event_hook(rb_event_hook_t *hook, rb_event_hook_func_t func) 04243 { 04244 while (hook) { 04245 if (func == 0 || hook->func == func) { 04246 hook->flag |= RUBY_EVENT_REMOVED; 04247 } 04248 hook = hook->next; 04249 } 04250 return -1; 04251 } 04252 04253 static int 04254 remove_event_hook(rb_event_hook_t **root, rb_event_hook_func_t func) 04255 { 04256 rb_event_hook_t *hook = *root, *next; 04257 04258 while (hook) { 04259 next = hook->next; 04260 if (func == 0 || hook->func == func || (hook->flag & RUBY_EVENT_REMOVED)) { 04261 *root = next; 04262 xfree(hook); 04263 } 04264 else { 04265 root = &hook->next; 04266 } 04267 hook = next; 04268 } 04269 
return -1; 04270 } 04271 04272 static int 04273 remove_defered_event_hook(rb_event_hook_t **root) 04274 { 04275 rb_event_hook_t *hook = *root, *next; 04276 04277 while (hook) { 04278 next = hook->next; 04279 if (hook->flag & RUBY_EVENT_REMOVED) { 04280 *root = next; 04281 xfree(hook); 04282 } 04283 else { 04284 root = &hook->next; 04285 } 04286 hook = next; 04287 } 04288 return -1; 04289 } 04290 04291 static int 04292 rb_threadptr_remove_event_hook(rb_thread_t *th, rb_event_hook_func_t func) 04293 { 04294 int ret; 04295 if (th->tracing & EVENT_RUNNING_THREAD) { 04296 ret = defer_remove_event_hook(th->event_hooks, func); 04297 } 04298 else { 04299 ret = remove_event_hook(&th->event_hooks, func); 04300 } 04301 thread_reset_event_flags(th); 04302 return ret; 04303 } 04304 04305 int 04306 rb_thread_remove_event_hook(VALUE thval, rb_event_hook_func_t func) 04307 { 04308 return rb_threadptr_remove_event_hook(thval2thread_t(thval), func); 04309 } 04310 04311 static rb_event_hook_t * 04312 search_live_hook(rb_event_hook_t *hook) 04313 { 04314 while (hook) { 04315 if (!(hook->flag & RUBY_EVENT_REMOVED)) 04316 return hook; 04317 hook = hook->next; 04318 } 04319 return NULL; 04320 } 04321 04322 static int 04323 running_vm_event_hooks(st_data_t key, st_data_t val, st_data_t data) 04324 { 04325 rb_thread_t *th = thval2thread_t((VALUE)key); 04326 if (!(th->tracing & EVENT_RUNNING_VM)) return ST_CONTINUE; 04327 *(rb_thread_t **)data = th; 04328 return ST_STOP; 04329 } 04330 04331 static rb_thread_t * 04332 vm_event_hooks_running_thread(rb_vm_t *vm) 04333 { 04334 rb_thread_t *found = NULL; 04335 st_foreach(vm->living_threads, running_vm_event_hooks, (st_data_t)&found); 04336 return found; 04337 } 04338 04339 int 04340 rb_remove_event_hook(rb_event_hook_func_t func) 04341 { 04342 rb_vm_t *vm = GET_VM(); 04343 rb_event_hook_t *hook = search_live_hook(vm->event_hooks); 04344 int ret; 04345 04346 if (vm_event_hooks_running_thread(vm)) { 04347 ret = 
defer_remove_event_hook(vm->event_hooks, func); 04348 } 04349 else { 04350 ret = remove_event_hook(&vm->event_hooks, func); 04351 } 04352 04353 if (hook && !search_live_hook(vm->event_hooks)) { 04354 set_threads_event_flags(0); 04355 } 04356 04357 return ret; 04358 } 04359 04360 static int 04361 clear_trace_func_i(st_data_t key, st_data_t val, st_data_t flag) 04362 { 04363 rb_thread_t *th; 04364 GetThreadPtr((VALUE)key, th); 04365 rb_threadptr_remove_event_hook(th, 0); 04366 return ST_CONTINUE; 04367 } 04368 04369 void 04370 rb_clear_trace_func(void) 04371 { 04372 st_foreach(GET_VM()->living_threads, clear_trace_func_i, (st_data_t) 0); 04373 rb_remove_event_hook(0); 04374 } 04375 04376 static void call_trace_func(rb_event_flag_t, VALUE data, VALUE self, ID id, VALUE klass); 04377 04378 /* 04379 * call-seq: 04380 * set_trace_func(proc) -> proc 04381 * set_trace_func(nil) -> nil 04382 * 04383 * Establishes _proc_ as the handler for tracing, or disables 04384 * tracing if the parameter is +nil+. _proc_ takes up 04385 * to six parameters: an event name, a filename, a line number, an 04386 * object id, a binding, and the name of a class. _proc_ is 04387 * invoked whenever an event occurs. Events are: <code>c-call</code> 04388 * (call a C-language routine), <code>c-return</code> (return from a 04389 * C-language routine), <code>call</code> (call a Ruby method), 04390 * <code>class</code> (start a class or module definition), 04391 * <code>end</code> (finish a class or module definition), 04392 * <code>line</code> (execute code on a new line), <code>raise</code> 04393 * (raise an exception), and <code>return</code> (return from a Ruby 04394 * method). Tracing is disabled within the context of _proc_. 
04395 * 04396 * class Test 04397 * def test 04398 * a = 1 04399 * b = 2 04400 * end 04401 * end 04402 * 04403 * set_trace_func proc { |event, file, line, id, binding, classname| 04404 * printf "%8s %s:%-2d %10s %8s\n", event, file, line, id, classname 04405 * } 04406 * t = Test.new 04407 * t.test 04408 * 04409 * line prog.rb:11 false 04410 * c-call prog.rb:11 new Class 04411 * c-call prog.rb:11 initialize Object 04412 * c-return prog.rb:11 initialize Object 04413 * c-return prog.rb:11 new Class 04414 * line prog.rb:12 false 04415 * call prog.rb:2 test Test 04416 * line prog.rb:3 test Test 04417 * line prog.rb:4 test Test 04418 * return prog.rb:4 test Test 04419 */ 04420 04421 static VALUE 04422 set_trace_func(VALUE obj, VALUE trace) 04423 { 04424 rb_remove_event_hook(call_trace_func); 04425 04426 if (NIL_P(trace)) { 04427 GET_THREAD()->tracing = EVENT_RUNNING_NOTHING; 04428 return Qnil; 04429 } 04430 04431 if (!rb_obj_is_proc(trace)) { 04432 rb_raise(rb_eTypeError, "trace_func needs to be Proc"); 04433 } 04434 04435 rb_add_event_hook(call_trace_func, RUBY_EVENT_ALL, trace); 04436 return trace; 04437 } 04438 04439 static void 04440 thread_add_trace_func(rb_thread_t *th, VALUE trace) 04441 { 04442 if (!rb_obj_is_proc(trace)) { 04443 rb_raise(rb_eTypeError, "trace_func needs to be Proc"); 04444 } 04445 04446 rb_threadptr_add_event_hook(th, call_trace_func, RUBY_EVENT_ALL, trace); 04447 } 04448 04449 /* 04450 * call-seq: 04451 * thr.add_trace_func(proc) -> proc 04452 * 04453 * Adds _proc_ as a handler for tracing. 04454 * See <code>Thread#set_trace_func</code> and +set_trace_func+. 
*/

static VALUE
thread_add_trace_func_m(VALUE obj, VALUE trace)
{
    rb_thread_t *th;
    GetThreadPtr(obj, th);
    thread_add_trace_func(th, trace);
    return trace;
}

/*
 *  call-seq:
 *     thr.set_trace_func(proc)    -> proc
 *     thr.set_trace_func(nil)     -> nil
 *
 *  Establishes _proc_ on _thr_ as the handler for tracing, or
 *  disables tracing if the parameter is +nil+.
 *  See +set_trace_func+.
 */

static VALUE
thread_set_trace_func_m(VALUE obj, VALUE trace)
{
    rb_thread_t *th;
    GetThreadPtr(obj, th);
    /* Drop the handler installed earlier before (maybe) adding a new one. */
    rb_threadptr_remove_event_hook(th, call_trace_func);

    if (NIL_P(trace)) {
        th->tracing = EVENT_RUNNING_NOTHING;
        return Qnil;
    }
    thread_add_trace_func(th, trace);
    return trace;
}

/* Maps a single event flag to the name handed to the trace proc;
 * anything not listed yields "unknown". */
static const char *
get_event_name(rb_event_flag_t event)
{
    switch (event) {
      case RUBY_EVENT_LINE:
        return "line";
      case RUBY_EVENT_CLASS:
        return "class";
      case RUBY_EVENT_END:
        return "end";
      case RUBY_EVENT_CALL:
        return "call";
      case RUBY_EVENT_RETURN:
        return "return";
      case RUBY_EVENT_C_CALL:
        return "c-call";
      case RUBY_EVENT_C_RETURN:
        return "c-return";
      case RUBY_EVENT_RAISE:
        return "raise";
      default:
        return "unknown";
    }
}

/*
 * Builds the six arguments (event name, file, line, method id, binding,
 * class) and calls the trace proc stored in the event_call_args structure
 * that +args+ points to.  +tracing+ is not used here.  Returns Qnil without
 * calling the proc when the method id is ID_ALLOCATOR.
 */
static VALUE
call_trace_proc(VALUE args, int tracing)
{
    struct event_call_args *p = (struct event_call_args *)args;
    const char *srcfile = rb_sourcefile();
    VALUE eventname = rb_str_new2(get_event_name(p->event));
    VALUE filename = srcfile ? rb_str_new2(srcfile) : Qnil;
    VALUE argv[6];
    int line = rb_sourceline();
    ID id = 0;
    VALUE klass = 0;

    /* Use the id/class delivered with the event when a class was given;
     * otherwise ask the thread for the current method's id and class. */
    if (p->klass != 0) {
        id = p->id;
        klass = p->klass;
    }
    else {
        rb_thread_method_id_and_class(p->th, &id, &klass);
    }
    if (id == ID_ALLOCATOR)
      return Qnil;
    if (klass) {
        /* Report the underlying class for an include class, and the
         * attached object for a singleton class. */
        if (TYPE(klass) == T_ICLASS) {
            klass = RBASIC(klass)->klass;
        }
        else if (FL_TEST(klass, FL_SINGLETON)) {
            klass = rb_iv_get(klass, "__attached__");
        }
    }

    argv[0] = eventname;
    argv[1] = filename;
    argv[2] = INT2FIX(line);
    argv[3] = id ? ID2SYM(id) : Qnil;
    /* A binding is only created when there is a self and a source file. */
    argv[4] = (p->self && srcfile) ? rb_binding_new() : Qnil;
    argv[5] = klass ? klass : Qnil;

    return rb_proc_call_with_block(p->proc, 6, argv, Qnil);
}

/* Event hook function registered for trace procs: packs its arguments and
 * runs call_trace_proc through ruby_suppress_tracing(). */
static void
call_trace_func(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass)
{
    struct event_call_args args;

    args.th = GET_THREAD();
    args.event = event;
    args.proc = proc;
    args.self = self;
    args.id = id;
    args.klass = klass;
    ruby_suppress_tracing(call_trace_proc, (VALUE)&args, FALSE);
}

/* Runs func(arg, running) on the current thread with EVENT_RUNNING_TRACE
 * set; see thread_suppress_tracing() for the details. */
VALUE
ruby_suppress_tracing(VALUE (*func)(VALUE, int), VALUE arg, int always)
{
    rb_thread_t *th = GET_THREAD();
    return thread_suppress_tracing(th, EVENT_RUNNING_TRACE, func, arg, always, 0);
}

/*
 * Calls func(arg, running) with the bits +ev+ set in th->tracing, where
 * +running+ is the subset of +ev+ that was already set on entry.
 *
 * If all bits of +ev+ were already set and +always+ is zero, func is not
 * called and Qnil is returned.  th->tracing is restored to its entry value
 * afterwards.  When func is left with a non-zero tag state, that state is
 * re-raised with JUMP_TAG; if +pop_p+ is non-zero, th->cfp is first moved to
 * the previous control frame.
 */
static VALUE
thread_suppress_tracing(rb_thread_t *th, int ev, VALUE (*func)(VALUE, int), VALUE arg, int always, int pop_p)
{
    int state, tracing = th->tracing, running = tracing & ev;
    volatile int raised;
    volatile int outer_state;
    VALUE result = Qnil;

    if (running == ev && !always) {
        return Qnil;
    }
    else {
        th->tracing |= ev;
    }

    /* Save and clear the thread's raised flag and state around the call. */
    raised = rb_threadptr_reset_raised(th);
    outer_state = th->state;
    th->state = 0;

    PUSH_TAG();
    if ((state = EXEC_TAG()) == 0) {
        result = (*func)(arg, running);
    }

    if (raised) {
        rb_threadptr_set_raised(th);
    }
    POP_TAG();

    th->tracing = tracing;
    if (state) {
        if (pop_p) {
            th->cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(th->cfp);
        }
        JUMP_TAG(state);
    }
    /* th->state is only restored on the normal (state == 0) path. */
    th->state = outer_state;

    return result;
}

/*
 *  call-seq:
 *     thr.backtrace    -> array
 *
 *  Returns the current back trace of the _thr_.
 */

static VALUE
rb_thread_backtrace_m(VALUE thval)
{
    return rb_thread_backtrace(thval);
}

/*
 *  Document-class: ThreadError
 *
 *  Raised when an invalid operation is attempted on a thread.
 *
 *  For example, when no other thread has been started:
 *
 *     Thread.stop
 *
 *  <em>raises the exception:</em>
 *
 *     ThreadError: stopping only thread
 */

/*
 *  +Thread+ encapsulates the behavior of a thread of
 *  execution, including the main thread of the Ruby script.
 *
 *  In the descriptions of the methods in this class, the parameter _sym_
 *  refers to a symbol, which is either a quoted string or a
 *  +Symbol+ (such as <code>:name</code>).
04652 */ 04653 04654 void 04655 Init_Thread(void) 04656 { 04657 #undef rb_intern 04658 #define rb_intern(str) rb_intern_const(str) 04659 04660 VALUE cThGroup; 04661 rb_thread_t *th = GET_THREAD(); 04662 04663 rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1); 04664 rb_define_singleton_method(rb_cThread, "start", thread_start, -2); 04665 rb_define_singleton_method(rb_cThread, "fork", thread_start, -2); 04666 rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0); 04667 rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0); 04668 rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0); 04669 rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1); 04670 rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0); 04671 rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0); 04672 rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0); 04673 rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0); 04674 rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1); 04675 #if THREAD_DEBUG < 0 04676 rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); 04677 rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); 04678 #endif 04679 04680 rb_define_method(rb_cThread, "initialize", thread_initialize, -2); 04681 rb_define_method(rb_cThread, "raise", thread_raise_m, -1); 04682 rb_define_method(rb_cThread, "join", thread_join_m, -1); 04683 rb_define_method(rb_cThread, "value", thread_value, 0); 04684 rb_define_method(rb_cThread, "kill", rb_thread_kill, 0); 04685 rb_define_method(rb_cThread, "terminate", rb_thread_kill, 0); 04686 rb_define_method(rb_cThread, "exit", rb_thread_kill, 0); 04687 rb_define_method(rb_cThread, "run", rb_thread_run, 0); 04688 rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0); 04689 rb_define_method(rb_cThread, "[]", rb_thread_aref, 1); 04690 
rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2); 04691 rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1); 04692 rb_define_method(rb_cThread, "keys", rb_thread_keys, 0); 04693 rb_define_method(rb_cThread, "priority", rb_thread_priority, 0); 04694 rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1); 04695 rb_define_method(rb_cThread, "status", rb_thread_status, 0); 04696 rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0); 04697 rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0); 04698 rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0); 04699 rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1); 04700 rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0); 04701 rb_define_method(rb_cThread, "group", rb_thread_group, 0); 04702 rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, 0); 04703 04704 rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0); 04705 04706 closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed"); 04707 OBJ_TAINT(closed_stream_error); 04708 OBJ_FREEZE(closed_stream_error); 04709 04710 cThGroup = rb_define_class("ThreadGroup", rb_cObject); 04711 rb_define_alloc_func(cThGroup, thgroup_s_alloc); 04712 rb_define_method(cThGroup, "list", thgroup_list, 0); 04713 rb_define_method(cThGroup, "enclose", thgroup_enclose, 0); 04714 rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0); 04715 rb_define_method(cThGroup, "add", thgroup_add, 1); 04716 04717 { 04718 th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup); 04719 rb_define_const(cThGroup, "Default", th->thgroup); 04720 } 04721 04722 rb_cMutex = rb_define_class("Mutex", rb_cObject); 04723 rb_define_alloc_func(rb_cMutex, mutex_alloc); 04724 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0); 04725 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); 04726 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0); 04727 
rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0); 04728 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0); 04729 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1); 04730 04731 recursive_key = rb_intern("__recursive_key__"); 04732 rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError); 04733 04734 /* trace */ 04735 rb_define_global_function("set_trace_func", set_trace_func, 1); 04736 rb_define_method(rb_cThread, "set_trace_func", thread_set_trace_func_m, 1); 04737 rb_define_method(rb_cThread, "add_trace_func", thread_add_trace_func_m, 1); 04738 04739 /* init thread core */ 04740 { 04741 /* main thread setting */ 04742 { 04743 /* acquire global vm lock */ 04744 gvl_init(th->vm); 04745 gvl_acquire(th->vm, th); 04746 native_mutex_initialize(&th->interrupt_lock); 04747 } 04748 } 04749 04750 rb_thread_create_timer_thread(); 04751 04752 /* suppress warnings on cygwin, mingw and mswin.*/ 04753 (void)native_mutex_trylock; 04754 } 04755 04756 int 04757 ruby_native_thread_p(void) 04758 { 04759 rb_thread_t *th = ruby_thread_from_native(); 04760 04761 return th != 0; 04762 } 04763 04764 static int 04765 check_deadlock_i(st_data_t key, st_data_t val, int *found) 04766 { 04767 VALUE thval = key; 04768 rb_thread_t *th; 04769 GetThreadPtr(thval, th); 04770 04771 if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th)) { 04772 *found = 1; 04773 } 04774 else if (th->locking_mutex) { 04775 rb_mutex_t *mutex; 04776 GetMutexPtr(th->locking_mutex, mutex); 04777 04778 native_mutex_lock(&mutex->lock); 04779 if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) { 04780 *found = 1; 04781 } 04782 native_mutex_unlock(&mutex->lock); 04783 } 04784 04785 return (*found) ? 
ST_STOP : ST_CONTINUE; 04786 } 04787 04788 #ifdef DEBUG_DEADLOCK_CHECK 04789 static int 04790 debug_i(st_data_t key, st_data_t val, int *found) 04791 { 04792 VALUE thval = key; 04793 rb_thread_t *th; 04794 GetThreadPtr(thval, th); 04795 04796 printf("th:%p %d %d", th, th->status, th->interrupt_flag); 04797 if (th->locking_mutex) { 04798 rb_mutex_t *mutex; 04799 GetMutexPtr(th->locking_mutex, mutex); 04800 04801 native_mutex_lock(&mutex->lock); 04802 printf(" %p %d\n", mutex->th, mutex->cond_waiting); 04803 native_mutex_unlock(&mutex->lock); 04804 } 04805 else 04806 puts(""); 04807 04808 return ST_CONTINUE; 04809 } 04810 #endif 04811 04812 static void 04813 rb_check_deadlock(rb_vm_t *vm) 04814 { 04815 int found = 0; 04816 04817 if (vm_living_thread_num(vm) > vm->sleeper) return; 04818 if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)"); 04819 if (patrol_thread && patrol_thread != GET_THREAD()) return; 04820 04821 st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found); 04822 04823 if (!found) { 04824 VALUE argv[2]; 04825 argv[0] = rb_eFatal; 04826 argv[1] = rb_str_new2("deadlock detected"); 04827 #ifdef DEBUG_DEADLOCK_CHECK 04828 printf("%d %d %p %p\n", vm->living_threads->num_entries, vm->sleeper, GET_THREAD(), vm->main_thread); 04829 st_foreach(vm->living_threads, debug_i, (st_data_t)0); 04830 #endif 04831 vm->sleeper--; 04832 rb_threadptr_raise(vm->main_thread, 2, argv); 04833 } 04834 } 04835 04836 static void 04837 update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass) 04838 { 04839 VALUE coverage = GET_THREAD()->cfp->iseq->coverage; 04840 if (coverage && RBASIC(coverage)->klass == 0) { 04841 long line = rb_sourceline() - 1; 04842 long count; 04843 if (RARRAY_PTR(coverage)[line] == Qnil) { 04844 return; 04845 } 04846 count = FIX2LONG(RARRAY_PTR(coverage)[line]) + 1; 04847 if (POSFIXABLE(count)) { 04848 RARRAY_PTR(coverage)[line] = LONG2FIX(count); 04849 } 04850 } 
04851 } 04852 04853 VALUE 04854 rb_get_coverages(void) 04855 { 04856 return GET_VM()->coverages; 04857 } 04858 04859 void 04860 rb_set_coverages(VALUE coverages) 04861 { 04862 GET_VM()->coverages = coverages; 04863 rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil); 04864 } 04865 04866 void 04867 rb_reset_coverages(void) 04868 { 04869 GET_VM()->coverages = Qfalse; 04870 rb_remove_event_hook(update_coverage); 04871 } 04872 04873
1.7.6.1