1 /**
2  * The `Event` type and facilities
3  */
4 module libsnooze.event;
5 
6 // TODO: Would be nice if this built without unit tests failing
7 // ... so I'd like libsnooze.clib to work as my IDE picks up on
8 // ... it then
9 
10 version(release)
11 {
12 	import libsnooze.clib : pipe, write, read;
13 	import libsnooze.clib : select, fd_set, fdSetZero, fdSetSet;
14 	import libsnooze.clib : timeval, time_t, suseconds_t;
15 }
16 else
17 {
18 	import clib : pipe, write, read;
19 	import clib : select, fd_set, fdSetZero, fdSetSet;
20 	import clib : timeval, time_t, suseconds_t;
21 }
22 
23 import core.thread : Thread, Duration, dur;
24 import core.sync.mutex : Mutex;
25 import libsnooze.exceptions : SnoozeError;
26 import std.conv : to;
27 
28 /** 
29  * Represents an object you can wait and notify/notifyAll on
30  */
public class Event
{
	/* Per-thread pipe pairs, each stored as [readFD, writeFD] */
	private int[2][Thread] pipes;

	/**
	 * Guards `pipes`. `core.sync.mutex.Mutex` is recursive, so a
	 * thread that already holds it (e.g. `notifyAll` calling
	 * `notify`) may lock it again without deadlocking.
	 */
	private Mutex pipesLock;

	/* When true, notify() on an unregistered thread is silently ignored */
	private bool nonFail = false;

	/** 
	 * Constructs a new Event
	 *
	 * Throws: SnoozeError if the platform is not supported
	 */
	this()
	{
		internalInit();
	}

	/** 
	 * Performs the platform-specific part of construction
	 *
	 * Throws: SnoozeError on Windows, which is not supported
	 */
	private void internalInit()
	{
		/* NOTE: the predefined identifier is lowercase `linux` */
		version(linux)
		{
			// TODO: Switch to eventfd in the future
			initPipe();
			pragma(msg, "Buulding on linux uses the `pipe(int*)` system call");
		}
		else version(Windows)
		{
			throw new SnoozeError("Platform Windows is not supported");
		}
		else
		{
			initPipe();
		}
	}

	/** 
	 * Creates the lock protecting the pipe-pair table. The pipes
	 * themselves are created lazily, one per thread, on demand.
	 */
	private void initPipe()
	{
		pipesLock = new Mutex();
	}

	/** 
	 * Wait on this event indefinitely
	 *
	 * Throws: SnoozeError if the pipe could not be created,
	 * selected on, or read from
	 */
	public final void wait()
	{
		/* A null timeval makes select() block without a timeout */
		wait(null);
	}

	/** 
	 * Ensures that the calling Thread gets a registered
	 * pipe added for it when called.
	 *
	 * This can be useful if one wants to initialize several
	 * threads that should be able to all be notified and wake up
	 * on their first call to wait instead of having wait
	 * ensure the pipe is created on first call.
	 *
	 * Throws: SnoozeError if the pipe could not be created
	 */
	public final void ensure()
	{
		/* The helper does its own locking; the pair itself is not needed here */
		pipeExistenceEnsure(Thread.getThis());
	}

	/** 
	 * Looks up the pipe-pair registered for the given thread,
	 * creating and registering one first if none exists yet
	 *
	 * Params:
	 *   thread = the Thread whose pipe-pair is wanted
	 *
	 * Returns: the [readFD, writeFD] pair for `thread`
	 * Throws: SnoozeError if a new pipe could not be created
	 */
	private int[2] pipeExistenceEnsure(Thread thread)
	{
		/* Hold the lock for the whole lookup/insert, releasing it even if newPipe() throws */
		pipesLock.lock();
		scope(exit) pipesLock.unlock();

		/* Already registered, hand back the stored pair */
		if(auto existing = thread in pipes)
		{
			return *existing;
		}

		/* Not registered yet, create a pipe-pair and save it */
		int[2] created = newPipe();
		pipes[thread] = created;

		return created;
	}

	/** 
	 * Blocks the calling thread until it is notified or, if a
	 * timeout is given, until that timeout elapses
	 *
	 * Params:
	 *   timestruct = the timeout passed to select(), or null
	 *                to block with no timeout
	 *
	 * Returns: true if woken by a notification, false on timeout
	 * Throws: SnoozeError if the pipe could not be created or if
	 * select() or read() failed
	 */
	private final bool wait(timeval* timestruct)
	{
		/* Fetch (creating if need be) the calling thread's pipe-pair */
		int[2] pipePair = pipeExistenceEnsure(Thread.getThis());

		/* Get the read-end of the pipe fd */
		int readFD = pipePair[0];

		/** 
		 * Setup the fd_set for read file descriptors
		 * 
		 * 1. Initialize the struct with FD_ZERO
		 * 2. Add the file descriptor of interest i.e. `readFD`
		 */
		fd_set readFDs;
		fdSetZero(&readFDs);
		fdSetSet(readFD, &readFDs);

		/** 
		 * Now block till `readFD` becomes readable or the timeout
		 * (if any) expires. The first argument must be the highest
		 * monitored fd number plus one, hence `readFD+1`.
		 */
		int status = select(readFD+1, &readFDs, null, null, timestruct);

		/* Nothing became readable before the timeout expired */
		if(status == 0)
		{
			return false;
		}
		/* On error */
		else if(status < 0)
		{
			// TODO: Here we need to check for errno (Weekend fix)
			throw new SnoozeError("Error selecting pipe fd '"~to!(string)(readFD)~"' when trying to wait()"); 
		}

		/**
		 * On success consume the single wake byte (won't block) so
		 * that the next select() blocks again until the next notify
		 */
		byte singleBuff;
		ptrdiff_t readCount = read(readFD, &singleBuff, 1);

		/* Anything other than exactly 1 byte read is an error */
		if(readCount != 1)
		{
			throw new SnoozeError("Error reading pipe fd '"~to!(string)(readFD)~"' when trying to wait()");
		}

		return true;
	}

	/** 
	 * Determines whether this event is ready or not, useful for checking if
	 * a wait would block if called relatively soon
	 *
	 * Returns: true if it would block, false otherwise (a select()
	 * error is currently also reported as false)
	 *
	 * Throws: SnoozeError if the pipe could not be created
	 *
	 * TODO: Test this and write a unit test (it has not yet been tested)
	 */
	private final bool wouldWait()
	{
		/* Fetch (creating if need be) the calling thread's pipe-pair */
		int[2] pipePair = pipeExistenceEnsure(Thread.getThis());

		/* Get the read-end of the pipe fd */
		int readFD = pipePair[0];

		/** 
		 * Setup the fd_set for read file descriptors
		 * 
		 * 1. Initialize the struct with FD_ZERO
		 * 2. Add the file descriptor of interest i.e. `readFD`
		 */
		fd_set readFDs;
		fdSetZero(&readFDs);
		fdSetSet(readFD, &readFDs);

		/** 
		 * Now we set a timeout that is very low so we can return
		 * very quickly, and then determine if within the deadline
		 * it became readable ("would not wait") or we exceeded the deadline and it 
		 * was not readable ("would wait")
		 */
		timeval timestruct;
		timestruct.tv_sec = 0;
		timestruct.tv_usec = 1;
		int status = select(readFD+1, &readFDs, null, null, &timestruct);

		/**
		 * Only a timeout (status of 0) means "it would wait". A readable
		 * fd means it would not, and so (for now) does an error.
		 *
		 * TODO: Handle select() errors
		 */
		return status == 0;
	}

	/** 
	 * Waits on the event with a given timeout
	 *
	 * Params:
	 *   duration = the timeout
	 *
	 * Returns: true if woken by a notification, false on timeout
	 * Throws: SnoozeError if the pipe could not be created or if
	 * select() or read() failed
	 */
	public final bool wait(Duration duration)
	{
		/**
		 * Split out the duration into whole seconds and the remaining
		 * microseconds, as `tv_usec` is expressed in microseconds
		 */
		time_t seconds;
		suseconds_t microseconds;
		duration.split!("seconds", "usecs")(seconds, microseconds);

		version(dbg)
		{
			/* If debugging is enabled, print these types out during compilation */
			pragma(msg, time_t);
			pragma(msg, suseconds_t);
		}
		
		/* Generate the timeval struct */
		timeval timestruct;
		timestruct.tv_sec = seconds;
		timestruct.tv_usec = microseconds;

		/* Call wait with this time duration */
		return wait(&timestruct);
	}

	/** 
	 * Wakes up a single thread specified
	 *
	 * Params:
	 *   thread = the Thread to wake up
	 *
	 * Throws: SnoozeError if the thread has no registered pipe
	 * (unless non-fail mode is on) or if writing to its pipe failed
	 */
	public final void notify(Thread thread)
	{
		/* Hold the lock for the lookup and the write, releasing it on every exit path */
		pipesLock.lock();
		scope(exit) pipesLock.unlock();

		/* If the thread provided has a pipe registered with this event */
		if(auto pipePair = thread in pipes)
		{
			/* Obtain the write FD */
			int pipeWriteEnd = (*pipePair)[1];

			/* Write a single byte to it, which makes the read end readable */
			byte wakeByte = 69;
			if(write(pipeWriteEnd, &wakeByte, 1) != 1)
			{
				throw new SnoozeError("Error writing to pipe fd '"~to!(string)(pipeWriteEnd)~"' when trying to notify()");
			}
		}
		/* If the thread provided has NOT registered with this event */
		// TODO: Make this error configurable, maybe a non-fail mode should be implemented
		else if(!nonFail)
		{
			throw new SnoozeError("Provided thread has yet to call wait() atleast once");
		}
	}

	/** 
	 * Wakes up all threads waiting on this event
	 *
	 * Throws: SnoozeError if writing to any thread's pipe failed
	 */
	public final void notifyAll()
	{
		/* Keep the table stable while we walk it, releasing the lock even on error */
		pipesLock.lock();
		scope(exit) pipesLock.unlock();

		/* Notify every registered thread */
		foreach(Thread curThread; pipes.keys())
		{
			notify(curThread);
		}
	}

	/** 
	 * Creates a new pipe
	 *
	 * Returns: the pair with the read end's fd in [0] and the
	 * write end's fd in [1]
	 * Throws: SnoozeError if the pipe could not be created
	 */
	private int[2] newPipe()
	{
		/* Allocate space for the two FDs */
		int[2] pipePair;

		/* Create a new pipe and put the fd of the read end in [0] and write end in [1] */
		int status = pipe(pipePair.ptr);

		/* If the pipe creation failed */
		if(status != 0)
		{
			throw new SnoozeError("Could not initialize the pipe");
		}

		return pipePair;
	}
}
404 
405 version(unittest)
406 {
407 	import std.conv : to;
408 	import std.stdio : writeln;
409 }
410 
unittest
{
	/* Two threads block on one event; notifyAll() must release both */
	Event event = new Event();

	class TestThread : Thread
	{
		private Event event;

		this(Event event)
		{
			super(&worker);
			this.event = event;
		}

		/* Blocks on the event, logging before and after */
		public void worker()
		{
			immutable tid = to!(string)(Thread.getThis().id());

			writeln("("~tid~") Thread is waiting...");
			event.wait();
			writeln("("~tid~") Thread is waiting... [done]");
		}
	}

	/* Create and start each waiter in turn */
	TestThread[2] waiters;
	foreach(ref waiter; waiters)
	{
		waiter = new TestThread(event);
		waiter.start();
	}

	/* Give both waiters time to register and block */
	Thread.sleep(dur!("seconds")(10));
	writeln("Main thread is going to notify two threads");

	// TODO: Add assert to check

	/* Wake up everything sleeping on this event */
	event.notifyAll();

	/* Wait for all threads to exit */
	foreach(waiter; waiters)
	{
		waiter.join();
	}
}
452 
unittest
{
	/* notify() on a thread that never registered must raise SnoozeError */
	Event event = new Event();

	class MyThread : Thread
	{
		this()
		{
			super(&worker);
		}

		public void worker() {}
	}
	Thread neverWaited = new MyThread();

	bool rejected = false;
	try
	{
		/* Wake up a thread which isn't waiting (or ever registered) */
		event.notify(neverWaited);
	}
	catch(SnoozeError e)
	{
		rejected = true;
	}

	/* Reaching here without the error having been thrown is a failure */
	assert(rejected);
}
480 
unittest
{
	/* A timed wait that is never notified must time out and report false */
	Event event = new Event();

	class TestThread : Thread
	{
		private Event event;

		this(Event event)
		{
			super(&worker);
			this.event = event;
		}

		public void worker()
		{
			immutable tid = to!(string)(Thread.getThis().id());
			writeln("("~tid~") Thread is waiting...");

			/* Nobody notifies, so the 2 second wait should expire */
			bool woken = event.wait(dur!("seconds")(2));
			assert(!woken);

			writeln("("~tid~") Thread is waiting... [done]");
		}
	}

	TestThread waiter = new TestThread(event);
	waiter.start();

	/* Wait for the thread to exit */
	waiter.join();
}