In Part 2 of the series, I'm going to use channels to manage a request / response exchange on a raw, shared TCP socket. If we assume that the interaction between the service and the client is always a simple request / response on the socket (see Part 1 for the use case), then:
- The client sends a request (a command or a poll for data).
- Then it waits for the response on the socket, reading from the socket buffer until the message frame is complete.
- It is guaranteed that the service never sends additional messages in the stream after the response, and that the client never sends additional requests before the response is delivered.
- The payload size for both requests and responses is reasonable. Reasonable means that there's no need to hold the stream for a long time: every message is made of a few TCP packets delivered quickly in a burst.
With these constraints, instead of giving the client routine the responsibility of managing the whole connection, it is possible to wrap the implementation in the shared connection object, so that the client routine has to provide only:
- The request payload.
- A channel to accept the response / error.
- A guard function that checks whether the message is complete or further reads are needed.
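To make the guard idea concrete, here is a minimal sketch of one possible guard. It assumes a hypothetical binary protocol (not the one used in this series) where every message starts with a 4-byte big-endian length followed by that many payload bytes; the name lengthPrefixedComplete is made up for the example.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// lengthPrefixedComplete is a hypothetical guard: it assumes each frame is a
// 4-byte big-endian payload length followed by the payload itself.
func lengthPrefixedComplete(data []byte) bool {
	const headerSize = 4
	if len(data) < headerSize {
		return false // the header itself has not been fully received yet
	}
	payloadLen := binary.BigEndian.Uint32(data[:headerSize])
	// complete once the header plus the announced payload have arrived
	return uint64(len(data)) >= headerSize+uint64(payloadLen)
}

func main() {
	frame := []byte{0, 0, 0, 3, 'a', 'b', 'c'}
	fmt.Println(lengthPrefixedComplete(frame[:2])) // false: header incomplete
	fmt.Println(lengthPrefixedComplete(frame[:5])) // false: payload incomplete
	fmt.Println(lengthPrefixedComplete(frame))     // true: whole frame received
}
```

The guard only inspects the bytes read so far and never touches the socket, so it stays independent from the connection handling.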
Channel definition
First of all, let's define the response struct that the client routine will receive on its channel:
type Response struct {
	Data []byte
	Err  error
}
Request / response implementation
The method accepts the request payload, the channel that will receive the response, and the guard function (which depends on the protocol defined by the service). The channel is also used to signal any error that occurred during the request.
func (cm *SharedConn) Request(req []byte, resp chan<- Response, readComplete func([]byte) bool) {
	var buffer []byte
	err := cm.Use(func(c Conn) error {
		// deadline is the same for every request and response
		if err := c.SetDeadline(time.Now().Add(cm.RequestDeadline)); err != nil {
			return err
		}
		// send the request
		if _, err := c.Write(req); err != nil {
			return err
		}
		// get the response using the standard bufio scanner
		scanner := newScanner(c, readComplete)
		if !scanner.Scan() {
			// if an error occurred, signal it to the shared connection
			// so that the connection can be refreshed
			if err := scanner.Err(); err != nil {
				return err
			}
			// scanner.Err() hides io.EOF: the peer closed the stream
			// before the frame was complete, so report it explicitly
			return io.ErrUnexpectedEOF
		}
		buffer = scanner.Bytes()
		return nil
	})
	resp <- Response{Data: buffer, Err: err}
}
The implementation takes advantage of the Use method (defined in Part 1) to:
- Set a deadline.
- Write the request on the socket.
- Wait for the response (making all the reads necessary to complete the message frame, according to the readComplete guard).
The Use method is responsible for ensuring atomic access to the socket by the current routine. That way, the request and its response are exchanged with the same client, with no race conditions.
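The effect of this kind of exclusive access can be illustrated with a small, self-contained sketch. It is not the Use implementation from Part 1: the exclusive type, its mutex and the slice standing in for the socket stream are all assumptions made for the example. Each routine writes a request and its response inside the callback, and the check confirms that pairs are never interleaved.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// exclusive is a hypothetical stand-in for the shared connection: a mutex
// guards a slice that plays the role of the socket stream.
type exclusive struct {
	mu     sync.Mutex
	stream []string
}

// Use runs fn while holding the lock, so only one routine at a time
// can touch the stream.
func (e *exclusive) Use(fn func(stream *[]string) error) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	return fn(&e.stream)
}

// run starts the given number of routines, each doing one request/response.
func run(workers int) []string {
	e := &exclusive{}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			_ = e.Use(func(stream *[]string) error {
				*stream = append(*stream, fmt.Sprintf("req-%d", id))
				*stream = append(*stream, fmt.Sprintf("resp-%d", id))
				return nil
			})
		}(i)
	}
	wg.Wait()
	return e.stream
}

// paired reports whether every request is immediately followed by the
// response carrying the same id.
func paired(stream []string) bool {
	if len(stream)%2 != 0 {
		return false
	}
	for i := 0; i < len(stream); i += 2 {
		req := strings.TrimPrefix(stream[i], "req-")
		resp := strings.TrimPrefix(stream[i+1], "resp-")
		if req != resp {
			return false
		}
	}
	return true
}

func main() {
	stream := run(100)
	fmt.Println(len(stream), paired(stream)) // 200 true
}
```

Without the lock, the two appends of different routines could interleave (and the slice itself would be subject to a data race), which is the situation the shared connection is meant to avoid.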
Scanner and message framing
To check when the response message is complete, the standard bufio.Scanner features are leveraged, using a custom split function:
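func newScanner(conn io.Reader, readComplete func([]byte) bool) *bufio.Scanner {
	splitFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		// end of stream: stop scanning (with a non-nil error
		// the scanner does not deliver the token)
		if atEOF {
			return len(data), data, io.EOF
		}
		// the guard recognizes a complete frame: return it as a single token
		if readComplete(data) {
			return len(data), data, nil
		}
		// frame still incomplete: ask the scanner to read more data
		return 0, nil, nil
	}
	scanner := bufio.NewScanner(conn)
	scanner.Split(splitFunc)
	return scanner
}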
The split function embeds the guard passed by the client routine, and adds a check for the EOF case.
When the guard returns true, the scan terminates and returns the token, which in this case is the entire message frame. By default the scanner limits the token size to 64 KB (bufio.MaxScanTokenSize, much more than needed here), but it is possible to set a custom buffer size with the scanner's Buffer method if needed.
Testing time
For testing, a set of concurrent clients connects to the same echo TCP server used in Part 1, each sending a random string as the request and waiting for the response on its channel. At the end, every routine gets the right response on its response channel.
sharedConn := New("localhost:33224")
defer sharedConn.Close()
go tcpServer("33224")
time.Sleep(1 * time.Millisecond)
// result channel is used to check that the actual response and the wanted one match
result := make(chan struct {
	want   string
	actual string
}, 10000)
wg := sync.WaitGroup{}
// for the purpose of the test, the guard checks whether the last byte of the
// message is a newline, to simulate a text, line-based response
readComplete := func(data []byte) bool {
	return len(data) > 0 && data[len(data)-1] == '\n'
}
for i := 0; i < 10000; i++ {
	wg.Add(1)
	go func(conn *SharedConn, i int) {
		defer wg.Done()
		resp := make(chan Response, 1)
		str := randStringBytes(1000)
		// send the request and wait for the response on the channel.
		// because the channel is buffered and the connection timeout is set,
		// a response is always sent back to the client
		conn.Request([]byte(str), resp, readComplete)
		r := <-resp
		if r.Err != nil {
			t.Error(r.Err)
		}
		result <- struct {
			want   string
			actual string
		}{str, string(r.Data)}
	}(sharedConn, i)
}
wg.Wait()
// check that all routines got the right response
close(result)
for v := range result {
	if v.want != v.actual {
		t.Errorf("want != actual: %s != %s", v.want, v.actual)
	}
}