Sometimes, in low-level devices (like sensors, controllers, serial-to-Ethernet converters, etc.) the only way to interact with the hardware is via a custom protocol on top of a raw TCP socket. TCP is stream-oriented, so it is up to the protocol to define how messages are framed. In most cases, one of three patterns is used:

  1. The first byte (or the first n bytes) specifies the message length, so the receiver knows when the message is complete.
  2. A terminator character (especially in text-oriented streams) marks the end of the message.
  3. In some rare request / response cases, when a whole message is small enough to arrive in one TCP segment, no delimiter is used and the receiver simply reads whatever is in the buffer. This is fragile, because TCP does not guarantee that one write on the sender arrives as one read on the receiver.

Also, in a request / response exchange between the two endpoints, the response does not always reference the request frame, and it is assumed to match the last request sent. Typically, in these cases, it is not a good idea to have multiple parallel connections (sockets) open at the same time, because:

  1. Hardware resources can be limited.
  2. The initial TCP handshake consumes resources and can introduce delay.
  3. It is not always guaranteed that the service / device can handle parallel connections (for example, serial-to-Ethernet converters). The worst case is when multiple sockets are accepted but the protocol does not support them, and the stream is filled with inconsistent data.

However, from the client’s point of view, it would be nice to have a way for multiple routines to interact concurrently with the socket, so that each of them can push commands to the device or poll for a specific subset of data only when needed.

Use case scenario

Let’s constrain the use case to get a clearer idea of what we want to achieve:

  1. 1 service with a TCP server listening.
  2. 1 client that must use at most 1 socket to connect to the service.
  3. The socket has to be shared with multiple goroutines that use it concurrently.
  4. Each goroutine is responsible for reading from and writing to the socket, but cannot close or reopen it. When a routine has finished its operations, it signals this to a stream manager, which assigns the socket to another routine.
  5. Each goroutine is responsible for signalling an error if the connection cannot be used anymore. In that case the stream manager must close the connection, free its resources, open a new shared connection, and assign it to the waiting routines.

Non-goals

  • An additional application layer on top of the TCP socket. The solution focuses only on sharing a raw socket.
  • Preemption between the routines. Each routine must send and receive the entire message and cooperate in releasing the shared connection.

Concurrency and net.Conn

From the Golang documentation:

// Conn is a generic stream-oriented network connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn interface {
	// Read reads data from the connection.
	// Read can be made to time out and return an error after a fixed
	// time limit; see SetDeadline and SetReadDeadline.
	Read(b []byte) (n int, err error)

	// Write writes data to the connection.
	// Write can be made to time out and return an error after a fixed
	// time limit; see SetDeadline and SetWriteDeadline.
	Write(b []byte) (n int, err error)

	// ... Close, LocalAddr, RemoteAddr and the deadline setters omitted
}

Both Read and Write allow concurrent invocations from multiple goroutines, but there is no way to know the state of the stream, or which routine will actually receive the bytes returned by a given read. So, for this use case, the net.Conn cannot be shared directly between routines: we need to ensure that a whole sequence of writes and reads is owned by a single routine.

Naive solution using a mutex

First of all, let’s define the interface that each routine will use to interact with the shared connection to:

  • write and read
  • set a deadline

This interface prevents the routine from closing the connection or otherwise tweaking the net.Conn: that responsibility belongs to the SharedConn object.

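// Imports for the whole package; fmt, net and sync are used further below.
import (
	"fmt"
	"io"
	"net"
	"sync"
	"time"
)

type deadlineable interface {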
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
	SetDeadline(t time.Time) error
}

type Conn interface {
	io.ReadWriter
	deadlineable
}

Then here’s the definition of the shared connection. It uses a simple mutex to guard the internal net.Conn:

type SharedConn struct {
	m    sync.Mutex
	c    net.Conn
	addr string
}

func New(addr string) *SharedConn {
	return &SharedConn{
		addr: addr,
	}
}

SharedConn has two public methods. The first, Use, gives a routine access to the connection: it accepts a closure that is invoked with a Conn instance.

Inside that closure a routine can use the connection without race conditions. When the closure returns, the connection is assigned to another routine waiting for the lock (if there is one). Note that sync.Mutex does not guarantee first-come, first-served ordering among waiters.

The underlying net.Conn is kept open between calls. The closure may also return an error to signal that the connection must be recovered: in that case SharedConn closes the socket, and the next call to Use dials a new one.

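func (cm *SharedConn) Use(f func(conn Conn) error) error {
	cm.m.Lock()
	defer cm.m.Unlock()
	c, err := cm.refreshConn()
	if err != nil {
		return fmt.Errorf("error refreshing connection: %w", err)
	}
	if err := f(c); err != nil {
		// The stream may be in an unknown state: drop the socket so that
		// the next call to Use dials a fresh one.
		cm.closeConn()
		return fmt.Errorf("error using connection: %w", err)
	}
	return nil
}

// refreshConn returns the current connection, dialing a new one if needed.
// It must be called with cm.m held.
func (cm *SharedConn) refreshConn() (net.Conn, error) {
	if cm.hasConn() {
		return cm.c, nil
	}
	c, err := net.Dial("tcp", cm.addr)
	if err != nil {
		return nil, fmt.Errorf("error dialing %s: %w", cm.addr, err)
	}
	cm.c = c
	return cm.c, nil
}

func (cm *SharedConn) hasConn() bool {
	return cm.c != nil
}

The second method, Close, releases the underlying net.Conn when it is not needed anymore. It takes the same lock as Use, so it cannot close the socket while a routine is in the middle of an exchange. The unexported closeConn does the actual work; Use calls it directly on error because it already holds the lock, and sync.Mutex is not reentrant.

func (cm *SharedConn) Close() error {
	cm.m.Lock()
	defer cm.m.Unlock()
	return cm.closeConn()
}

// closeConn closes and forgets the current connection.
// It must be called with cm.m held.
func (cm *SharedConn) closeConn() error {
	if cm.c == nil {
		return nil
	}
	err := cm.c.Close()
	cm.c = nil
	return err
}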

Let’s test it!

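import (
	"strconv"
	"sync"
	"testing"
	"time"
)

func TestSharedConnUse(t *testing.T) {
	// Shared connection client
	sharedConn := New("localhost:33224")
	defer sharedConn.Close()

	// Simple TCP server (helper not shown) that listens and echoes back all
	// the bytes it receives; the sleep gives it time to start listening.
	go tcpServer("33224")
	time.Sleep(50 * time.Millisecond)

	// Each goroutine sends its own number and reads the same number back from
	// the server, saving the response in the map. To succeed, every goroutine
	// must write and read its own number. The map is only written inside Use,
	// so the SharedConn mutex also serializes access to it.
	const routines = 10000
	testMap := make(map[int]int)
	wg := sync.WaitGroup{}
	for i := 0; i < routines; i++ {
		wg.Add(1)
		go func(conn *SharedConn, i int) {
			// Done must run even when the exchange fails, or wg.Wait would hang.
			defer wg.Done()
			err := conn.Use(func(c Conn) error {
				// Write the number as a string
				if _, err := c.Write([]byte(strconv.Itoa(i))); err != nil {
					return err
				}

				// Read the number echoed by the server. A single Read assumes the
				// whole reply arrives at once (the third framing pattern), which
				// usually holds for short messages on localhost but is not
				// guaranteed by TCP.
				buffer := make([]byte, 100)
				n, err := c.Read(buffer)
				if err != nil {
					return err
				}

				// Update the map with the number read back
				testMap[i], err = strconv.Atoi(string(buffer[:n]))
				return err
			})
			if err != nil {
				// t.Error may be called from other goroutines; t.Fatal may not.
				t.Error(err)
			}
		}(sharedConn, i)
	}
	wg.Wait()

	// Check that all the goroutines have sent and received the right number
	if len(testMap) != routines {
		t.Errorf("got %d responses, want %d", len(testMap), routines)
	}
	for i, v := range testMap {
		if i != v {
			t.Errorf("i != v: %d != %d", i, v)
		}
	}
}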

In the second part, I’ll explore an alternative, channel-based approach for request / reply cases.