/**
 * The `Event` type and facilities
 */
module libsnooze.event;

// TODO: Would be nice if this built without unit tests failing
// ... so I'd like libsnooze.clib to work as my IDE picks up on
// ... it then

version(release)
{
    import libsnooze.clib : pipe, write, read;
    import libsnooze.clib : select, fd_set, fdSetZero, fdSetSet;
    import libsnooze.clib : timeval, time_t, suseconds_t;
}
else
{
    import clib : pipe, write, read;
    import clib : select, fd_set, fdSetZero, fdSetSet;
    import clib : timeval, time_t, suseconds_t;
}

import core.thread : Thread, Duration, dur;
import core.sync.mutex : Mutex;
import libsnooze.exceptions;
import std.conv : to;
import core.stdc.errno;

/**
 * Represents an object you can wait and notify/notifyAll on
 *
 * Every thread that waits on (or is `ensure()`'d for) an `Event`
 * gets its own pipe. Waiting is a `select()` on the read end of
 * that pipe and notifying is a one-byte `write()` to its write end.
 *
 * NOTE: Nothing in this module closes the pipe file descriptors
 * once they have been created.
 */
public class Event
{
    /* Map from a `Thread` to its [readFD, writeFD] pipe-pair */
    private int[2][Thread] pipes;

    /* Guards every access to `pipes` */
    private Mutex pipesLock;

    /**
     * When `true`, `notify()` on an unregistered thread is silently
     * ignored instead of throwing. Nothing in this module sets it.
     */
    private bool nonFail = false;

    /**
     * Constructs a new Event
     */
    this()
    {
        internalInit();
    }

    /**
     * Performs the platform-specific initialization
     *
     * Throws:
     *   `SnoozeError` when built for Windows, which
     * is not supported
     */
    private void internalInit()
    {
        /* The predefined version identifier is lowercase `linux` */
        version(linux)
        {
            // TODO: Switch to eventfd in the future
            initPipe();
            pragma(msg, "Buulding on linux uses the `pipe(int*)` system call");
        }
        else version(Windows)
        {
            throw new SnoozeError("Platform Windows is not supported");
        }
        else
        {
            initPipe();
        }
    }

    /**
     * Sets up the state required by the pipe-based mechanism
     */
    private void initPipe()
    {
        /* Create a lock for the pipe-pair map */
        pipesLock = new Mutex();
    }

    /**
     * Wait on this event indefinitely
     *
     * Throws:
     *   `InterruptedException` if the `wait()`
     * was interrupted for some reason
     * Throws:
     *   `FatalException` if a fatal error with
     * the underlying mechanism occurs
     */
    public final void wait()
    {
        /* Wait with no timeout (hence the null) */
        wait(null);
    }

    /**
     * Ensures that the calling Thread gets a registered
     * pipe added for it when called.
     *
     * This can be useful if one wants to initialize several
     * threads that should be able to all be notified and wake up
     * on their first call to wait instead of having wait
     * ensure the pipe is created on first call.
     *
     * Throws:
     *   `FatalException` on creating the pipe-pair
     * if needs be
     */
    public final void ensure()
    {
        /* Get the thread object (TID) for the calling thread */
        Thread callingThread = Thread.getThis();

        /* Checks if a pipe-pair exists, if not creates it */
        pipeExistenceEnsure(callingThread);
    }

    /**
     * Returns the pipe-pair for the mapped `Thread`
     * provided. If one does not exist then it is
     * created first and then returned.
     *
     * Throws:
     *   `FatalException` on creating the pipe-pair
     * if needs be
     * Params:
     *   thread = the `Thread` to ensure for
     * Returns: the pipe-pair as an `int[2]`
     */
    private int[2] pipeExistenceEnsure(Thread thread)
    {
        /* Lock the pipe-pairs */
        pipesLock.lock();

        /* Unlock on successful return or on error (e.g. `newPipe()` throwing) */
        scope(exit)
        {
            pipesLock.unlock();
        }

        /* If the thread has no pair yet, create a pipe-pair and save it */
        if(!(thread in pipes))
        {
            pipes[thread] = newPipe();
        }

        /* Grab the pair */
        return pipes[thread];
    }

    /**
     * Waits for the time specified, returning `true`
     * if awoken, `false` on timeout (if specified as
     * non-null).
     *
     * Throws:
     *   `InterruptedException` if the `wait()` was
     * interrupted for some reason.
     * Throws:
     *   `FatalException` on fatal error with the
     * underlying mechanism.
     *
     * Params:
     *   timestruct = the `timeval*` to indicate timeout period,
     * `null` to block without a timeout
     * Returns: `true` if awoken, `false` on timeout
     */
    private final bool wait(timeval* timestruct)
    {
        /* Get the thread object (TID) for the calling thread */
        Thread callingThread = Thread.getThis();

        /* Checks if a pipe-pair exists, if not creates it */
        int[2] pipePair = pipeExistenceEnsure(callingThread);

        /* Get the read-end of the pipe fd */
        int readFD = pipePair[0];

        /**
         * Setup the fd_set for read file descriptors
         *
         * 1. Initialize the struct with FD_ZERO
         * 2. Add the file descriptor of interest i.e. `readFD`
         */
        fd_set readFDs;
        fdSetZero(&readFDs);
        fdSetSet(readFD, &readFDs);

        /**
         * Now block till we have a change in `readFD`'s state
         * (i.e. it becomes readable without a block). However,
         * if a timeout was specified we can then return after
         * said timeout.
         *
         * The first argument of `select()` (nfds) must be the highest
         * monitored fd number plus one, hence `readFD+1`.
         */
        int status = select(readFD+1, &readFDs, null, null, timestruct);

        /**
         * The number of fds (1 in this case) ready for reading is returned.
         *
         * If no timeout was given (timeval* is NULL) then it blocks till
         * readable and hence the status would then be non-zero. The only
         * way it can be `0` is if a timeout was given (timeval* non-NULL)
         * meaning it returned after timing out and nothing became readable.
         */
        if(status == 0)
        {
            return false;
        }
        /**
         * On error doing `select()`
         *
         * We check the `errno` as this could be an error resulting
         * from an unblock due to a signal having been received, in
         * that specific case we don't want to throw an error but rather
         * an interrupted exception.
         */
        else if(status == -1)
        {
            // Retrieve the kind-of error
            int errKind = errno();

            version(dbg)
            {
                import std.stdio;
                writeln("select() interrupted, errno: ", errKind);
            }

            // Handle as an interrupt
            if(errKind == EINTR)
            {
                throw new InterruptedException(this);
            }
            // Anything else is a legitimate error
            else
            {
                throw new FatalException(this, FatalError.WAIT_FAILURE, "Error selecting pipe fd '"~to!(string)(readFD)~"' when trying to wait(), got errno '"~to!(string)(errKind)~"'");
            }
        }
        /* On success */
        else
        {
            /**
             * Read 1 byte from the read end (won't block). This consumes
             * the wake-up byte so that the next call to `select()` blocks
             * again until another `notify()` is made.
             */
            byte singleBuff;
            ptrdiff_t readCount = read(readFD, &singleBuff, 1); // TODO: We could get EINTR'd here too

            /* If we did not read 1 byte then there was an error (either 0 or -1) */
            if(readCount != 1)
            {
                throw new FatalException(this, FatalError.WAIT_FAILURE, "Error reading pipe fd '"~to!(string)(readFD)~"' when trying to wait()");
            }

            return true;
        }
    }

    /**
     * Determines whether this event is ready or not, useful for checking if
     * a wait would block if called relatively soon
     *
     * Returns: true if it would block, false otherwise (including when
     * the underlying `select()` fails)
     *
     * TODO: Test this and write a unit test (it has not yet been tested)
     */
    private final bool wouldWait()
    {
        /* Get the thread object (TID) for the calling thread */
        Thread callingThread = Thread.getThis();

        /* Checks if a pipe-pair exists, if not creates it */
        int[2] pipePair = pipeExistenceEnsure(callingThread);

        /* Get the read-end of the pipe fd */
        int readFD = pipePair[0];

        /**
         * Setup the fd_set for read file descriptors
         *
         * 1. Initialize the struct with FD_ZERO
         * 2. Add the file descriptor of interest i.e. `readFD`
         */
        fd_set readFDs;
        fdSetZero(&readFDs);
        fdSetSet(readFD, &readFDs);

        /**
         * Now we set a timeout that is very low so we can return
         * very quickly, and then determine if within the deadline
         * it became readable ("would not wait") or we exceeded the
         * deadline and it was not readable ("would wait")
         */
        timeval timestruct;
        timestruct.tv_sec = 0;
        timestruct.tv_usec = 1;
        int status = select(readFD+1, &readFDs, null, null, &timestruct);

        /**
         * `0`  - timed out, i.e. "it would wait"
         * `-1` - select() error, currently reported as "would NOT wait"
         *        (TODO: Handle select() errors)
         * else - the fd is readable, i.e. "it would NOT wait"
         */
        return status == 0;
    }

    /**
     * Waits for the time specified, returning `true`
     * if awoken, `false` on timeout
     *
     * Params:
     *   duration = the `Duration` to indicate timeout period
     * Returns: `true` if awoken, `false` on timeout
     * Throws:
     *   `FatalException` on fatal error with the
     * underlying mechanism
     * Throws:
     *   `InterruptedException` if the `wait()` was
     * interrupted for some reason
     */
    public final bool wait(Duration duration)
    {
        /**
         * Split out the duration into seconds and microseconds.
         *
         * The sub-second part must be in "usecs" because it is
         * stored in `tv_usec`; splitting on "msecs" would make
         * sub-second timeouts a thousand times too short.
         */
        time_t seconds;
        suseconds_t microseconds;
        duration.split!("seconds", "usecs")(seconds, microseconds);

        version(dbg)
        {
            /* If debugging is enabled, then print out these during compilation */
            pragma(msg, time_t);
            pragma(msg, suseconds_t);
        }

        /* Generate the timeval struct */
        timeval timestruct;
        timestruct.tv_sec = seconds;
        timestruct.tv_usec = microseconds;

        /* Call wait with this time duration */
        return wait(&timestruct);
    }

    /**
     * Wakes up a single thread specified
     *
     * Params:
     *   thread = the Thread to wake up
     * Throws:
     *   `FatalException` if the thread provided has never
     * been registered (via `wait()` or `ensure()`) or if
     * the underlying mechanism failed to notify
     */
    public final void notify(Thread thread)
    {
        /* Lock the pipe-pairs */
        pipesLock.lock();

        /* Unlock on every exit path, including when we throw */
        scope(exit)
        {
            pipesLock.unlock();
        }

        /* If the thread provided has a pipe-pair registered on this event */
        if(auto pipePair = thread in pipes)
        {
            /* Obtain the write FD */
            int pipeWriteEnd = (*pipePair)[1];

            /* Write a single byte to it, retrying if a signal interrupts us */
            byte wakeByte = 69;
            ptrdiff_t writeCount;
            do
            {
                writeCount = write(pipeWriteEnd, &wakeByte, 1);
            }
            while(writeCount == -1 && errno() == EINTR);

            /* If we did not write 1 byte then the notification failed */
            if(writeCount != 1)
            {
                throw new FatalException(this, FatalError.NOTIFY_FAILURE, "Error writing pipe fd '"~to!(string)(pipeWriteEnd)~"' when trying to notify()");
            }
        }
        /* If the thread provided has NOT registered on this event */
        else if(!nonFail)
        {
            // TODO: Make this error configurable, maybe a non-fail mode should be implemented
            throw new FatalException(this, FatalError.NOTIFY_FAILURE, "Provided thread has yet to call wait() atleast once");
        }
    }

    /**
     * Wakes up all threads waiting on this event
     *
     * Notification stops at the first thread for which
     * `notify()` throws.
     *
     * Throws:
     *   `FatalException` if the underlying
     * mechanism failed to notify
     */
    public final void notifyAll()
    {
        /**
         * Lock the pipe-pairs. `notify()` takes the same lock again
         * from this thread, which relies on `Mutex` being recursive.
         */
        pipesLock.lock();

        /* Unlock on every exit path, including when `notify()` throws */
        scope(exit)
        {
            pipesLock.unlock();
        }

        /* Loop through each thread */
        foreach(Thread curThread; pipes.keys())
        {
            /* Notify the current thread */
            notify(curThread);
        }
    }

    /**
     * Creates a new pipe-pair and returns it
     *
     * Throws:
     *   `FatalException` on error creating the pipe
     * Returns: the pipe-pair as an `int[2]`
     */
    private int[2] newPipe()
    {
        /* Allocate space for the two FDs */
        int[2] pipePair;

        /* Create a new pipe and put the fd of the read end in [0] and write end in [1] */
        int status = pipe(pipePair.ptr);

        /* If the pipe creation failed */
        if(status != 0)
        {
            // Throw an exception if pipe creation failed
            throw new FatalException(this, FatalError.WAIT_FAILURE, "Could not initialize the pipe");
        }

        return pipePair;
    }
}

version(unittest)
{
    import std.conv : to;
    import std.stdio : writeln;
}

/**
 * Basic example of two threads, `thread1` and `thread2`,
 * which will wait on the `event`. We will then, from the
 * main thread, notify them all (causing them all to wake up)
 */
unittest
{
    Event event = new Event();

    class TestThread : Thread
    {
        private Event event;

        this(Event event)
        {
            super(&worker);
            this.event = event;
        }

        public void worker()
        {
            writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting...");
            event.wait();
            writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting... [done]");
        }
    }

    TestThread thread1 = new TestThread(event);
    thread1.start();

    TestThread thread2 = new TestThread(event);
    thread2.start();

    /* Give both threads time to register themselves by calling wait() */
    Thread.sleep(dur!("seconds")(10));
    writeln("Main thread is going to notify two threads");


    // TODO: Add assert to check

    /* Wake up all sleeping on this event */
    event.notifyAll();

    /* Wait for all threads to exit */
    thread1.join();
    thread2.join();
}

/**
 * An example of trying to `notify()` a thread
 * which isn't registered (has never been `ensure()`'d
 * or had `wait()` called from it at least once)
 */
unittest
{
    Event event = new Event();

    class MyThread : Thread
    {
        this()
        {
            super(&worker);
        }

        public void worker() {}
    }
    Thread thread1 = new MyThread();

    try
    {
        /* Wake up a thread which isn't waiting (or ever registered) */
        event.notify(thread1);

        assert(false);
    }
    catch(SnoozeError e)
    {
        assert(true);
    }
}

/**
 * Here we have an example of a thread which waits
 * on `event` but never gets notified but because
 * we are using a timeout-based wait it will unblock
 * after the timeout
 */
unittest
{
    Event event = new Event();

    class TestThread : Thread
    {
        private Event event;

        this(Event event)
        {
            super(&worker);
            this.event = event;
        }

        public void worker()
        {
            writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting...");

            /* Here we test timeout, we never notify so this should timeout and return false */
            assert(event.wait(dur!("seconds")(2)) == false);
            writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting... [done]");
        }
    }

    TestThread thread1 = new TestThread(event);
    thread1.start();

    /* Wait for the thread to exit */
    thread1.join();
}



// unittest
// {
//     Event event = new Event();

//     class TestThread : Thread
//     {
//         private Event event;

//         this(Event event)
//         {
//             super(&worker);
//             this.event = event;
//         }

//         public void worker()
//         {
//             writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting...");

//             try
//             {
//                 /* Wait */
//                 event.wait();
//             }
//             catch(InterruptedException e)
//             {
//                 writeln("Had an interrupt");
//             }
//         }
//     }

//     TestThread thread1 = new TestThread(event);
//     thread1.start();

//     Thread.sleep(dur!("seconds")(10));
//     writeln("Main thread is going to notify two threads");




//     // TODO: Add assert to check

//     /* Wake up all sleeping on this event */
//     // event.notifyAll();

//     import core.thread.osthread : getpid;
//     writeln("Cur pid: ", getpid());

//     /* Wait for all threads to exit */
//     thread1.join();
// }