In Part 2 of the series, i’m going to use channels to manage a request / response on a raw, shared TCP socket. If we assume that the interaction between the service and the client is always a simple request / response on the socket (see Part 1 for the use case), then:

  1. The client send a request (command or polling for data).
  2. Then it wait for the response in the socket, reading from socket buffer until the message frame is completed.
  3. It is guaranteed that service never send additional messages in the stream after the response, nor the client send additional requests before the response is delivered.
  4. Payload size for both requests and response is reasonable. Reasonable means that there’s no need to use the stream for long time, but all the messages are made by few TCP packets quicly delivered in burst.

With these constraints, intead of giving the client routine the responsibility of managing the whole connection, it is possible to wrap the implementation in the shared connection object, so that the client routine has to provide only:

  1. The request payload.
  2. A channel to accept the response / error.
  3. A guard function that check if message is complete or further reads are needed.

Channel definition

First of all let’s define the reponse channel struct the client routine will receive on:

type Response struct {
	Data []byte
	Err  error
}

Request / response implementation

The method interface accept the request payload, the channel receiving the response and the guard function (that depends on the protocol defined by the service). Channel is also used to signal any error occured during the request.

func (cm *SharedConn) Request(req []byte, resp chan<- Response, readComplete func([]byte) bool) {
	var buffer []byte
	err := cm.Use(func(c Conn) error {
		// deadline is the same for every request and response
		err := c.SetDeadline(time.Now().Add(cm.RequestDeadline))
		if err != nil {
			return err
		}
		
		// send the request
		_, err = c.Write(req)
		if err != nil {
			return err
		}
		
		// get the response using standard bufio scanner
		scanner := newScanner(c, readComplete)
		scanner.Scan()
		
		// if an error occurred, signal it to shared connection
		// so that the connection can be refreshed
		err = scanner.Err()
		if err != nil {
			return err
		}
		buffer = scanner.Bytes()
		return nil
	})
	resp <- Response{Data: buffer, Err: err}
}

The implementation takes advantage of the “Use” method (defined in part 1) to:

  1. Set a deadline.
  2. Write the request on the socket.
  3. Wait for the response (making all the reads necessary to complete the message frame, according to the readComplete guard).

The Use method is responsible for ensure atomic access to the socket by the current routine. In that way the request and response message is delivered to the same client with no race conditions.

Scanner and message framing

To check when the response message is complete, the standard bufio.Scanner features are leveraged, using a custom split function:

func newScanner(conn io.Reader, readComplete func([]byte) bool) *bufio.Scanner {
	splitFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		if atEOF {
			return len(data), data, io.EOF
		}
		if readComplete(data) {
			return len(data), data, nil
		}
		return 0, nil, nil
	}
	scanner := bufio.NewScanner(conn)
	scanner.Split(splitFunc)
	return scanner
}

The Split function embed the guard passed by the client routine, and make additional checks in case of EOF. When the guard returns true, the scan terminates and return the token, that in this case is the entire message frame. By default scanner limits the token size to a max of 65Kb (much more than needed), but is is possible to set a custom buffer size if needed.

Testing time

For testing, a concurrent set of client connects to the same echo tcp server used in part 1, sending a random string as request and waiting for the response in the channel. At the end, all the routines get in the response channel the right response.

sharedConn := New("localhost:33224")
defer sharedConn.Close()

go tcpServer("33224")
time.Sleep(1 * time.Millisecond)

// result channel is used to check that actual response and wanted one match
result := make(chan struct {
  want   string
  actual string
}, 10000)

wg := sync.WaitGroup{}

// for the test purpose, the guard check if the last message char is a newline as STR,
// to simulate a text, line based response
readComplete := func(data []byte) bool {
  return data[len(data)-1] == '\n'
}

for i := 0; i < 10000; i++ {
  wg.Add(1)
  go func(conn *SharedConn, i int) {
    resp := make(chan Response, 1)
    str := randStringBytes(1000)

    // send the request and wait for response in channel.
    // because channel is buffered and connection timeout is set, 
    // a reponse is always sent back to the client
    conn.Request([]byte(str), resp, readComplete)
    r := <-resp
    if r.Err != nil {
      t.Errorf(r.Err.Error())
    }
    result <- struct {
      want   string
      actual string
    }{str, string(r.Data)}
    wg.Done()
  }(sharedConn, i)
}
wg.Wait()

// check if all routines got the righr response
close(result)
for v := range result {
  if v.want != v.actual {
    t.Errorf("i != v: %s != %s", v.want, v.actual)
  }
}