1 /**
2  * The `Event` type and facilities
3  */
4 module libsnooze.event;
5 
6 // TODO: Would be nice if this built without unit tests failing
7 // ... so I'd like libsnooze.clib to work as my IDE picks up on
8 // ... it then
9 
10 version(release)
11 {
12 	import libsnooze.clib : pipe, write, read;
13 	import libsnooze.clib : select, fd_set, fdSetZero, fdSetSet;
14 	import libsnooze.clib : timeval, time_t, suseconds_t;
15 }
16 else
17 {
18 	import clib : pipe, write, read;
19 	import clib : select, fd_set, fdSetZero, fdSetSet;
20 	import clib : timeval, time_t, suseconds_t;
21 }
22 
23 import core.thread : Thread, Duration, dur;
24 import core.sync.mutex : Mutex;
25 import libsnooze.exceptions;
26 import std.conv : to;
27 import core.stdc.errno;
28 
29 /** 
30  * Represents an object you can wait and notify/notifyAll on
31  */
32 public class Event
33 {
34 	/* Array of [readFD, writeFD] pairs/arrays */
35 	private int[2][Thread] pipes;
36 	private Mutex pipesLock;
37 
38 	private bool nonFail = false;
39 
40 	/** 
41 	 * Constructs a new Event
42 	 */
43 	this()
44 	{
45 		internalInit();
46 	}
47 
48 	private void internalInit()
49 	{
50 		version(Linux)
51 		{
52 			// TODO: Switch to eventfd in the future
53 			initPipe();
54 			pragma(msg, "Buulding on linux uses the `pipe(int*)` system call");
55 		}
56 		else version(Windows)
57 		{
58 			throw SnoozeError("Platform Windows is not supported");
59 		}
60 		else
61 		{
62 			initPipe();
63 		}
64 	}
65 
66 	private void initPipe()
67 	{
68 		/* Create a lock for the pipe-pair array */
69 		pipesLock = new Mutex();
70 	}
71 
72 	/** 
73 	 * Wait on this event indefinately
74 	 *
75 	 * Throws:
76 	 *   `InterruptedException` if the `wait()`
77 	 * was interrupted for some reason
78 	 * Throws:
79 	 *   `FatalException` if a fatal error with
80 	 * the underlying mechanism occurs
81 	 */
82 	public final void wait()
83 	{
84 		/* Wait with no timeout (hence the null) */
85 		wait(null);
86 	}
87 
88 	/** 
89 	 * Ensures that the calling Thread gets a registered
90 	 * pipe added for it when called.
91 	 *
92 	 * This can be useful if one wants to initialize several
93 	 * threads that should be able to all be notified and wake up
94 	 * on their first call to wait instead of having wait
95 	 * ensure the pipe is created on first call.
96 	 *
97 	 * Throws:
98 	 *   `FatalException` on creating the pipe-pair
99 	 * if needs be
100 	 */
101 	public final void ensure()
102 	{
103 		/* Get the thread object (TID) for the calling thread */
104 		Thread callingThread = Thread.getThis();
105 
106 		/* Checks if a pipe-pair exists, if not creates it */
107 		pipeExistenceEnsure(callingThread);
108 	}
109 
110 	/** 
111 	 * Returns the pipe-pair for the mapped `Thread`
112 	 * provided. if one does not exist then it is
113 	 * created first and then returned.
114 	 *
115 	 * Throws:
116 	 *   `FatalException` on creating the pipe-pair
117 	 * if needs be
118 	 * Params:
119 	 *   thread = the `Thread` to ensure for
120 	 * Returns: the pipe-pair as an `int[]`
121 	 */
122 	private int[2] pipeExistenceEnsure(Thread thread)
123 	{
124 		int[2] pipePair;
125 
126 		/* Lock the pipe-pairs */
127 		pipesLock.lock();
128 
129 		/* On successful return or error */
130 		scope(exit)
131 		{
132 			/* Unlock the pipe-pairs */
133 			pipesLock.unlock();
134 		}
135 
136 		/* If it is not in the pair, create a pipe-pair and save it */
137 		if(!(thread in pipes))
138 		{
139 			pipes[thread] = newPipe();
140 		}
141 
142 		/* Grab the pair */
143 		pipePair = pipes[thread];
144 
145 		return pipePair;
146 	}
147 	
148 	/** 
149 	 * Waits for the time specified, returning `true`
150 	 * if awoken, `false` on timeout (if specified as
151 	 * non-zero).
152 	 *
153 	 * Throws:
154 	 *   `InterruptedException` if the `wait()` was
155 	 * interrupted for some reason.
156 	 * Throws:
157 	 *   `FatalException` on fatal error with the
158 	 * underlying mechanism.
159 	 *
160 	 * Params:
161 	 *   timestruct = the `timeval*` to indicate timeout period
162 	 * Returns: `true` if awoken, `false` on timeout
163 	 */
164 	private final bool wait(timeval* timestruct)
165 	{
166 		/* Get the thread object (TID) for the calling thread */
167 		Thread callingThread = Thread.getThis();
168 
169 		/* Checks if a pipe-pair exists, if not creates it */
170 		int[2] pipePair = pipeExistenceEnsure(callingThread);
171 
172 
173 		/* Get the read-end of the pipe fd */
174 		int readFD = pipePair[0];
175 
176 		// NOTE: Not sure why but nfdsmust be the highest fd number that is being monitored+1
177 		// ... so in our case that must be `pipePair[0]+1`
178 
179 		/** 
180 		 * Setup the fd_set for read file descriptors
181 		 * 
182 		 * 1. Initialize the struct with FD_ZERO
183 		 * 2. Add the file descriptor of interest i.e. `readFD`
184 		 */
185 		fd_set readFDs;
186 		fdSetZero(&readFDs);
187 		fdSetSet(readFD, &readFDs);
188 
189 		/** 
190 		 * Now block till we have a change in `readFD`'s state
191 		 * (i.e. it becomes readbale without a block). However,
192 		 * if a timeout was specified we can then return after
193 		 * said timeout.
194 		 */
195 		int status = select(readFD+1, &readFDs, null, null, timestruct);
196 
197 		/** 
198 		 * The number of Fds (1 in this case) ready for reading is returned.
199 		 *
200 		 * This means that if we have:
201 		 *
202 		 * 1. `1` returned that then `readFD` is available for reading
203 		 *
204 		 * If timeout was 0 (timeval* is NULL) then it blocks till readable
205 		 * and hence the status would then be non-zero. The only way it can
206 		 * be `0` is if the timeout was non-zero (timeval* non-NULL) meaning
207 		 * it returned after timing out and nothing changed in any fd_set(s)
208 		 * (nothing became readable)
209 		 */
210 		if(status == 0)
211 		{
212 			return false;
213 		}
214 		/**
215 		 * On error doing `select()`
216 		 *
217 		 * We check the `errno` as this could be an error resulting
218 		 * from an unblock due to a signal having been received, in
219 		 * that specific case we don't want to throw an error but rather
220 		 * an interrupted exception.
221 		 */
222 		else if(status == -1)
223 		{
224 			// Retrieve the kind-of error
225 			int errKind = errno();
226 
227 			version(dbg)
228 			{
229 				import std.stdio;
230 				writeln("select() interrupted, errno: ", errKind);
231 			}
232 
233 			// Handle as an interrupt
234 			if(errKind == EINTR)
235 			{
236 				throw new InterruptedException(this);
237 			}
238 			// Anything else is a legitimate error
239 			else
240 			{
241 				throw new FatalException(this, FatalError.WAIT_FAILURE, "Error selecting pipe fd '"~to!(string)(readFD)~"' when trying to wait(), got errno '"~to!(string)(errKind)); 
242 			}
243 		}
244 		/* On success */
245 		else
246 		{
247 			/* Get the read end and read 1 byte (won't block) */
248 			byte singleBuff;
249 			ptrdiff_t readCount = read(readFD, &singleBuff, 1); // TODO: We could get EINTR'd here too
250 
251 			/* If we did not read 1 byte then there was an error (either 1 or -1) */
252 			if(readCount != 1)
253 			{
254 				throw new FatalException(this, FatalError.WAIT_FAILURE, "Error reading pipe fd '"~to!(string)(readFD)~"' when trying to wait()");
255 			}
256 
257 			return true;
258 		}
259 
260 		// TODO: Then perform read to remove the status of "readbale"
261 		// ... such that the next call to select still blocks if a notify()
262 		// ... is yet to be called
263 
264 		
265 	}
266 
267 
268 	/** 
269 	 * Determines whether this event is ready or not, useful for checking if
270 	 * a wait would block if called relatively soon
271 	 *
272 	 * Returns: true if it would block, false otherwise
273 	 *
274 	 * TODO: Test this and write a unit test (it has not yet been tested)
275 	 */
276 	private final bool wouldWait()
277 	{
278 		/* Would we wait? */
279 		bool waitStatus;
280 
281 		/* Get the thread object (TID) for the calling thread */
282 		Thread callingThread = Thread.getThis();
283 
284 		/* Checks if a pipe-pair exists, if not creates it */
285 		int[2] pipePair = pipeExistenceEnsure(callingThread);
286 
287 
288 		/* Get the read-end of the pipe fd */
289 		int readFD = pipePair[0];
290 
291 		/** 
292 		 * Setup the fd_set for read file descriptors
293 		 * 
294 		 * 1. Initialize the struct with FD_ZERO
295 		 * 2. Add the file descriptor of interest i.e. `readFD`
296 		 */
297 		fd_set readFDs;
298 		fdSetZero(&readFDs);
299 		fdSetSet(readFD, &readFDs);
300 
301 		/** 
302 		 * Now we set a timeout that is very low so we can return
303 		 * very quickly, and then determine if within the deadline
304 		 * it became readable ("would not wait") or we exceeded the deadline and it 
305 		 * was not readable ("would wait")
306 		 */
307 		timeval timestruct;
308 		timestruct.tv_sec = 0;
309 		timestruct.tv_usec = 1;
310 		int status = select(readFD+1, &readFDs, null, null, &timestruct);
311 
312 		/* If we timed out (i.e. "it would wait") */
313 		if(status == 0)
314 		{
315 			return true;
316 		}
317 		/* TODO: Handle select() errors */
318 		else if(status == -1)
319 		{
320 			// TODO: Handle this
321 			return false;
322 		}
323 		/* If we have a number of fds readable (only 1) (i.e. "it would NOT wait") */
324 		else
325 		{
326 			return false;
327 		}
328 
329 
330 
331 		// return waitStatus;
332 	}
333 
334 	/** 
335 	 * Waits for the time specified, returning `true`
336 	 * if awoken, `false` on timeout
337 	 *
338 	 * Params:
339 	 *   duration = the `Duration` to indicate timeout period
340 	 * Returns: `true` if awoken, `false` on timeout
341 	 * Throws:
342 	 *   `FatalException` on fatal error with the
343 	 * underlying mechanism
344 	 * Throws:
345 	 *   `InterruptedException` if the `wait()` was
346 	 * interrupted for some reason
347 	 */
348 	public final bool wait(Duration duration)
349 	{
350 		/* Split out the duration into seconds and microseconds */
351 		time_t seconds;
352 		suseconds_t microseconds;
353 		duration.split!("seconds", "msecs")(seconds, microseconds);
354 
355 		version(dbg)
356 		{
357 			/* If debugging enable, then print out these duirng compilation */
358 			pragma(msg, time_t);
359 			pragma(msg, suseconds_t);
360 		}
361 		
362 		/* Generate the timeval struct */
363 		timeval timestruct;
364 		timestruct.tv_sec = seconds;
365 		timestruct.tv_usec = microseconds;
366 
367 		/* Call wait with this time duration */
368 		return wait(&timestruct);
369 	}
370 
371 	/** 
372 	 * Wakes up a single thread specified
373 	 *
374 	 * Params:
375 	 *   thread = the Thread to wake up
376 	 * Throws:
377 	 *   `FatalException` if the underlying
378 	 * mechanism failed to notify
379 	 */
380 	public final void notify(Thread thread)
381 	{
382 		// TODO: Throw error if the thread is not found
383 
384 		/* Lock the pipe-pairs */
385 		pipesLock.lock();
386 
387 		/* If the thread provided is wait()-ing on this event */
388 		if(thread in pipes)
389 		{
390 			/* Obtain the pipe pair for this thread */
391 			int[2] pipePair = pipes[thread];
392 
393 			/* Obtain the write FD */
394 			int pipeWriteEnd = pipePair[1];
395 
396 			/* Write a single byte to it */
397 			byte wakeByte = 69;
398 			write(pipeWriteEnd, &wakeByte, 1); // TODO: Collect status and if bad, unlock, throw exception
399 		}
400 		/* If the thread provided is NOT wait()-ing on this event */
401 		else
402 		{
403 			// TODO: Make this error configurable, maybe a non-fail mode should ne implementwd
404 			if(!nonFail)
405 			{
406 				/* Unlock the pipe-pairs */
407 				pipesLock.unlock();
408 
409 				throw new FatalException(this, FatalError.NOTIFY_FAILURE, "Provided thread has yet to call wait() atleast once");
410 			}	
411 		}
412 
413 		/* Unlock the pipe-pairs */
414 		pipesLock.unlock();
415 	}
416 
417 	/** 
418 	 * Wakes up all threads waiting on this event
419 	 *
420 	 * Throws:
421 	 *   `FatalException` if the underlying
422 	 * mechanism failed to notify
423 	 */
424 	public final void notifyAll()
425 	{
426 		/* Lock the pipe-pairs */
427 		pipesLock.lock();
428 
429 		/* Loop through each thread */
430 		foreach(Thread curThread; pipes.keys())
431 		{
432 			/* Notify the current thread */
433 			notify(curThread);
434 		}
435 
436 		/* Unlock the pipe-pairs */
437 		pipesLock.unlock();
438 	}
439 
440 	/** 
441 	 * Creates a new pipe-pair and returns it
442 	 *
443 	 * Throws:
444 	 *   `FatalException` on error creating the pipe
445 	 * Returns: the pipe-pair as an `int[]`
446 	 */
447 	private int[2] newPipe()
448 	{
449 		/* Allocate space for the two FDs */
450 		int[2] pipePair;
451 
452 		/* Create a new pipe and put the fd of the read end in [0] and write end in [1] */
453 		int status = pipe(pipePair.ptr);
454 
455 		/* If the pipe creation failed */
456 		if(status != 0)
457 		{
458 			// Throw an exception is pipe creation failed
459 			throw new FatalException(this, FatalError.WAIT_FAILURE, "Could not initialize the pipe");
460 		}
461 
462 		return pipePair;
463 	}
464 }
465 
466 version(unittest)
467 {
468 	import std.conv : to;
469 	import std.stdio : writeln;
470 }
471 
472 /**
473  * Basic example of two threads, `thread1` and `thread2`,
474  * which will wait on the `event`. We will then, from the
475  * main thread, notify them all (causing them all to wake up)
476  */
477 unittest
478 {
479 	Event event = new Event();
480 
481 	class TestThread : Thread
482 	{
483 		private Event event;
484 
485 		this(Event event)
486 		{
487 			super(&worker);
488 			this.event = event;
489 		}
490 
491 		public void worker()
492 		{
493 			writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting...");
494 			event.wait();
495 			writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting... [done]");
496 		}
497 	}
498 
499 	TestThread thread1 = new TestThread(event);
500 	thread1.start();
501 
502 	TestThread thread2 = new TestThread(event);
503 	thread2.start();
504 
505 	Thread.sleep(dur!("seconds")(10));
506 	writeln("Main thread is going to notify two threads");
507 
508 
509 	// TODO: Add assert to check
510 
511 	/* Wake up all sleeping on this event */
512 	event.notifyAll();
513 
514 	/* Wait for all threads to exit */
515 	thread1.join();
516 	thread2.join();
517 }
518 
519 /**
520  * An example of trying to `notify()` a thread
521  * which isn't registered (has never been `ensure()`'d
522  * or had `wait()` called from it at least once)
523  */
524 unittest
525 {
526 	Event event = new Event();
527 
528 	class MyThread : Thread
529 	{
530 		this()
531 		{
532 			super(&worker);
533 		}
534 
535 		public void worker() {}
536 	}
537 	Thread thread1 = new MyThread();
538 
539 	try
540 	{
541 		/* Wake up a thread which isn't waiting (or ever registered) */
542 		event.notify(thread1);
543 
544 		assert(false);
545 	}
546 	catch(SnoozeError e)
547 	{
548 		assert(true);
549 	}
550 }
551 
552 /**
553  * Here we have an example of a thread which waits
554  * on `event` but never gets notified but because
555  * we are using a timeout-based wait it will unblock
556  * after the timeout
557  */
558 unittest
559 {
560 	Event event = new Event();
561 
562 	class TestThread : Thread
563 	{
564 		private Event event;
565 
566 		this(Event event)
567 		{
568 			super(&worker);
569 			this.event = event;
570 		}
571 
572 		public void worker()
573 		{
574 			writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting...");
575 
576 			/* Here we test timeout, we never notify so this should timeout and return false */
577 			assert(event.wait(dur!("seconds")(2)) == false);
578 			writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting... [done]");
579 		}
580 	}
581 
582 	TestThread thread1 = new TestThread(event);
583 	thread1.start();
584 
585 	/* Wait for the thread to exit */
586 	thread1.join();
587 }
588 
589 
590 
591 // unittest
592 // {
593 // 	Event event = new Event();
594 
595 // 	class TestThread : Thread
596 // 	{
597 // 		private Event event;
598 
599 // 		this(Event event)
600 // 		{
601 // 			super(&worker);
602 // 			this.event = event;
603 // 		}
604 
605 // 		public void worker()
606 // 		{
607 // 			writeln("("~to!(string)(Thread.getThis().id())~") Thread is waiting...");
608 
609 // 			try
610 // 			{
611 // 				/* Wait */
612 // 				event.wait();
613 // 			}
614 // 			catch(InterruptedException e)
615 // 			{
616 // 				writeln("Had an interrupt");
617 // 			}
618 // 		}
619 // 	}
620 
621 // 	TestThread thread1 = new TestThread(event);
622 // 	thread1.start();
623 
624 // 	Thread.sleep(dur!("seconds")(10));
625 // 	writeln("Main thread is going to notify two threads");
626 
627 	
628 
629 
630 // 	// TODO: Add assert to check
631 
632 // 	/* Wake up all sleeping on this event */
633 // 	// event.notifyAll();
634 
635 // 	import core.thread.osthread : getpid;
636 // 	writeln("Cur pid: ", getpid());
637 
638 // 	/* Wait for all threads to exit */
639 // 	thread1.join();
640 // }