1 /** 2 * The `Event` type and facilities 3 */ 4 module libsnooze.event; 5 6 // TODO: Would be nice if this built without unit tests failing 7 // ... so I'd like libsnooze.clib to work as my IDE picks up on 8 // ... it then 9 10 version(release) 11 { 12 import libsnooze.clib : pipe, write, read; 13 import libsnooze.clib : select, fd_set, fdSetZero, fdSetSet; 14 import libsnooze.clib : timeval, time_t, suseconds_t; 15 } 16 else 17 { 18 import clib : pipe, write, read; 19 import clib : select, fd_set, fdSetZero, fdSetSet; 20 import clib : timeval, time_t, suseconds_t; 21 } 22 23 import core.thread : Thread, Duration, dur; 24 import core.sync.mutex : Mutex; 25 import libsnooze.exceptions : SnoozeError; 26 import std.conv : to; 27 28 /** 29 * Represents an object you can wait and notify/notifyAll on 30 */ 31 public class Event 32 { 33 /* Array of [readFD, writeFD] pairs/arrays */ 34 private int[2][Thread] pipes; 35 private Mutex pipesLock; 36 37 private bool nonFail = false; 38 39 /** 40 * Constructs a new Event 41 */ 42 this() 43 { 44 internalInit(); 45 } 46 47 private void internalInit() 48 { 49 version(Linux) 50 { 51 // TODO: Switch to eventfd in the future 52 initPipe(); 53 pragma(msg, "Buulding on linux uses the `pipe(int*)` system call"); 54 } 55 else version(Windows) 56 { 57 throw SnoozeError("Platform Windows is not supported"); 58 } 59 else 60 { 61 initPipe(); 62 } 63 } 64 65 private void initPipe() 66 { 67 /* Create a lock for the pipe-pair array */ 68 pipesLock = new Mutex(); 69 } 70 71 /** 72 * Wait on this event indefinately 73 */ 74 public final void wait() 75 { 76 wait(null); 77 } 78 79 /** 80 * Ensures that the calling Thread gets a registered 81 * pipe added for it when called. 82 * 83 * This can be useful if one wants to initialize several 84 * threads that should be able to all be notified and wake up 85 * on their first call to wait instead of having wait 86 * ensure the pipe is created on first call. 87 */ 88 public final void ensure() 89 { 90 /* Get the thread object (TID) for the calling thread */ 91 Thread callingThread = Thread.getThis(); 92 93 /* Lock the pipe-pairs */ 94 pipesLock.lock(); 95 96 /* Checks if a pipe-pair exists, if not creates it */ 97 // TODO: Add a catch here, then unlock, rethrow 98 int[2] pipePair = pipeExistenceEnsure(callingThread); 99 100 /* Unlock the pipe-pairs */ 101 pipesLock.unlock(); 102 } 103 104 105 // TODO: Make this a method we can call actually 106 private int[2] pipeExistenceEnsure(Thread thread) 107 { 108 int[2] pipePair; 109 110 /* Lock the pipe-pairs */ 111 pipesLock.lock(); 112 113 /* If it is not in the pair, create a pipe-pair and save it */ 114 if(!(thread in pipes)) 115 { 116 // TODO: Add a catch here, then unlock then rethrow 117 pipes[thread] = newPipe(); //TODO: If bad (exception) use scopre guard too 118 } 119 120 /* Grab the pair */ 121 pipePair = pipes[thread]; 122 123 /* Unlock the pipe-pairs */ 124 pipesLock.unlock(); 125 126 return pipePair; 127 } 128 129 // NOTE: Returns true on woken, false on timeout 130 private final bool wait(timeval* timestruct) 131 { 132 /* Get the thread object (TID) for the calling thread */ 133 Thread callingThread = Thread.getThis(); 134 135 /* Lock the pipe-pairs */ 136 pipesLock.lock(); 137 138 /* Checks if a pipe-pair exists, if not creates it */ 139 // TODO: Add a catch here, then unlock, rethrow 140 int[2] pipePair = pipeExistenceEnsure(callingThread); 141 142 /* Unlock the pipe-pairs */ 143 pipesLock.unlock(); 144 145 146 /* Get the read-end of the pipe fd */ 147 int readFD = pipePair[0]; 148 149 // TODO: IO/queue block using select with a timeout 150 // select(); 151 152 // NOTE: Not sure why but nfdsmust be the highest fd number that is being monitored+1 153 // ... so in our case that must be `pipePair[0]+1` 154 155 // Setup the fd_set for read fs struct 156 157 /** 158 * Setup the fd_set for read file descriptors 159 * 160 * 1. Initialize the struct with FD_ZERO 161 * 2. Add the file descriptor of interest i.e. `readFD` 162 */ 163 fd_set readFDs; 164 fdSetZero(&readFDs); 165 fdSetSet(readFD, &readFDs); 166 167 /** 168 * Now block till we have a change in `readFD`'s state 169 * (i.e. it becomes readbale without a block). However, 170 * if a timeout was specified we can then return after 171 * said timeout. 172 */ 173 int status = select(readFD+1, &readFDs, null, null, timestruct); 174 175 /** 176 * The number of Fds (1 in this case) ready for reading is returned. 177 * 178 * This means that if we have: 179 * 180 * 1. `1` returned that then `readFD` is available for reading 181 * 182 * If timeout was 0 (timeval* is NULL) then it blocks till readable 183 * and hence the status would then be non-zero. The only way it can 184 * be `0` is if the timeout was non-zero (timeval* non-NULL) meaning 185 * it returned after timing out and nothing changed in any fd_set(s) 186 * (nothing became readable) 187 */ 188 if(status == 0) 189 { 190 return false; 191 } 192 /* On error */ 193 else if(status == -1) 194 { 195 // TODO: Here we need to check for errno (Weekend fix) 196 throw new SnoozeError("Error selecting pipe fd '"~to!(string)(readFD)~"' when trying to wait()"); 197 } 198 /* On success */ 199 else 200 { 201 /* Get the read end and read 1 byte (won't block) */ 202 byte singleBuff; 203 ptrdiff_t readCount = read(readFD, &singleBuff, 1); 204 205 /* If we did not read 1 byte then there was an error (either 1 or -1) */ 206 if(readCount != 1) 207 { 208 throw new SnoozeError("Error reading pipe fd '"~to!(string)(readFD)~"' when trying to wait()"); 209 } 210 211 return true; 212 } 213 214 // TODO: Then perform read to remove the status of "readbale" 215 // ... such that the next call to select still blocks if a notify() 216 // ... is yet to be called 217 218 219 } 220 221 222 /** 223 * Determines whether this event is ready or not, useful for checking if 224 * a wait would block if called relatively soon 225 * 226 * Returns: true if it would block, false otherwise 227 * 228 * TODO: Test this and write a unit test (it has not yet been tested) 229 */ 230 private final bool wouldWait() 231 { 232 /* Would we wait? */ 233 bool waitStatus; 234 235 /* Get the thread object (TID) for the calling thread */ 236 Thread callingThread = Thread.getThis(); 237 238 /* Lock the pipe-pairs */ 239 pipesLock.lock(); 240 241 /* Checks if a pipe-pair exists, if not creates it */ 242 // TODO: Add a catch here, then unlock, rethrow 243 int[2] pipePair = pipeExistenceEnsure(callingThread); 244 245 /* Unlock the pipe-pairs */ 246 pipesLock.unlock(); 247 248 249 /* Get the read-end of the pipe fd */ 250 int readFD = pipePair[0]; 251 252 /** 253 * Setup the fd_set for read file descriptors 254 * 255 * 1. Initialize the struct with FD_ZERO 256 * 2. Add the file descriptor of interest i.e. `readFD` 257 */ 258 fd_set readFDs; 259 fdSetZero(&readFDs); 260 fdSetSet(readFD, &readFDs); 261 262 /** 263 * Now we set a timeout that is very low so we can return 264 * very quickly, and then determine if within the deadline 265 * it became readable ("would not wait") or we exceeded the deadline and it 266 * was not readable ("would wait") 267 */ 268 timeval timestruct; 269 timestruct.tv_sec = 0; 270 timestruct.tv_usec = 1; 271 int status = select(readFD+1, &readFDs, null, null, ×truct); 272 273 /* If we timed out (i.e. "it would wait") */ 274 if(status == 0) 275 { 276 return true; 277 } 278 /* TODO: Handle select() errors */ 279 else if(status == -1) 280 { 281 // TODO: Handle this 282 return false; 283 } 284 /* If we have a number of fds readable (only 1) (i.e. "it would NOT wait") */ 285 else 286 { 287 return false; 288 } 289 290 291 292 // return waitStatus; 293 } 294 295 /** 296 * Waits on the event with a given timeout 297 * 298 * Params: 299 * duration = the timeout 300 */ 301 public final bool wait(Duration duration) 302 { 303 /* Split out the duration into seconds and microseconds */ 304 time_t seconds; 305 suseconds_t microseconds; 306 duration.split!("seconds", "msecs")(seconds, microseconds); 307 308 version(dbg) 309 { 310 /* If debugging enable, then print out these duirng compilation */ 311 pragma(msg, time_t); 312 pragma(msg, suseconds_t); 313 } 314 315 /* Generate the timeval struct */ 316 timeval timestruct; 317 timestruct.tv_sec = seconds; 318 timestruct.tv_usec = microseconds; 319 320 /* Call wait with this time duration */ 321 return wait(×truct); 322 } 323 324 /** 325 * Wakes up a single thread specified 326 * 327 * Params: 328 * thread = the Thread to wake up 329 */ 330 public final void notify(Thread thread) 331 { 332 // TODO: Throw error if the thread is not found 333 334 /* Lock the pipe-pairs */ 335 pipesLock.lock(); 336 337 /* If the thread provided is wait()-ing on this event */ 338 if(thread in pipes) 339 { 340 /* Obtain the pipe pair for this thread */ 341 int[2] pipePair = pipes[thread]; 342 343 /* Obtain the write FD */ 344 int pipeWriteEnd = pipePair[1]; 345 346 /* Write a single byte to it */ 347 byte wakeByte = 69; 348 write(pipeWriteEnd, &wakeByte, 1); // TODO: Collect status and if bad, unlock, throw exception 349 } 350 /* If the thread provided is NOT wait()-ing on this event */ 351 else 352 { 353 // TODO: Make this error configurable, maybe a non-fail mode should ne implementwd 354 if(!nonFail) 355 { 356 /* Unlock the pipe-pairs */ 357 pipesLock.unlock(); 358 359 throw new SnoozeError("Provided thread has yet to call wait() atleast once"); 360 } 361 } 362 363 /* Unlock the pipe-pairs */ 364 pipesLock.unlock(); 365 } 366 367 /** 368 * Wakes up all threads waiting on this event 369 */ 370 public final void notifyAll() 371 { 372 /* Lock the pipe-pairs */ 373 pipesLock.lock(); 374 375 /* Loop through each thread */ 376 foreach(Thread curThread; pipes.keys()) 377 { 378 /* Notify the current thread */ 379 notify(curThread); 380 } 381 382 /* Unlock the pipe-pairs */ 383 pipesLock.unlock(); 384 } 385 386 private int[2] newPipe() 387 { 388 /* Allocate space for the two FDs */ 389 int[2] pipePair; 390 391 // /* Create a new pipe and put the fd of the read end in [0] and write end in [1] */ 392 int status = pipe(pipePair.ptr); 393 394 /* If the pipe creation failed */ 395 if(status != 0) 396 { 397 // Throw an exception is pipe creation failed 398 throw new SnoozeError("Could not initialize the pipe"); 399 } 400 401 return pipePair; 402 } 403 } 404 405 version(unittest) 406 { 407 import std.conv : to; 408 import std.stdio : writeln; 409 } 410 411 unittest 412 { 413 Event event = new Event(); 414 415 class TestThread : Thread 416 { 417 private Event event; 418 419 this(Event event) 420 { 421 super(&worker); 422 this.event = event; 423 } 424 425 public void worker() 426 { 427 writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting..."); 428 event.wait(); 429 writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting... [done]"); 430 } 431 } 432 433 TestThread thread1 = new TestThread(event); 434 thread1.start(); 435 436 TestThread thread2 = new TestThread(event); 437 thread2.start(); 438 439 Thread.sleep(dur!("seconds")(10)); 440 writeln("Main thread is going to notify two threads"); 441 442 443 // TODO: Add assert to check 444 445 /* Wake up all sleeping on this event */ 446 event.notifyAll(); 447 448 /* Wait for all threads to exit */ 449 thread1.join(); 450 thread2.join(); 451 } 452 453 unittest 454 { 455 Event event = new Event(); 456 457 class MyThread : Thread 458 { 459 this() 460 { 461 super(&worker); 462 } 463 464 public void worker() {} 465 } 466 Thread thread1 = new MyThread(); 467 468 try 469 { 470 /* Wake up a thread which isn't waiting (or ever registered) */ 471 event.notify(thread1); 472 473 assert(false); 474 } 475 catch(SnoozeError e) 476 { 477 assert(true); 478 } 479 } 480 481 unittest 482 { 483 Event event = new Event(); 484 485 class TestThread : Thread 486 { 487 private Event event; 488 489 this(Event event) 490 { 491 super(&worker); 492 this.event = event; 493 } 494 495 public void worker() 496 { 497 writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting..."); 498 499 /* Here we test timeout, we never notify so this should timeout and return false */ 500 assert(event.wait(dur!("seconds")(2)) == false); 501 writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting... [done]"); 502 } 503 } 504 505 TestThread thread1 = new TestThread(event); 506 thread1.start(); 507 508 /* Wait for the thread to exit */ 509 thread1.join(); 510 }