Filtering and layering functionality onto a TCP byte stream

| 0 Comments

As I mentioned a while back, the interface that user code had to The Server Framework was somewhat jumbled and, in fact, consisted of three interfaces that were mashed together. I've been teasing these apart and in doing so have finally bitten the bullet and dived in and refactored the filtering interface so that it's easier to use.

The Server Framework has had a filtering interface for a long time, since around 2002 when I added SSL support for a client. The idea was that you should be able to layer services on to the raw TCP byte stream in such a way that the code that used the byte stream, i.e the business logic part of the server, didn't need to know or care that anything was being done. So, as far as SSL is concerned, you still have cleartext data arriving via your ReadComplete() calls in just the same way that you would if there was no SSL on the connection. This is especially useful when the layer may require some additional handshaking to set up the connection. Personally I don't want that showing through, so from the business layer you simply execute a Write() call and you shouldn't need to know, or care, that before that first block of application generated data hits the wire there's all manner of SSL handshaking going on under the covers.

I originally accomplished this by allowing a derived server class to 'filter' the TCP stream, that is get at the buffers before the application layer does (or before they're sent to the wire). It worked but it was ugly and when I was recently asked about layering compression onto a connection (either alone or as well as SSL) I knew it could be done but I also knew it would be rather tricky with the current architecture.

So, I've been cleaning up and redesigning...

The new filtering scheme allows for layering filters on top of one another and allows each filter to swallow or generate as much data as it wants to. Some filters, such as compressing filters, will, hopefully, reduce the amount of data written to the wire and increase the amount of data read by the application. They will swallow bytes on the way out of the system, 10 bytes written may end up as 7 on the wire, and generate bytes on the way into the system, 7 on the wire should, hopefully, end up as 10 read by the application. This is pretty simple if the filter never swallows a whole buffer or generates enough data so that one read completion effectively turns into two, but we can't rely on that. Anyway, some filters, such as SSL layering, will always generate completely new data writes and always swallow some of the data reads due to the handshaking that needs to go on under the covers. An additional issue is that, if the socket being used guarentees read and write ordering then it needs to be able to provide that same guarentee if there are filters on the stream; easy if the filters never swallow whole buffers or generate new read completions and less so if they do...

I ended up with two interfaces, one that the filter needs to implement and one that a filter can use to communicate with the connection manager. They're something like this:

class IFilterStreamSocketConnections
{
   public :
  
      /// Called once before any other method on this interface is called. 
      /// The sink interface should be used to pass calls onto the next filter
      /// in the chain and the manager interface should be used to generate
      /// new read and write events.
  
      virtual void InitialiseFilter(
         IFilterStreamSocketConnections &sink,
         IManageStreamSocketConnectionFilters &manager) = 0;
  
      /// The type of connection that is being filtered.
   
      enum ConnectType
      {
         ConnectTypeSync,
         ConnectTypeSyncNoThrow,
         ConnectTypeAsync
      };
  
      /// Called before an outbound connection is attempted but after the 
      /// socket is allocated. This call can be used to block or reroute 
      /// connections or to "plug" connect data into the socket's user data.
  
      virtual CSmartStreamSocket FilterConnect(
         const ConnectType connectType,
         IStreamSocketEx &socket,
         const IFullAddress &address,
         const void *pUserData,
         const void *pServerData) = 0;
  
      /// Called when a connection (inbound or outbound) is established. You
      /// can determine which direction the connection is (if that's important)
      /// by calling GetConnectionDirection() on the socket.
  
      virtual void FilterConnectionEstablished(
         IStreamSocketEx &socket,
         const IAddress &address) = 0;
  
      /// Called when a read is requested. The buffer is optional, if no buffer
      /// is passed with the call then one will be allocated just before the 
      /// read is issued. Filters can prevent the application (and filters 
      /// above) from issuing reads by simply not passing these calls on to
      /// the next filter in the chain. If a filter wishes to generate its own
      /// reads then it should use IManageStreamSocketConnectionFilters rather
      /// than calling this function with its own buffers.
  
      virtual void FilterReadRequest(
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer *pBuffer) = 0;
  
      /// Called when a read has completed. The filter can manipulate the data
      /// in any way that they like, this includes using the data for their own
      /// purposes and swallowing read completions and also generating their
      /// own read completions via the IManageStreamSocketConnectionFilters
      /// interface.
  
      virtual void FilterReadCompleted(
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer &buffer) = 0;
  
      /// Called when a read has completed with an error. The filter can manipulate 
      /// the data in any way that they like, this includes using the data for 
      /// their own purposes and swallowing read completions and also generating 
      /// their own read completions via the IManageStreamSocketConnectionFilters
      /// interface. Note that fiddling around with completion errors is probably
      /// unwise...
  
      virtual void FilterReadCompletionError(
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer &buffer,
         const DWORD lastError) = 0;
  
      /// Called when a write is requested. Filters can prevent the application 
      /// (and filters above) from issuing reads by simply not passing these 
      /// calls on to the next filter in the chain. If a filter wishes to 
      /// generate its own reads then it should use 
      /// IManageStreamSocketConnectionFilters rather than calling this function 
      /// with its own buffers.
  
      virtual void FilterWriteRequest(
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer &buffer) = 0;
  
      /// Called when a write has completed. The filter can manipulate the data
      /// in any way that they like, this includes using the data for their own
      /// purposes and swallowing write completions and also generating their
      /// own write completions via the IManageStreamSocketConnectionFilters
      /// interface.
  
      virtual void FilterWriteCompleted(
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer &buffer) = 0;
  
      /// Called when a write has completed with an error. The filter can manipulate
      /// the data in any way that they like, this includes using the data for 
      /// their own purposes and swallowing write completions and also generating
      /// their own write completions via the IManageStreamSocketConnectionFilters
      /// interface. Note that fiddling around with completion errors is probably
      /// unwise...
  
      virtual void FilterWriteCompletionError(
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer &buffer,
         const DWORD lastError) = 0;
  
      /// Called after IStreamSocketConnectionManagerCallback::OnSocketReleased() 
      /// callback and before the socket is passed to the allocator. This can
      /// be used to remove any user data that was added by the filter in a
      /// FilterConnect() call.
  
      virtual void FilterSocketReleased(
         JetByteTools::Win32::IIndexedOpaqueUserData &userData) = 0;
     
   protected :
   
      /// We never delete instances of this interface; you must manage the 
      /// lifetime of the class that implements it.
  
      ~IFilterStreamSocketConnections() {}
};

and

class IManageStreamSocketConnectionFilters
{
   public :
  
      /// Generate a read completion that uses the socket's sequence counter
      /// and pass it to the supplied filter.
  
      virtual void ReadCompleted(
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer *pBuffer,
         IFilterStreamSocketConnections &nextFilter) = 0;
  
      /// Generate a read completion that uses the specified sequence counter
      /// and pass it to the supplied filter.
  
      virtual void ReadCompleted(
         const size_t sequenceId,
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer *pBuffer,
         IFilterStreamSocketConnections &nextFilter) = 0;
  
      /// Generate a write request that uses the socket's sequence counter
      /// and pass it to the supplied filter.
  
      virtual void RequestWrite(
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer &buffer,
         IFilterStreamSocketConnections &nextFilter) = 0;
  
      /// Generate a write request that uses the specified sequence counter
      /// and pass it to the supplied filter.
  
      virtual void RequestWrite(
         const size_t sequenceId,
         IStreamSocketEx &socket,
         JetByteTools::IO::IBuffer &buffer,
         IFilterStreamSocketConnections &nextFilter) = 0;
  
   protected :
  
      /// We never delete instances of this interface; you must manage the 
      /// lifetime of the class that implements it.
  
      ~IManageStreamSocketConnectionFilters() {}
};

The filter needs to be able to hook into connection establishment and termination because it's likely (almost essential) that the filter will need some per-connection data that it uses whilst manipulating the byte stream. By allowing pre and post connection hooking we allow the filter to store its data in the user data store of the socket. The rest of the filter interface is pretty straight forward. Various calls pass through the filter chain before reaching the user code callbacks and this allows the filter chain to manipulate data or swallow or generate request or completion events. Each filter is initialised with a reference to the next filter in the chain and a manager interface that allows it to generate new completion events. If it ever generates read completions or swallows write completions then it MUST always use the management interface to pass every call down the chain and, as is likely, if the order of the data buffers is important then it should use its own sequence numbers. Filters can request that the socket manages sequence numbers on its behalf and it references its set of sequence numbers via a "sequence id".

Given that the new filtering scheme allows chaining filters it's easy to add SSL over compression to a byte stream and not need to change the business logic to do so... Luckily I have far more tests than when I originally wrote the code back in 2002 and this meant that I was able to build tests that prove that all this layering and sequencing stuff actually works. In the past I'd always used my SSL server test as a shakedown for my sequencing and filtering changes, this time I didn't have to.

Leave a comment