Process management using Jobs on Windows

One of the problems I currently have with CruiseControl.Net is that some of my tests spawn multiple processes; such as server tests which run the development environment, which runs a batch file to start a server (or two) and then run a test harness which generates network traffic and then run a program to ask the server to shutdown cleanly. When these tests timeout in CC.Net they’re forcibly killed. Unfortunately due to how Windows processes work, killing the parent of a tree of processes doesn’t kill the children. This is because Windows processes don’t know who their children are and partly because it seems that even tools like taskkill.exe and kill.exe (which CC.Net uses to try and kill whole process trees) have issues with working out the relationship between processes sometimes; it may be a race condition, process A launches process B, the tool snapshots the process relationships somehow, process B launches process C, the tool walks the snapshot and kills the processes but doesn’t know about process C… The end result is that when one of these multiple process tests times out in CC.Net the cleanup doesn’t clean up and processes are left hanging around which then interfere with other tests.

There’s a solution to this in the shape of Windows Job objects, and, just by chance, I happen to have been reading about them this week in Richter’s new release. A Job object allows you to group together processes and it knows when new child processes are started and it can kill all processes in the job in one go, or you can wait on all processes in the job to terminate, etc. Unfortunately it’s not easy to add support for launching CC.Net tasks in a job as jobs currently aren’t exposed by .Net (they’ve only been around since Windows 2000, after all…).

One of the nice thing about jobs is that the notifications from them come in via an IO completion port. This means that you need 0 more threads than you already have to monitor a job (assuming you already have at least one thread monitoring an IOCP). I took my usual approach and spent some time putting together some simple classes to make job management easier and to learn how the API works. Once I had a job object that I could use to manage a set of processes and which I implemented my IWaitable interface so that I could wait for the termination of all of the processes within the job I decided it might be nice to be able to launch and monitor a process and capture its output and error streams all without using a single extra thread in the process that was doing the management…

The standard way to capture a process’ standard output and standard error is detailed pretty well in this article, but the problem that I have with that is that it uses an anonymous pipe and you can’t issue overlapped reads on anonymous pipes which means you need a thread to read the pipes and that spoils my scalability. However, as the documentation for CreatePipe() states, “Anonymous pipes are implemented using a named pipe with a unique name. Therefore, you can often pass a handle to an anonymous pipe to a function that requires a handle to a named pipe.” so I set about writing a new implementation of CreatePipe() that allows the pipe to be used with overlapped I/O.

The result is something like this:

CProcessOutputPipe::CProcessOutputPipe(
   const DWORD bufferSize,
   const Milliseconds timeout)
   :  m_name(GenerateName()),
      m_pipeServer(CreateServerPipe(m_name, bufferSize, timeout)),
      m_pipeClient(CreateClientPipe(m_name))
{
   Connect(timeout);
}
  
CProcessOutputPipe::CProcessOutputPipe(
   const _tstring &name,
   const DWORD bufferSize,
   const Milliseconds timeout)
   :  m_name(name),
      m_pipeServer(CreateServerPipe(name, bufferSize, timeout)),
      m_pipeClient(CreateClientPipe(name))
{
   Connect(timeout);
}
  
void CProcessOutputPipe::Connect(
   const Milliseconds timeout)
{
   COverlappedWithEvent overlapped;

   if (0 == ::ConnectNamedPipe(m_pipeServer, &overlapped))
   {
      const DWORD lastError = ::GetLastError();
      
      if (lastError == ERROR_IO_PENDING)
      {
         if (!overlapped.Wait(timeout))
         {
            throw CException(
               _T("CProcessOutputPipe::CProcessOutputPipe()"), 
               _T("Failed to connect client and server ends of the pipe."));
         }
      }
      else if (lastError != ERROR_PIPE_CONNECTED)
      {
         throw CWin32Exception(
            _T("CProcessOutputPipe::CProcessOutputPipe() - ConnectNamedPipe"), 
            lastError);
      }
   }
}
  
CProcessOutputPipe::~CProcessOutputPipe()
{
}
  
HANDLE CProcessOutputPipe::DetachWriteHandle()
{
   return m_pipeClient.Detach();
}
  
HANDLE CProcessOutputPipe::DetachReadHandle()
{
   return m_pipeServer.Detach();
}
  
static _tstring GenerateName()
{
   return CreateGUIDAsString();
}
  
static HANDLE CreateServerPipe(
   const _tstring &name,
   const DWORD bufferSize,
   const Milliseconds defaultTimeout)
{
   const _tstring fullName = _T("\\\\.\\pipe\\") + name;
  
   CSecurityAttributes securityAttributes(false);
  
   CSmartHandle handle(::CreateNamedPipe(
      fullName.c_str(),
      PIPE_ACCESS_INBOUND |      // data flow from client to server
      FILE_FLAG_OVERLAPPED,      // overlapped mode 
      PIPE_TYPE_BYTE |           // byte-type pipe 
      PIPE_READMODE_BYTE |       // byte-read mode 
      PIPE_WAIT,                 // blocking mode 
      1,                         // number of instances 
      0,                         // output buffer size - no data flows this way
      bufferSize,                // input buffer size 
      defaultTimeout,            // client time-out 
      &securityAttributes));     // dont allow handle inheritance
  
   if (handle == INVALID_HANDLE_VALUE)
   {
      const DWORD lastError = ::GetLastError();
  
      throw CWin32Exception(
         _T("CProcessOutputPipe::CreateServerPipe()"), 
         lastError);
   }
  
   return handle.Detach();
}
  
static HANDLE CreateClientPipe(
   const _tstring &name)
{
   static const _tstring server = _T(".");
  
   const _tstring fullName = _T("\\\\") + server + _T("\\pipe\\") + name;
  
   CSecurityAttributes securityAttributes(true);
  
   CSmartHandle handle(::CreateFile( 
         fullName.c_str(),   
         GENERIC_WRITE, 
         0,                      // no sharing 
         &securityAttributes,    // allow handle inheritance
         OPEN_EXISTING,          // opens existing pipe 
         FILE_FLAG_OVERLAPPED,   // default attributes 
         NULL));                 // no template file 
  
   if (handle == INVALID_HANDLE_VALUE)
   {
      const DWORD lastError = ::GetLastError();
  
      throw CWin32Exception(
         _T("CProcessOutputPipe::CreateClientPipe() - CreateFile"), 
         lastError);
   }
  
   DWORD dwMode = PIPE_READMODE_BYTE; 
  
   if (0 == ::SetNamedPipeHandleState( 
      handle,   // pipe handle 
      &dwMode,  // new pipe mode 
      NULL,     // don't set maximum bytes 
      NULL))    // don't set maximum time 
   {
      const DWORD lastError = ::GetLastError();
  
      throw CWin32Exception(
         _T("CProcessOutputPipe::CreateClientPipe() - SetNamedPipeHandleState"), 
         lastError);
   }
  
   return handle.Detach();
}

Once the write end of this pipe is wired up to a process’ standard output or error stream and an instance of CAsyncFileReader() is plugged in on the read end I can monitor the process and read the pipe using the threads in my IOCP’s pool. Firing off a process and capturing/processing its output is now reliable (I can terminate it and any processes it creates) and scalable (I don’t need any new threads in my process to monitor a new process tree).

Of course this doesn’t help me directly with my problems with CC.Net but I should be able to write a process launcher that I can spawn from within a CC.Net task and which runs my multi-process tests in a job and which can therefore clean up after then correctly when it’s killed off. I’m also one step closer to a scalable C++ task execution engine; but that’s another story.