Line data Source code
1 : /*
2 : * AsynchronousTools.cc
3 : *
4 : * Created on: Nov 1, 2010
5 : * Author: jjacobs
6 : */
7 :
8 : #include <assert.h>
9 : #include <cstdarg>
10 : #include <cstring>
11 : #include <errno.h>
12 : #include <fstream>
13 : #include <queue>
14 : #include <semaphore.h>
15 : #include <fcntl.h>
16 :
17 : #include <time.h>
18 : #include <casacore/casa/aips.h>
19 : #if defined(AIPS_LINUX)
20 : #if ! defined(_GNU_SOURCE)
21 : #define _GNU_SOURCE /* or _BSD_SOURCE or _SVID_SOURCE */
22 : #endif
23 : #include <unistd.h>
24 : #include <sys/syscall.h> /* For SYS_xxx definitions */
25 : #endif
26 : #include <sys/time.h>
27 :
28 : #include <casacore/casa/Exceptions/Error.h>
29 : #include <casacore/casa/Logging/LogIO.h>
30 :
31 : #include <condition_variable>
32 : #include <mutex>
33 : #include <thread>
34 :
35 : #include "AsynchronousTools.h"
36 : #include <stdcasa/UtilJ.h>
37 :
38 : using namespace std;
39 : using namespace casacore;
40 : using namespace casa::utilj;
41 :
42 : namespace casa {
43 :
44 : namespace async {
45 :
46 : class ConditionImpl {
47 :
48 : friend class Condition;
49 :
50 : private:
51 :
52 0 : ConditionImpl () : condition_p () {}
53 :
54 : std::condition_variable condition_p;
55 : };
56 :
57 : class MutexImpl {
58 :
59 : friend class Mutex;
60 : friend class Condition;
61 :
62 : private:
63 :
64 0 : MutexImpl () : mutex_p () {}
65 0 : ~MutexImpl () {}
66 :
67 : std::thread::id lockingThreadId_p;
68 : std::mutex mutex_p;
69 : };
70 :
71 : class SemaphoreImpl {
72 :
73 : friend class Semaphore;
74 :
75 : private:
76 :
77 0 : SemaphoreImpl () : semaphore_p (NULL) {}
78 :
79 : sem_t * semaphore_p; // [use]
80 : };
81 :
82 : struct timespec
83 0 : convertMsDeltaToTimespec (Int milliseconds)
84 : {
85 : // Get the time with a possible accuracy to microseconds and
86 : // then convert it into the timeout's data structure used by
87 : // pthreads
88 :
89 : struct timeval tVal;
90 0 : gettimeofday (& tVal, NULL);
91 :
92 : struct timespec t;
93 0 : t.tv_sec = tVal.tv_sec;
94 0 : t.tv_nsec = tVal.tv_usec * 1000;
95 :
96 : // Add the wait time in milliseconds to this structure
97 : // taking care to handle carry out and avoid overflow
98 :
99 0 : t.tv_sec += milliseconds / 1000;
100 : // extract seconds and add them in
101 :
102 0 : t.tv_nsec += (milliseconds % 1000) * 1000000L;
103 : // extract ms, convert to ns and add in
104 :
105 0 : t.tv_sec += t.tv_nsec / 1000000000L;
106 : // carry out of the ns field into seconds
107 :
108 0 : t.tv_nsec %= 1000000000L;
109 : // remove any seconds carried out of ns field
110 :
111 0 : return t;
112 : }
113 :
114 :
115 0 : Condition::Condition ()
116 : {
117 0 : impl_p = new ConditionImpl ();
118 0 : }
119 :
120 0 : Condition::~Condition ()
121 : {
122 0 : delete impl_p;
123 0 : }
124 :
125 : void
126 0 : Condition::broadcast ()
127 : {
128 0 : notify_all ();
129 0 : }
130 :
131 : void
132 0 : Condition::notify_all ()
133 : {
134 0 : impl_p->condition_p.notify_all ();
135 0 : }
136 :
137 : void
138 0 : Condition::notify_one ()
139 : {
140 0 : impl_p->condition_p.notify_one ();
141 0 : }
142 :
143 : void
144 0 : Condition::signal ()
145 : {
146 0 : notify_one ();
147 0 : }
148 :
149 : void
150 0 : Condition::wait (UniqueLock & uniqueLock)
151 : {
152 0 : impl_p->condition_p.wait (uniqueLock.uniqueLock_p);
153 0 : }
154 : /*
155 : Bool
156 : Condition::wait (Mutex & mutex, int milliseconds)
157 : {
158 : Assert (milliseconds >= 0); // weird if it's negative
159 :
160 : struct timespec t = convertMsDeltaToTimespec (milliseconds);
161 : int code = pthread_cond_timedwait (impl_p->condition_p, mutex.getRep(), & t);
162 :
163 : bool gotWait = true;
164 : if (code == ETIMEDOUT){
165 : gotWait = false;
166 : }
167 : else{
168 : ThrowIfError (code, String::format ("Condition::wait (%d)", milliseconds));
169 : }
170 :
171 : return gotWait;
172 : }
173 : */
174 :
175 0 : LockGuard::LockGuard (Mutex & mutex)
176 : {
177 0 : mutex_p = & mutex;
178 0 : mutex_p->lock ();
179 0 : }
180 :
181 0 : LockGuard::LockGuard (Mutex * mutex)
182 : {
183 0 : Assert (mutex != NULL);
184 :
185 0 : mutex_p = mutex;
186 0 : mutex_p->lock ();
187 0 : }
188 :
189 0 : LockGuard::~LockGuard ()
190 : {
191 0 : mutex_p->unlock ();
192 0 : }
193 :
194 0 : LockGuardInverse::LockGuardInverse (Mutex & mutex)
195 : {
196 0 : mutex_p = & mutex;
197 0 : mutex_p->unlock ();
198 0 : }
199 :
200 0 : LockGuardInverse::LockGuardInverse (Mutex * mutex)
201 : {
202 0 : Assert (mutex != NULL);
203 :
204 0 : mutex_p = mutex;
205 0 : mutex_p->unlock ();
206 0 : }
207 :
208 0 : LockGuardInverse::LockGuardInverse (LockGuard & lg)
209 : {
210 0 : mutex_p = lg.mutex_p;
211 0 : mutex_p->unlock();
212 0 : }
213 :
214 :
215 0 : LockGuardInverse::~LockGuardInverse ()
216 : {
217 0 : mutex_p->lock ();
218 0 : }
219 :
220 :
221 :
222 : Logger* Logger::singleton_p = NULL;
223 :
224 0 : Logger::Logger ()
225 : : loggingStarted_p (false),
226 0 : nameMutex_p (new Mutex ())
227 0 : {}
228 :
229 0 : Logger::~Logger ()
230 : {
231 0 : if (loggingStarted_p){
232 0 : delete get();
233 : }
234 0 : }
235 :
236 : std::once_flag loggerOnceFlag;
237 :
238 : Logger*
239 0 : Logger::get()
240 : {
241 0 : std::call_once (loggerOnceFlag, initialize);
242 :
243 0 : return singleton_p;
244 : }
245 :
246 : void
247 0 : Logger::initialize ()
248 : {
249 0 : singleton_p = new Logger ();
250 0 : }
251 :
252 : void
253 0 : Logger::log (const char * format, ...)
254 : {
255 : va_list vaList;
256 0 : va_start (vaList, format);
257 :
258 : char buffer[4096];
259 :
260 : // Create the text to be logged
261 :
262 0 : vsnprintf (buffer, sizeof (buffer), format, vaList);
263 :
264 : // Grab the timestamp and pid (for IDing threads)
265 :
266 0 : String threadNameText;
267 :
268 : {
269 0 : MutexLocker ml (* nameMutex_p);
270 :
271 0 : pthread_t tid = pthread_self();
272 0 : ThreadNames::iterator threadName = threadNames_p.find (tid);
273 0 : if (threadName != threadNames_p.end ()){
274 0 : threadNameText = String (" [") + (threadName->second) + "] : ";
275 : }
276 : else{
277 0 : threadNameText = String::format (" [0x%08x] : ", tid);
278 : }
279 : }
280 :
281 0 : String prefix = utilj::getTimestamp() + threadNameText;
282 :
283 : // Allocate a buffer to put into the queue
284 :
285 0 : string outputText = prefix + buffer;
286 :
287 0 : va_end (vaList);
288 :
289 : // Lock the queue, push on the block of text and increment
290 : // the drain semaphore
291 :
292 0 : loggerThread_p -> log (outputText); // ownership passes to the thread
293 0 : }
294 :
295 : void
296 0 : Logger::registerName (const String & threadName)
297 : {
298 0 : Assert (nameMutex_p != NULL);
299 :
300 0 : MutexLocker ml (* nameMutex_p);
301 :
302 0 : threadNames_p [pthread_self()] = threadName;
303 0 : }
304 :
305 : void
306 0 : Logger::start (const char * filename)
307 : {
308 0 : if (! loggingStarted_p){ // ignore multiple starts
309 :
310 0 : loggerThread_p = new LoggerThread ();
311 :
312 0 : loggerThread_p ->setLogFilename (filename == NULL ? "" : filename);
313 :
314 0 : loggerThread_p ->startThread();
315 :
316 0 : loggingStarted_p = true;
317 : }
318 0 : }
319 :
320 0 : Logger::LoggerThread::LoggerThread ()
321 0 : {}
322 :
323 0 : Logger::LoggerThread::~LoggerThread ()
324 : {
325 0 : terminate();
326 :
327 0 : this->join();
328 :
329 0 : if (deleteStream_p)
330 : {
331 0 : dynamic_cast<ofstream *> (logStream_p)->close();
332 0 : delete logStream_p;
333 : }
334 0 : }
335 :
336 : void
337 0 : Logger::LoggerThread::log (const string & text)
338 : {
339 0 : MutexLocker m (mutex_p);
340 :
341 0 : outputQueue_p.push (text);
342 :
343 0 : loggerChanged_p.notify_all ();
344 0 : }
345 :
346 :
347 : void *
348 0 : Logger::LoggerThread::run ()
349 : {
350 0 : LogIO logIo (LogOrigin ("Logger::LoggerThread"));
351 0 : logIo << "starting execution; tid=" << gettid() << endl << LogIO::POST;
352 :
353 :
354 : try {
355 : // Determine where to write the logging info. If nothing is specified or either "cerr" or
356 : // "stdout" are specified then use standard error. If "cout" or "stdout" are specified then
357 : // use standard out. Otherwise open the specified file and write to that.
358 :
359 0 : if (logFilename_p.empty () || logFilename_p == "cerr" || logFilename_p == "stderr"){
360 0 : logStream_p = & cerr;
361 0 : deleteStream_p = false;
362 : }
363 0 : else if (logFilename_p == "cout" || logFilename_p == "stdout"){
364 0 : logStream_p = & cout;
365 0 : deleteStream_p = false;
366 : }
367 : else{
368 0 : logStream_p = new ofstream (logFilename_p.c_str(), ios::out);
369 0 : deleteStream_p = true;
370 : }
371 :
372 0 : * logStream_p << utilj::getTimestamp() << ": Logging started, tid=" << gettid() << endl;
373 :
374 : // Loop waiting on the drain semaphore. This should be incremented once
375 : // every time users add a block of text to the queue.
376 :
377 : while (true){
378 :
379 0 : string text;
380 :
381 : {
382 : // Pop the front block of output off of the queue
383 : // Keep mutex locked while accessing queue.
384 :
385 0 : UniqueLock uniqueLock (mutex_p);
386 :
387 0 : while (! isTerminationRequested() && outputQueue_p.empty()){
388 0 : loggerChanged_p.wait (uniqueLock);
389 : }
390 :
391 0 : if (isTerminationRequested() && outputQueue_p.empty()){
392 0 : break;
393 : }
394 :
395 0 : text = outputQueue_p.front();
396 :
397 0 : outputQueue_p.pop();
398 : }
399 :
400 : // Now output the text and then delete the storage
401 :
402 0 : * logStream_p << text;
403 :
404 0 : logStream_p->flush();
405 0 : }
406 :
407 0 : * logStream_p << "*** Logging terminated" << endl;
408 :
409 0 : logStream_p->flush();
410 :
411 0 : return NULL;
412 : }
413 0 : catch (exception & e){
414 :
415 0 : const char * message = "*** Logging thread caught exception: ";
416 :
417 0 : cerr << message << e.what() << endl;
418 0 : cerr.flush();
419 :
420 0 : if (logStream_p != & cerr){
421 :
422 0 : * logStream_p << message << e.what() << endl;
423 0 : logStream_p->flush();
424 : }
425 :
426 0 : throw;
427 : }
428 0 : catch (...){
429 :
430 0 : const char * message = "*** Logging thread caught unknown exception";
431 :
432 0 : cerr << message << endl;
433 0 : cerr.flush();
434 :
435 0 : if (logStream_p != & cerr){
436 0 : * logStream_p << message << endl;
437 0 : logStream_p->flush();
438 : }
439 :
440 0 : throw;
441 : }
442 : }
443 :
444 : void
445 0 : Logger::LoggerThread::setLogFilename (const String & filename)
446 : {
447 0 : logFilename_p = filename;
448 0 : }
449 :
450 : void
451 0 : Logger::LoggerThread::terminate ()
452 : {
453 0 : Thread::terminate();
454 :
455 0 : loggerChanged_p.notify_all ();
456 0 : }
457 :
458 0 : Mutex::Mutex ()
459 : {
460 0 : impl_p = new MutexImpl ();
461 0 : isLocked_p = false;
462 0 : }
463 :
464 0 : Mutex::~Mutex ()
465 : {
466 0 : delete impl_p;
467 0 : }
468 :
469 : std::mutex &
470 0 : Mutex::getMutex ()
471 : {
472 0 : return impl_p->mutex_p;
473 : }
474 :
475 : //Bool
476 : //Mutex::isLockedByThisThread () const
477 : //{
478 : // // Only for use in debugs or asserts
479 : //
480 : // Bool itIs = isLocked_p && std::this_thread::get_id () == impl_p->lockingThreadId_p;
481 : //
482 : // return itIs;
483 : //}
484 :
485 : void
486 0 : Mutex::lock ()
487 : {
488 0 : impl_p->mutex_p.lock();
489 0 : impl_p->lockingThreadId_p = std::this_thread::get_id ();
490 0 : isLocked_p = true;
491 0 : }
492 :
493 : /*
494 : Bool
495 : Mutex::lock (Int milliseconds)
496 : {
497 :
498 : Assert (milliseconds >= 0); // weird if it's negative
499 :
500 : struct timespec t = convertMsDeltaToTimespec (milliseconds);
501 : int code = pthread_mutex_timedlock (impl_p->mutex_p, & t);
502 :
503 : bool gotLock = true;
504 : if (code == ETIMEDOUT){
505 : gotLock = false;
506 : }
507 : else{
508 : ThrowIfError (code, String::format ("Mutex::lock (%d)", milliseconds));
509 : }
510 :
511 : return gotLock;
512 : }
513 : */
514 :
515 : Bool
516 0 : Mutex::trylock ()
517 : {
518 0 : bool gotLock = impl_p->mutex_p.try_lock ();
519 0 : isLocked_p = gotLock;
520 0 : if (isLocked_p){
521 0 : impl_p->lockingThreadId_p = std::this_thread::get_id ();
522 : }
523 :
524 0 : return gotLock;
525 : }
526 :
527 : void
528 0 : Mutex::unlock ()
529 : {
530 0 : isLocked_p = false;
531 0 : impl_p->mutex_p.unlock ();
532 0 : }
533 :
534 : // jagonzal: Useful when locking is mandatory
535 : void
536 0 : Mutex::acquirelock()
537 : {
538 0 : while (!trylock())
539 : {
540 0 : sched_yield();
541 : }
542 0 : }
543 :
544 0 : MutexLocker::MutexLocker (Mutex & mutex)
545 0 : : mutex_p (& mutex)
546 : {
547 0 : mutex_p->lock();
548 0 : }
549 :
550 0 : MutexLocker::MutexLocker (Mutex * mutex)
551 0 : : mutex_p (mutex)
552 : {
553 0 : Assert (mutex_p != NULL);
554 :
555 0 : mutex_p->lock();
556 0 : }
557 :
558 0 : MutexLocker::~MutexLocker ()
559 : {
560 0 : mutex_p->unlock();
561 0 : }
562 :
563 0 : Semaphore::Semaphore (int initialValue)
564 : {
565 0 : Assert (initialValue >= 0);
566 :
567 0 : impl_p = new SemaphoreImpl ();
568 :
569 : // Since Mac doesn't support unnamed semaphores, try and find a
570 : // unique name for the semaphore. Names will be of the form
571 : // "/Semaphore_xxx"
572 :
573 0 : int code = 0;
574 0 : int i = 0;
575 :
576 0 : do {
577 :
578 0 : ++ i;
579 :
580 0 : name_p = String::format ("/CasaAsync_%03d", i);
581 0 : impl_p->semaphore_p = sem_open (name_p.c_str(), O_CREAT | O_EXCL, 0700, initialValue);//new sem_t;
582 0 : code = (impl_p->semaphore_p == SEM_FAILED) ? errno : 0;
583 :
584 0 : } while (impl_p->semaphore_p == SEM_FAILED && code == EEXIST);
585 :
586 0 : ThrowIfError (code, "Semaphore::open: name='" + name_p + "'");
587 0 : }
588 :
589 0 : Semaphore::~Semaphore ()
590 : {
591 0 : int code = sem_close (impl_p->semaphore_p);
592 0 : ThrowIfError (code == 0 ? 0 : errno, "Semaphore::close");
593 :
594 0 : code = sem_unlink (name_p.c_str());
595 0 : ThrowIfError (code == 0 ? 0 : errno, "Semaphore::unlink: name='" + name_p + "'");
596 :
597 0 : delete impl_p;
598 0 : }
599 :
600 : Int
601 0 : Semaphore::getValue ()
602 : {
603 : int value;
604 0 : int code = sem_getvalue (impl_p->semaphore_p, & value);
605 0 : ThrowIfError (code == 0 ? 0 : errno, "Semaphore::getValue");
606 :
607 0 : return value;
608 : }
609 :
610 : void
611 0 : Semaphore::post ()
612 : {
613 0 : int code = sem_post (impl_p->semaphore_p);
614 0 : ThrowIfError (code == 0 ? 0 : errno, "Semaphore::post");
615 0 : }
616 :
617 : Bool
618 0 : Semaphore::trywait ()
619 : {
620 0 : int code = sem_trywait (impl_p->semaphore_p);
621 0 : bool gotSemaphore = true;
622 :
623 0 : if (code != 0 && errno == EAGAIN){
624 0 : gotSemaphore = false;
625 : }
626 : else{
627 0 : ThrowIfError (code == 0 ? 0 : errno, "Semaphore::wait");
628 : }
629 :
630 0 : return gotSemaphore;
631 : }
632 :
633 : void
634 0 : Semaphore::wait ()
635 : {
636 0 : int errorCode = 0;
637 : int code;
638 :
639 0 : do {
640 :
641 0 : code = sem_wait (impl_p->semaphore_p);
642 0 : errorCode = errno;
643 :
644 0 : } while (code != 0 && errorCode == EINTR);
645 :
646 0 : ThrowIfError (code == 0 ? 0 : errorCode, "Semaphore::wait");
647 0 : }
648 :
649 : Bool
650 0 : Semaphore::wait (int milliseconds)
651 : {
652 0 : Assert (milliseconds >= 0); // it's weird if it's negative
653 :
654 : //// struct timespec t = convertMsDeltaToTimespec (milliseconds);
655 0 : int errorCode = 0;
656 : int code;
657 :
658 0 : do {
659 :
660 0 : code = sem_wait (impl_p->semaphore_p);
661 0 : errorCode = errno;
662 :
663 0 : } while (code != 0 && errorCode == EINTR);
664 :
665 :
666 0 : Bool gotSemaphore = true;
667 :
668 0 : if (code == 0){
669 0 : gotSemaphore = true;
670 0 : } else if (errno == ETIMEDOUT){
671 0 : gotSemaphore = false;
672 : } else {
673 0 : ThrowIfError (errno, String::format ("Mutex::lock (%d)", milliseconds));
674 : }
675 :
676 0 : return gotSemaphore;
677 : }
678 :
679 :
680 0 : Thread::Thread ()
681 : {
682 0 : id_p = new pthread_t;
683 0 : started_p = false;
684 0 : terminationRequested_p = false;
685 0 : }
686 :
687 0 : Thread::~Thread ()
688 : {
689 : // Make sure the thread knows it's time to quit
690 :
691 0 : terminate ();
692 :
693 0 : delete id_p;
694 0 : }
695 :
696 : pthread_t
697 0 : Thread::getId () const
698 : {
699 0 : return * id_p;
700 : }
701 :
702 : pid_t
703 0 : Thread::gettid () const
704 : {
705 0 : pid_t result = 0;
706 : #if defined(AIPS_LINUX)
707 0 : result = syscall (SYS_gettid);
708 : #endif
709 0 : return result;
710 : }
711 :
712 : void *
713 0 : Thread::join ()
714 : {
715 : void * result;
716 0 : int code = pthread_join (* id_p, & result);
717 0 : ThrowIfError (code, "Thread::join");
718 :
719 0 : return result;
720 : }
721 :
722 : bool
723 0 : Thread::isStarted () const
724 : {
725 0 : return started_p;
726 : }
727 :
728 : void
729 0 : Thread::startThread ()
730 : {
731 : // Create the thread, passing a pointer to this object as its
732 : // single argument. Subclass Thread to pass other information
733 : // into the thread function.
734 :
735 0 : int code = pthread_create (id_p, NULL, threadFunction, this);
736 0 : started_p = true;
737 0 : ThrowIfError (code, "Thread::create");
738 0 : }
739 :
740 : void
741 0 : Thread::terminate ()
742 : {
743 0 : terminationRequested_p = true;
744 0 : }
745 :
746 : bool
747 0 : Thread::isTerminationRequested () const
748 : {
749 0 : return terminationRequested_p;
750 : }
751 :
752 : void *
753 0 : Thread::threadFunction (void * arg)
754 : {
755 0 : Thread * thread = reinterpret_cast<Thread *> (arg);
756 :
757 0 : void * result = thread->run ();
758 :
759 0 : return result; // use thread variable to store any results
760 : }
761 :
762 0 : UniqueLock::UniqueLock (Mutex & mutex)
763 0 : : uniqueLock_p (mutex.getMutex())
764 0 : {}
765 :
766 : void
767 0 : UniqueLock::lock ()
768 : {
769 0 : uniqueLock_p.lock ();
770 0 : }
771 :
772 : void
773 0 : UniqueLock::unlock ()
774 : {
775 0 : uniqueLock_p.unlock ();
776 0 : }
777 :
778 :
779 :
780 : } // end namespace Async
781 :
782 : } // end namespace CASA
|