Logo Search packages:      
Sourcecode: pwlib version File versions  Download package

tlibmpthrd.cxx

/*
 * tlibmpthrd.cxx
 *
 * Routines for Macintosh pre-emptive threading system
 *
 * Portable Windows Library
 *
 * Copyright (c) 1993-1998 Equivalence Pty. Ltd.
 *
 * The contents of this file are subject to the Mozilla Public License
 * Version 1.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * The Original Code is Portable Windows Library.
 *
 * The Initial Developer of the Original Code is Equivalence Pty. Ltd.
 *
 * Portions are Copyright (C) 1993 Free Software Foundation, Inc.
 * All Rights Reserved.
 *
 * Contributor(s): ______________________________________.
 *
 * $Log: tlibmpthrd.cxx,v $
 * Revision 1.4  2002/06/27 06:38:58  robertj
 * Changes to remove memory leak display for things that aren't memory leaks.
 *
 * Revision 1.3  2002/02/19 07:40:59  rogerh
 * Remove PMutex destructor for Carbon.
 *
 * Revision 1.2  2002/02/19 07:28:02  rogerh
 * PXAbortIO -> PXAbortBlock. Submitted by Peter Johnson <paj@mac.com>
 *
 * Revision 1.1  2001/08/11 15:38:43  rogerh
 * Add Mac OS Carbon changes from John Woods <jfw@jfwhome.funhouse.com>
 *
 */

#include <sys/resource.h>
#include <new> // just because I want to throw std::bad_alloc...

#ifndef NDEBUG
#define DEBUG_THREADS
extern int debug_mpthreads;
#endif

PDECLARE_CLASS(PHouseKeepingThread, PThread)
  public:
    PHouseKeepingThread()
      : PThread(1000, NoAutoDeleteThread, NormalPriority, "Housekeeper")
      { closing = FALSE; Resume(); }

    void Main();
    void SetClosing() { closing = TRUE; }

  protected:
    BOOL closing;
};


#define new PNEW


int PThread::PXBlockOnIO(int handle, int type, const PTimeInterval & timeout)
{
  //PTRACE(1,"PThread::PXBlockOnIO(" << handle << ',' << type << ')');

  // make sure we flush the buffer before doing a write
  fd_set tmp_rfd, tmp_wfd, tmp_efd;
  fd_set * read_fds      = &tmp_rfd;
  fd_set * write_fds     = &tmp_wfd;
  fd_set * exception_fds = &tmp_efd;

  struct timeval * tptr = NULL;
  struct timeval   timeout_val;
  if (timeout != PMaxTimeInterval) {
    static const PTimeInterval oneDay(0, 0, 0, 0, 1);
    if (timeout < oneDay) {
      timeout_val.tv_usec = (timeout.GetMilliSeconds() % 1000) * 1000;
      timeout_val.tv_sec  = timeout.GetSeconds();
      tptr                = &timeout_val;
    }
  }

  int retval;

  for (;;) {

    FD_ZERO (read_fds);
    FD_ZERO (write_fds);
    FD_ZERO (exception_fds);

    switch (type) {
      case PChannel::PXReadBlock:
      case PChannel::PXAcceptBlock:
        FD_SET (handle, read_fds);
        break;
      case PChannel::PXWriteBlock:
        FD_SET (handle, write_fds);
        break;
      case PChannel::PXConnectBlock:
        FD_SET (handle, write_fds);
        FD_SET (handle, exception_fds);
        break;
      default:
        PAssertAlways(PLogicError);
        return 0;
    }

    // include the termination pipe into all blocking I/O functions
    int width = handle+1;
    FD_SET(unblockPipe[0], read_fds);
    width = PMAX(width, unblockPipe[0]+1);
  
    retval = ::select(width, read_fds, write_fds, exception_fds, tptr);

    if ((retval >= 0) || (errno != EINTR))
      break;
  }

  if ((retval == 1) && FD_ISSET(unblockPipe[0], read_fds)) {
    BYTE ch;
    ::read(unblockPipe[0], &ch, 1);
    errno = EINTR;
    retval =  -1;
    //PTRACE(1,"Unblocked I/O");
  }

  return retval;
}

void PThread::PXAbortBlock() const
{
  BYTE ch;
  ::write(unblockPipe[1], &ch, 1);
}


// For Mac OS, the housekeeping thread has two jobs:
// First, poll for synchronous signals (as is traditional), and
// second, to poll the MPThread termination notification queue and clean up
// deceased PThreads.  
// There is an ickiness here which depends on a current restriction of 
// Mac OS X:  synchronous signals (i.e. not signals resulting from
// exceptions) are only delivered to the main thread.  I assume that
// it is therefore safe for the main thread to call MPNotifyQueue from
// a signal handler if and only if the main thread never calls MPNotifyQueue
// on the termination notification queue from its main code.  This ought to
// be acceptable if notifying a queue is single-threaded per queue; if
// MPNotifyQueue has a global critical section, this will work very badly.

static MPQueueID terminationNotificationQueue = 0;
// This bites.  Threads don't know what process they come from (even though
// there can be only one), yet when we're winding down the process thread
// kills the housekeeper before it can clean up all the other threads.
// So the process thread has to poll the termination queue, but it does so
// from PThread context, so it can't know there's no housekeeper.  yuck.
static BOOL noHousekeeper = 0;

static void SetUpTermQueue() 
{
    OSStatus err;
    // Let us PLEASE try not to get into any "create/delete" loops here.
    // Create it and be DONE with it.
    while (!terminationNotificationQueue) {
        MPQueueID tempQueue;
        err = MPCreateQueue(&tempQueue);
        PAssert(err == noErr, "MPCreateQueue failed");
        // When Motorola finally finishes the 620, there's a lot of Mac code
        // gonna need to be rewritten.
        // HAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHAHA!!!
        // "finishes the 620".  hee hee hee!
        if (!OTCompareAndSwap32(0, (UInt32)tempQueue,
                                (UInt32*)&terminationNotificationQueue)) {
            // then someone else snuck in and initialized it.
            MPDeleteQueue(tempQueue);
        } else {
#ifdef DEBUG_THREADS
            if (debug_mpthreads)
                fprintf(stderr,"set up notification queue\n");
#endif
            // XXX MPNotifyQueue is perfectly willing to allocate memory
            // XXX for the items dropped in the queue.  However, if it can't,
            // XXX then life just goes on -- and we miss a thread exit message.
            // XXX If we reserve queue space, however, then we guarantee two
            // XXX things:  1, we absolutely will be able to receive N
            // XXX notifications, and 2, we absolutely will drop the N+1st
            // XXX on the floor, spare memory or no.  The target applications
            // XXX for this library do not appear (currently) to generate
            // XXX absurd numbers of threads, so I'll reserve an absurd number
            // XXX of messages, and pretend that nothing can go wrong.
            // XXX n go wrong.n go wrong.n go wrong.n go wrong.n go wrong.
            // XXX If the following fails, it's probably for lack of memory,
            // XXX in which case the queue will just try dynamic allocation.
            (void)MPSetQueueReserve(terminationNotificationQueue, 128);
        }
    }
}

static BOOL PollNotificationQueue(Duration timeout)
{
    OSStatus err = noErr;
    void *parm1, *parm2, *parm3;
    
    err = MPWaitOnQueue(terminationNotificationQueue,
                        &parm1, &parm2, &parm3,
                        timeout);
    if (err == noErr) {
        // then we got a notification
        if ((int)parm1 == 1) {
            // then it was a thread death notification, parm2 is
            // the PThread pointer
#ifdef DEBUG_THREADS
            if (debug_mpthreads)
                fprintf(stderr,"notified of %p death\n", parm2);
#endif
            PThread::PX_ThreadEnd(parm2);
        } // else parm1 == 0 and it's just a wakeup notice
    }
    return err == noErr;
}

void PHouseKeepingThread::Main()
{
    PProcess & process = PProcess::Current();

    SetUpTermQueue();

    while (!closing) {
        PTimeInterval waitTime = process.timers.Process();

        Duration timeout;
        if (waitTime == PMaxTimeInterval)
            timeout = kDurationForever;
        else {
            // "Values of type Duration are 32 bits long.  They are intepreted
            //  in a manner consistend with the Time Manager -- positive values
            //  are in units of milliseconds, negative values are in units of
            //  microseconds."  
            // Fortunately, PMaxTimeInterval is limited to a positive 32-bit
            // number of milliseconds.
            timeout = (long)waitTime.GetMilliSeconds();
        }

        // Block on the notification queue

        (void)PollNotificationQueue(timeout);
        
        // whether we timed out or got notified, check the signals.
        process.PXCheckSignals();
    }
    noHousekeeper = 1;
#ifdef DEBUG_THREADS
    if (debug_mpthreads)
        fprintf(stderr,"housekeeper exiting\n");
#endif
}


void PProcess::Construct()
{
  // set the file descriptor limit to something sensible
  struct rlimit rl;
  PAssertOS(getrlimit(RLIMIT_NOFILE, &rl) == 0);
  rl.rlim_cur = rl.rlim_max;
  PAssertOS(setrlimit(RLIMIT_NOFILE, &rl) == 0);

  SetUpTermQueue();

  // initialise the housekeeping thread
  housekeepingThread = NULL;

  CommonConstruct();
}


PProcess::~PProcess()
{
  // Don't wait for housekeeper to stop if Terminate() is called from it.
  if (housekeepingThread != NULL && PThread::Current() != housekeepingThread) {
    housekeepingThread->SetClosing();
    SignalTimerChange();
    housekeepingThread->WaitForTermination();
    delete housekeepingThread;
    housekeepingThread = 0;
  }
  // XXX try to gracefully handle shutdown transient where the housekeeping
  // XXX thread hasn't managed to clean up all the threads
  while (PollNotificationQueue(kDurationImmediate)) ;
  
  CommonDestruct();
}


PThread::PThread()
{
  // see InitialiseProcessThread()
}


void PThread::InitialiseProcessThread()
{
  OSStatus err        = 0;
  PX_origStackSize    = 0;
  autoDelete          = FALSE;
  PX_threadId         = MPCurrentTaskID();
  PX_suspendCount     = 0;

  ::pipe(unblockPipe);

  // Sadly, Mac OS MPThreads can't just initialize a block of memory into
  // an MPSemaphore (XXX ought to be a CriticalRegion, but they're broken
  // in Mac OS X 10.0.x!)
  PX_suspendMutex = 0;
  if ((err = MPCreateSemaphore(1,1,&PX_suspendMutex))
      != 0) {
      PAssertOS(err == 0);
      throw std::bad_alloc();
  }

  ((PProcess *)this)->activeThreads.DisallowDeleteObjects();
  ((PProcess *)this)->activeThreads.SetAt((unsigned)PX_threadId, this);
}


PThread::PThread(PINDEX stackSize,
                 AutoDeleteFlag deletion,
                 Priority /*priorityLevel*/,
                 const PString & name)
        : threadName(name), PX_signature(kMPThreadSig)
{
  OSStatus err = 0;

  PAssert(stackSize > 0, PInvalidParameter);

  PX_origStackSize = stackSize;
  autoDelete       = (deletion == AutoDeleteThread);

  // Sadly, Mac OS MPThreads can't just initialize a block of memory into
  // an MPSemaphore (XXX ought to be a CriticalRegion, but they're broken
  // in Mac OS X 10.0.x!)
  PX_suspendMutex = 0;
  if ((err = MPCreateSemaphore(1,1,&PX_suspendMutex)) != 0) {
      PAssert(err == 0, "MPCreateSemaphore failed");
      throw std::bad_alloc();
  }

  ::pipe(unblockPipe);

  // throw the new thread
  PX_NewThread(TRUE);
}


PThread::~PThread()
{
  if (!IsTerminated()) 
    Terminate();

  ::close(unblockPipe[0]);
  ::close(unblockPipe[1]);

  if (PX_suspendMutex)
      MPDeleteSemaphore(PX_suspendMutex);
#ifdef DEBUG_THREADS
  if (debug_mpthreads)
      fprintf(stderr,"thread %p destructing\n", this);
#endif
  PX_signature = kMPDeadSig;
}

void PThread::PX_NewThread(BOOL startSuspended)
{
  OSErr err;
  // initialise suspend counter and create mutex
  PX_suspendCount = startSuspended ? 1 : 0;

  // initialise Suspend/Resume semaphore (for Mac OS X)
  // XXX The MPThread manager allows for starting tasks "suspended", but I
  // XXX suspect that only works if you have a debugger registered.
  suspend_semaphore = new PSemaphore(0,1);

  // throw the thread

  SetUpTermQueue();
  
  // create the task.
#ifdef DEBUG_THREADS
  if (debug_mpthreads)
      fprintf(stderr,"thread %p being started\n", (void *)this);
#endif
  err = MPCreateTask( (TaskProc)PX_ThreadStart, (void*)this,
                      65536, // stacksize
                      terminationNotificationQueue,
                      (void *)1,    // param 1 == "death"
                      (void *)this, // param 2 == "PThread to clean up"
                      0, // no options
                      &PX_threadId);
  PAssert(err == 0, "MPCreateTask failed");
  if (err) throw std::bad_alloc();

}

long PThread::PX_ThreadStart(void * arg)
{ 
  MPTaskID threadId = MPCurrentTaskID();

  // self-detach (no need)

  PThread * thread = (PThread *)arg;
  thread->SetThreadName(thread->GetThreadName());

  PProcess & process = PProcess::Current();

  // add thread to thread list
  process.threadMutex.Wait();
  process.activeThreads.SetAt((unsigned)threadId, thread);
  process.threadMutex.Signal();

  // if we are not supposed to start suspended, then don't wait
  // if we are supposed to start suspended, then wait for a resume

  if (thread->PX_suspendCount != 0) {
    thread->suspend_semaphore->Wait();    // Wait for the Resume
  }

  // now call the the thread main routine
  //PTRACE(1, "tlibthrd\tAbout to call Main");
  thread->Main();

#ifdef DEBUG_THREADS
  if (debug_mpthreads)
      fprintf(stderr,"thread %p returning\n", thread);
#endif
  return 0;
}


void PProcess::SignalTimerChange()
{
  if (housekeepingThread == NULL) {
#if PMEMORY_CHECK
    BOOL oldIgnoreAllocations = PMemoryHeap::SetIgnoreAllocations(TRUE);
#endif
    housekeepingThread = new PHouseKeepingThread;
#if PMEMORY_CHECK
    PMemoryHeap::SetIgnoreAllocations(oldIgnoreAllocations);
#endif
  }

  SetUpTermQueue();
  MPNotifyQueue(terminationNotificationQueue, 0, 0, 0);
}


void PThread::PX_ThreadEnd(void * arg)
{
  PThread * thread = (PThread *)arg;
  PProcess & process = PProcess::Current();
  
  MPTaskID id = thread->PX_GetThreadId();
  if (id != 0) {

    // remove this thread from the active thread list
    process.threadMutex.Wait();
    process.activeThreads.SetAt((unsigned)id, NULL);
    process.threadMutex.Signal();
  }

  // delete the thread if required, note this is done this way to avoid
  // a race condition, the thread ID cannot be zeroed before the if!
  if (thread->autoDelete) {
    thread->PX_threadId = 0;  // Prevent terminating terminated thread
    delete thread;
  }
  else
    thread->PX_threadId = 0;
}


MPTaskID PThread::PX_GetThreadId() const
{
  return PX_threadId;
}


void PThread::Restart()
{
  if (IsTerminated())
    return;

  PX_NewThread(FALSE);
}


void PThread::Terminate()
{
  if (PX_origStackSize <= 0)
    return;

  if (IsTerminated())
    return;

  PTRACE(1, "tlibthrd\tForcing termination of thread " << (void *)this);

  if (Current() == this)
      MPExit(0);
  else {
      MPTaskID taskId = PX_threadId;
      WaitForTermination();
      // XXX Dire Consequences[TM] are warned of when one uses MPTerminateTask.
      // XXX However, the same Dire Consequences are predicted (I think) for
      // XXX pthread_kill which the PWLIB code already uses.
      // XXX However, the only thing the cleanup function does is removes the
      // XXX thread from the thread table, which is already performed by the
      // XXX housekeeping thread; PWLIB doesn't try to salvage locks or
      // XXX anything clever like that.
      // XXX I just hope taskIds aren't quickly reused.
      if (taskId != 0)
          (void)MPTerminateTask(taskId, kMPTaskAbortedErr);
  }
}


void PThread::PXSetWaitingSemaphore(PSemaphore * sem)
{
    // not needed
}


BOOL PThread::IsTerminated() const
{
  if (PX_threadId == 0) {
    //PTRACE(1, "tlibthrd\tIsTerminated(" << (void *)this << ") = 0");
    return TRUE;
  }

#ifdef _not_def_ // Sigh.  no MPGetNextTaskID on MOSX
  // This seems like a silly way to do this, but I think it might work.
  // The end condition for MPGetNextTaskID isn't documented, so I try both
  // logical possibilities.
  MPTaskID sometask = 0;
  MPProcessID myproc = 0;
  while (MPGetNextTaskID(myproc, &sometask) == noErr) {
      if (sometask == 0) break;
      if (sometask == PX_threadId) {
          //PTRACE(1, "tlibthrd\tIsTerminated(" << (void *)this << ") not dead yet");
          return FALSE;
      }
  }
  // didn't find it, it's dead
  //PTRACE(1, "tlibthrd\tIsTerminated(" << (void *)this << ") = 0");
  return TRUE;
#else
  return FALSE; // ENOCLUE
#endif
}

// Mac OS X and Darwin 1.2 does not support pthread_kill() or sigwait()
// so we cannot implement suspend and resume using signals. Instead we have a
// partial implementation using a Semaphore.
// As a result, we can create a thread in a suspended state and then 'resume'
// it, but once it is going, we can no longer suspend it.
// So, for Mac OS X, we will accept Resume() calls (or Suspend(FALSE))
// but reject Suspend(TRUE) calls with an Assertion. This will indicate
// to a user that we cannot Suspend threads on Mac OS X

void PThread::Suspend(BOOL susp)
{
  OSStatus err;
  err = MPWaitOnSemaphore(PX_suspendMutex,kDurationForever);
  PAssert(err == 0, "MPWaitOnSemaphore failed");

  if (susp) {
    // Suspend - warn the user with an Assertion
    PAssertAlways("Cannot suspend threads on Mac OS X due to lack of pthread_kill()");
  }

  // if resuming, then see if to really resume
  else if (PX_suspendCount > 0) {
    PX_suspendCount--;
    if (PX_suspendCount == 0)  {
      suspend_semaphore->Signal();
    }
  }

  err = MPSignalSemaphore(PX_suspendMutex);
  PAssert( err == 0, "MPSignalSemaphore failed");
}

void PThread::Resume()
{
  Suspend(FALSE);
}


BOOL PThread::IsSuspended() const
{
  OSStatus err;

  if (IsTerminated())
    return FALSE;

  err = MPWaitOnSemaphore(PX_suspendMutex, kDurationForever);
  PAssert(err == 0, "MPWaitOnSemaphore failed");
  BOOL suspended = PX_suspendCount > 0;
  err = MPSignalSemaphore(PX_suspendMutex);
  PAssert(err == 0, "MPSignalSemaphore failed");
  return suspended;
}


void PThread::SetAutoDelete(AutoDeleteFlag deletion)
{
  PAssert(deletion != AutoDeleteThread || this != &PProcess::Current(), PLogicError);
  autoDelete = deletion == AutoDeleteThread;
}


void PThread::SetPriority(Priority /*priorityLevel*/)
{
}


PThread::Priority PThread::GetPriority() const
{
  return LowestPriority;
}


void PThread::Yield()
{
  ::sleep(0);
}


PThread * PThread::Current()
{
  PProcess & process = PProcess::Current();
  process.threadMutex.Wait();
  PThread * thread = process.activeThreads.GetAt((unsigned)MPCurrentTaskID());
  process.threadMutex.Signal();
  return PAssertNULL(thread);
}


void PThread::Sleep(const PTimeInterval & timeout)
{
    AbsoluteTime expiry;
    Duration delta = kDurationForever;
    
    if (timeout != PMaxTimeInterval) {
        delta = timeout.GetMilliSeconds();
    }
    expiry = AddDurationToAbsolute(delta, UpTime());
    
    (void)MPDelayUntil(&expiry);
}


void PThread::WaitForTermination() const
{
  PAssert(Current() != this, "Waiting for self termination!");
  
  PXAbortBlock();

  while (!IsTerminated()) {
    PAssert(PX_signature == kMPThreadSig, "bad signature in living thread");
    Current()->Sleep(10);
#ifdef DEBUG_THREADS
    if (debug_mpthreads)
        fprintf(stderr,"spinning for termination of thread %p\n", (void *)this);
#endif  
    if (noHousekeeper) PollNotificationQueue(kDurationImmediate);
  }
}


BOOL PThread::WaitForTermination(const PTimeInterval & maxWait) const
{
  PAssert(Current() != this, "Waiting for self termination!");
  
  //PTRACE(1, "tlibthrd\tWaitForTermination(delay)");
  PXAbortBlock();

  PTimer timeout = maxWait;
  while (!IsTerminated()) {
    if (timeout == 0)
      return FALSE;
    Current()->Sleep(10);
  }
  return TRUE;
}


///////////////////////////////////////////////////////////////////////////////

PSemaphore::PSemaphore(unsigned initial, unsigned maxCount)
{
    OSStatus err = MPCreateSemaphore(maxCount, initial, &semId);
    PAssert(err == 0, "MPCreateSemaphore failed");
    PAssert((long)semId != 0 && (long)semId != -1, "stupid semId");
}


PSemaphore::~PSemaphore()
{
    OSStatus err = MPDeleteSemaphore(semId);
    PAssert(err == 0, "MPDeleteSemaphore failed");
    *(long *)&semId = -1;
}


void PSemaphore::Wait()
{
    assert((long)semId != 0);
    assert((long)semId != -1);
    
    PAssert((long)semId != -1, "wait on destructed PSemaphore");
    PAssert((long)semId != 0, "semId stomped");
    OSStatus err = MPWaitOnSemaphore(semId, kDurationForever);
    PAssert(err == 0, "MPWaitOnSemaphore failed");
}


BOOL PSemaphore::Wait(const PTimeInterval & waitTime)
{
  OSErr err = 0;
    
  if (waitTime == PMaxTimeInterval) {
    Wait();
    return TRUE;
  }

  Duration timeout = waitTime.GetMilliSeconds();
  if ((err = MPWaitOnSemaphore(semId, timeout)) == noErr)
      return TRUE;
  if (err == kMPTimeoutErr)
      return FALSE;
  PAssert(err == 0, psprintf("timed wait error = %i", err));
  return FALSE;
}

void PSemaphore::Signal()
{
    OSStatus err = MPSignalSemaphore(semId);
    // was it already signalled?
    if (err == kMPInsufficientResourcesErr) err = 0;
    PAssert(err == 0, "MPSignalSemaphore failed");
}


BOOL PSemaphore::WillBlock() const
{
    OSStatus err = MPWaitOnSemaphore(semId, kDurationImmediate);
    if (err == kMPTimeoutErr)
        return TRUE;
    PAssert(err == 0, psprintf("timed wait error = %i", err));
    (void)MPSignalSemaphore(semId);
    return FALSE;
}

// Ideally, a PMutex would contain an MPCriticalSection instead of a
// semaphore, but the class derivation is outside the machine-specific
// code, and I'm unwilling to do something gross like implement a bogus
// constructor for PSemaphore which doesn't allocate a semaphore.

PMutex::PMutex()
  : PSemaphore(1, 1)
{
}

void PMutex::Wait()
{
      PSemaphore::Wait();
}

BOOL PMutex::Wait(const PTimeInterval & timeout)
{
      return PSemaphore::Wait(timeout);
}

void PMutex::Signal()
{
      PSemaphore::Signal();
}

BOOL PMutex::WillBlock() const 
{
      return PSemaphore::WillBlock();
}

PSyncPoint::PSyncPoint()
  : PSemaphore(0, 1)
{
}

Generated by  Doxygen 1.6.0   Back to index