Designing an asynchronous server-side API for WebSockets

| 0 Comments
I've just released The WebSockets Option Pack for The Server Framework and I think it's worth running through why the API that I designed ended up as it did. This will probably end up as part of The Server Framework's WebSockets documentation, which is currently a little sparse.
The WebSockets protocol is a message based protocol with unbounded message sizes. This makes a general purpose API more difficult to design as some clients of the API may wish to work in terms of discrete messages and some may wish to work in terms of streams of data. Due to the possibility of frame fragmentation occurring between the client and the server (due to unknown intermediaries being present in the data path) the server-side cannot assume that it will be able to determine the size of an individual message in advance of receiving the final frame for that message.

WebSocketsAPI-100ByteMessage.png

Whilst the client code may only ever generate messages that are 100 bytes or less the server could receive those messages as (in the worst case) a sequence of 100 frames each of 1 byte. Since each fragmented frame contains only the size of the current fragment there is no way for the server to determine that the message is 100 bytes long until the final fragment arrives.

WebSocketsAPI-FragmentedMessage.png

Often the code that's using the server-side API will also be the code that deploys the client side code which is communicating with it, this allows that code to know things about the messaging it's expecting, after all, it's quite likely that the same organisation created both the client and server code. The challenge for The Server Framework's WebSockets API is to expose the underlying Websockets connection in such a way that the user of the API can decide how best to deal with the inbound data. It's tempting to simply structure the API to work in terms of a series of streams and to pass the problems of message accumulation on to the user of the API, however I felt that was taking the easy way out and doing the users of the API a disservice. Instead the API can be configured to work in terms of complete, distinct, messages as long as they will fit into a single 'buffer' and then degrade to working in terms of a stream if the message is too large. Since the user of the API gets to determine how big the buffers that they use are, they can select a size that is appropriate for their message size, if that is important to them. Users who would prefer to work with streams, or who are working in terms of a single unending stream of data rather than discrete messages, can opt to receive data as it arrives rather than having it accumulated inside The Server Framework's WebSockets API.

The API interface looks something like this.

      virtual void OnData(
         JetByteTools::WebSocket::HyBi::IWebSocket &socket,
         const JetByteTools::Win32::_tstring &text,
         const JetByteTools::WebSocket::MessageStatus status,
         const __int64 messageBytesOutstanding);

      virtual void OnData(
         JetByteTools::WebSocket::HyBi::IWebSocket &socket,
         JetByteTools::IO::IBuffer &buffer,
         const JetByteTools::WebSocket::MessageType type,
         const JetByteTools::WebSocket::MessageStatus status,
         const __int64 messageBytesOutstanding);

Where type tells us if we have a binary message or a text message, status tells us if this data forms a complete message or if it's just part of the message and messageBytesOutstanding tells us how many bytes are still to come, if it can be determined. There are two versions of the OnData() callback because some clients of the API may wish to work in terms of text messages presented to them pre-converted from UTF-8 into a string and some may prefer the flexibility of having the IBuffer interface passed to them so that they can, in turn, pass it off to another thread for processing, perhaps, using the reference counting functionality that's built into the buffer system.

When configuring your server you decide on the size of buffers used and whether the WebSockets protocol handler accumulates messages or simply passes you the data as it arrives. See the WebSockets example servers for more details.

When configured correctly a message based implementation could process discrete messages from a WebSockets connection like this:

void CSocketServer::OnData(
   IWebSocket &socket,
   IBuffer &buffer,
   const MessageType type,
   const MessageStatus status,
   const __int64 messageBytesOutstanding)
{
   if (status != MessageStatusComplete)
   {
      throw CException(
         _T("CSocketServer::OnData()"),
         _T("Unexpected: message larger than buffer size"));
   }

   if (messageBytesOutstanding != 0)
   {
      throw CException(
         _T("CSocketServer::OnData()"),
         _T("Unexpected: message larger than buffer size: ") + ToString(messageBytesOutstanding) + _T(" outstanding"));
   }

   if (type == MessageTypeText)
   {
      // Process a text message
   }
   else
   {
      // Process a binary message
   }

   socket.TryRead();  // Read a new message into a new buffer
}

You may wish to do your message accumulation yourself, perhaps because the inbound data is actually a stream of application messages presented as a sequence of WebSockets fragments (and remember, you can't rely on the fragments that are sent being the same size as the fragments that are received!). In this case it would be normal to sometimes pass the buffer that you received back to the next call to read so that more data can be added to the end of the buffer...

   socket.TryRead(&buffer);  // Read more data into this buffer
}

Accumulating messages larger than your buffer size is easy if you simply chain the buffers together once they're full.
void CPerConnectionData::OnData(
   IWebSocket &socket,
   IBuffer &buffer,
   const MessageType type,
   const MessageStatus status,
   const __int64 messageBytesOutstanding)
{
   if (m_messageSize == 0)
   {
      m_messageType = type;
   }
   else if (m_messageType != type)
   {
      throw CException(_T("CPerConnectionData::OnDataFrame()"), _T("Unexpected frame type"));
   }

   if (status == MessageStatusComplete && messageBytesOutstanding == 0)
   {
      AddBuffer(buffer);

      EchoMessage(socket);

      socket.TryRead();
   }
   else if (buffer.GetSize() == buffer.GetUsed())
   {
      AddBuffer(buffer);

      socket.TryRead();
   }
   else
   {
      socket.TryRead(&buffer);
   }
}

void CPerConnectionData::AddBuffer(
   IBuffer &buffer)
{
   const IBuffer::BufferSize used = buffer.GetUsed();

   if (used || m_messageBuffers.IsEmpty())
   {
      m_messageSize += used;

      m_messageBuffers.Add(buffer);
   }
}

Sending data would be simple if we restricted the user to buffer sized chunks or to single send calls, however an API user may wish to send very large messages, or to send a sequence of fragments where the actual message size is unknown at the time when the message is started. The API supports both of these options as well as the simple 'send a buffer as a text/binary message' option.

Here we send a single buffer as a discrete WebSockets message.

   if (type == MessageTypeText)
   {
      socket.TryWriteText(buffer);
   }
   else
   {
      socket.TryWriteBinary(buffer);
   }

Whilst here, we send a message as a sequence of buffers. We know the size of the complete message before we start and pass that value in our call to StartMessage().

void CPerConnectionData::EchoMessage(
   IWebSocket &socket)
{
   CSmartBuffer buffer(m_messageBuffers.GetNext());

   socket.StartMessage(m_messageType, m_messageSize, buffer.GetRef());

   buffer = m_messageBuffers.GetNext();

   while (buffer.Get())
   {
      socket.SendMessageData(buffer.GetRef());

      buffer = m_messageBuffers.GetNext();
   }

   m_messageSize = 0;
}

Finally, here we show sending a message as a sequence of fragments, not knowing how long the resulting message will be when we start sending.

void CFragmentEchoingSocketServer::OnData(
   IWebSocket &socket,
   IBuffer &buffer,
   const MessageType type,
   const MessageStatus status,
   const __int64 messageBytesOutstanding)
{
   const bool echoing = ToBool(socket.GetUserData(m_userDataIndex));

   if (echoing)
   {
      if (status == MessageStatusComplete && messageBytesOutstanding == 0)
      {
         socket.StartNewFragment(buffer.GetUsed(), buffer, true);

         socket.SetUserData(m_userDataIndex, false);
      }
      else
      {
         socket.StartNewFragment(buffer.GetUsed(), buffer);
      }
   }
   else 
   {
      socket.StartFragmentedMessage(type, buffer.GetUsed(), buffer);

      socket.SetUserData(m_userDataIndex, true);
   }

   socket.TryRead();
}

See the WebSockets example servers for more details.

The WebSockets protocol presents itself as providing a message based interface but given that a single message can have an unbounded size it's complicated to provide a general purpose API which will suit all potential users. By allowing the user of the API to select the buffering and accumulation strategies we provide the flexibility to make it as easy to work with simple, small, discrete messages as it is to work with a potentially endless stream of data.

The WebSockets Option Pack for The Server Framework is available from version 6.5 of The Server Framework which was released recently.

Leave a comment