Reading JSON GPSD Data using Network Sockets with Golang

As part of my Raspberry Pi NTP Server, I wanted the system monitoring daemon to display GPSD information on my LCD display. GPSD is a daemon that listens for GPS data from a receiver. It supports a wide variety of chips and makes it easier to write generic code that supports a variety of GPS devices. In addition to text and graphical user interfaces, GPSD provides a very simple-to-use JSON interface that can be accessed via a network socket.

To access the data, the first thing we need to do is create a goroutine that connects to GPSD and receives data. Go’s IO calls block: a read doesn’t return until data arrives, the connection closes, or a deadline set on the connection expires. To keep a blocked read from stalling the rest of the daemon, I use a goroutine to handle reading from GPSD.

Here’s the startup code. It establishes the TCP connection, and puts it into WATCH mode. Then, it creates a text scanner that reads from the socket. As lines are received, they’re sent to the data channel for processing.

// Goroutine to background update the GPS data.
func GPSDMonitor() {
  var conn net.Conn
  ch := make(chan string, 32)
  initComplete := make(chan int)
  var scanner *bufio.Scanner
  go func() {
    for {
      if conn == nil {
        fmt.Printf("Establishing TCP Connection to %s:%d\n", gpsdSource, gpsdPort)
        tcpAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%d", gpsdSource, gpsdPort))
        if err != nil {
          log.Fatal(err)
        }
        conn, err = net.DialTCP("tcp", nil, tcpAddr)
        if err != nil {
          log.Fatal(err)
        }
        // Enable watch mode, then request an initial report.
        if _, err = conn.Write([]byte("?WATCH={\"enable\": true}\n?POLL;\n")); err != nil {
          log.Fatal(err)
        }
        // A bufio.Scanner splits its input into lines by default.
        scanner = bufio.NewScanner(conn)
      }
      initComplete <- 1
      for scanner.Scan() {
        line := scanner.Text()
        ch <- line
      }
      log.Print("Broke from scanner.scan()")
    }
  }()
  <- initComplete
  ...
}

One thing worth pointing out in the code above is the initComplete channel. I don’t want the code that reads the data to run until the connection is established and everything is initialized. My first effort looked something like:

var conn net.Conn
go func(){
  if conn==nil {
    connTmp = (create the connection)...
    scanner = (create scanner)
    conn = connTmp
    for scanner.Scan() {
        ch <- scanner.Text()
    }
  }
}()
for conn == nil {
  time.Sleep(100 * time.Millisecond)
}
...

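I wasn’t really happy with this. The main goroutine reads conn while the other goroutine writes it with no synchronization, which is a data race. I know from experience that compilers can reorder operations unexpectedly, so variables can be assigned in an order you don’t expect, and nothing guarantees the scanner is ready just because conn looks non-nil. Clearly, some synchronization would be better. My second effort was: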

var initLock sync.Mutex
var conn net.Conn
go func() {
  for {
    if conn==nil {
      initLock.Lock()
      conn = (create the actual connection)
      scanner = (create scanner)
      initLock.Unlock()
    }
  }  
}()
// Block here until the initialization is complete.
initLock.Lock()
initLock.Unlock()
// Begin rest of block.

The naive assumption of this effort was that initLock.Lock() would be called in the goroutine first. Starting a goroutine with go only schedules it, though. What actually happened was that the goroutine didn’t start running until after the main goroutine had blown through the Lock()/Unlock() pair. To my credit, it didn’t take me long to realize what I had done. My final effort at synchronizing the initialization used a channel, and it looked like this:

var conn net.Conn
initComplete := make(chan int)
go func() {
  for {
    if conn==nil {
      conn = (create the actual connection)
      scanner = (create scanner)
      initComplete <- 1
    }
  }  
}()
// Block here until the initialization is complete.
<- initComplete
// Begin rest of block.

With this synchronization, I’m guaranteed that the connection is initialized before the processing code begins executing. Because initComplete is an unbuffered channel, the receive (<-initComplete) blocks until the goroutine sends a value, and the goroutine doesn’t send until the initialization is complete.
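A common variant of the same idea, shown here as a minimal sketch rather than the code used in this project, is to close a channel instead of sending on it. The helper name startAfterInit is hypothetical. A closed channel can’t be reopened, so this fits one-time startup only; sending a value, as above, is what lets the goroutine signal again after a reconnect.

```go
package main

// startAfterInit is a hypothetical helper: it runs setup in a new
// goroutine and returns a channel that is closed once setup has
// returned. Receiving from the returned channel blocks until then.
func startAfterInit(setup func()) <-chan struct{} {
	ready := make(chan struct{})
	go func() {
		setup()
		// Closing unblocks every current and future receiver, and
		// everything setup wrote is visible to them afterwards.
		close(ready)
	}()
	return ready
}
```

A caller writes ready := startAfterInit(connect) followed by <-ready, where connect stands in for whatever initialization is needed.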

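The next piece is the processor that reads the lines and calls the parsers. When a line of data is received, the select statement unblocks and the line is processed. If no line arrives before the poll interval elapses, the time.After(pollInterval) case unblocks instead. If data has arrived since the last poll, a new poll command is sent; otherwise the connection is closed and set to nil, which ends the inner loop, and the outer loop waits on initComplete again.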

  for {
    // This is the poll/process loop that accepts the
    // received lines and processes them.
    <-initComplete
    readLines := false
    for conn != nil {
      select {
      case line := <-ch:
        processLine(line)
        readLines = true
      case <-time.After(pollInterval):
        if readLines {
          conn.Write([]byte("?POLL;\n"))
          readLines = false
        } else {
          log.Print("Didn't get any data on GPSD poll. Closing connection.")
          conn.Close()
          conn = nil
        }
      }
    }
  }

The next part is to actually process the received lines. There is a Go library for GPSD that defines structures for the GPSD data records and handles unmarshalling the JSON response data into variables. Since I wanted to understand using JSON, I didn’t go this route. Instead, I wrote my own unmarshalling code. Here’s how I unmarshal each line and dispatch on its class; POLL responses carry the sky and tpv records.

func processLine(line string) {
  if len(line) < 16 {
    return
  }
  var f interface{}
  err := json.Unmarshal([]byte(line), &f)
  if err != nil {
    log.Print(err)
    return
  }

  m := f.(map[string]interface{})
  cl := m["class"]
  switch cl {
  case "VERSION":
    version = fmt.Sprintf("GPSD v%s", m["release"].(string))
  case "DEVICES":
    processDevices(m["devices"].([]interface{}))
  case "POLL":
    processPoll(m)
  case "WATCH":
  default:
  }
}

func processPoll(m map[string]interface{}) {
  for k, v := range m {
    switch k {
    case "sky":
      processSky(v.([]interface{})[0].(map[string]interface{}))
    case "tpv":
      processTPV(v.([]interface{})[0].(map[string]interface{}))
    }
  }
}
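The single-value type assertions above panic if GPSD ever sends something unexpected, such as a line that isn’t a JSON object or a class that isn’t a string. A more defensive variant, sketched here with the hypothetical helper name recordClass, uses the two-value form of the assertion and returns an error instead.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// recordClass is a hypothetical helper: it decodes one line of JSON
// and returns its "class" field together with the decoded map. It
// returns an error, rather than panicking, when the line is not a
// JSON object or the class field is missing or not a string.
func recordClass(line string) (string, map[string]interface{}, error) {
	var f interface{}
	if err := json.Unmarshal([]byte(line), &f); err != nil {
		return "", nil, err
	}
	m, ok := f.(map[string]interface{})
	if !ok {
		return "", nil, fmt.Errorf("expected a JSON object, got %T", f)
	}
	cl, ok := m["class"].(string)
	if !ok {
		return "", nil, fmt.Errorf("class field is %T, want string", m["class"])
	}
	return cl, m, nil
}
```

The switch in processLine could then operate on the returned class string, logging and skipping any line that produces an error.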

An example TPV data record looks like this:

     map[alt:1600
         class:TPV
         climb:0
         device:/dev/ttyS5
         eps:9.37
         ept:0.005
         epv:13.703
         epx:6.019
         epy:6.019
         lat:39.923813333
         lon:-105.4332213
         mode:3
         speed:0
         status:2
         time:2021-07-18T16:30:55.000Z
         track:201]

Unmarshalling the TPV data is pretty ugly. Here’s a sketch of how it’s done. It looks up the lat value in the map and obtains a float by performing a type assertion to float64. If lat were missing or not a float64 value, that single-value type assertion would panic.

To handle keys that might not be present, you use the second, two-value form of reading a map value, which also reports whether the key exists. In the example below, altHAE is tried first. If it’s not present, an attempt is made to read altMSL, then alt, and finally a placeholder of 9999.0 is used.

  latitude = m["lat"].(float64)
  longitude = m["lon"].(float64)
  if v, ok := m["altHAE"]; ok {
    altitude = v.(float64)
  } else if v, ok := m["altMSL"]; ok {
    altitude = v.(float64)
  } else if v, ok := m["alt"]; ok {
    altitude = v.(float64)
  } else {
    altitude = 9999.0
  }
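The fallback chain can also be written as a loop over candidate keys. The sketch below uses a hypothetical helper, firstFloat. Note one behavioral difference from the code above: a key that is present but holds a non-float64 value is skipped rather than causing a panic.

```go
package main

// firstFloat is a hypothetical helper: it returns the value of the
// first key in keys that is present in m and holds a float64. The
// boolean result is false when no key qualifies.
func firstFloat(m map[string]interface{}, keys ...string) (float64, bool) {
	for _, k := range keys {
		// The two-value assertion yields ok=false both for a
		// missing key and for a value of another type.
		if v, ok := m[k].(float64); ok {
			return v, true
		}
	}
	return 0, false
}
```

The altitude lookup then becomes a call to firstFloat(m, "altHAE", "altMSL", "alt"), with 9999.0 assigned when the second result is false.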

Conclusion

Processing the socket IO is pretty tricky. Some of this is down to how I chose to use GPSD. There is a mode where GPSD will continuously stream JSON records as its state is updated. I decided that I didn’t need the data that frequently, and didn’t want to pay the CPU overhead of parsing data continuously.

Using goroutines allowed me to have a connection/receiver, and a separate, simpler line processor. If I were writing this in another language, I would probably write the routine to read from the network socket with a specified timeout value.

Now that I understand how to unmarshal the JSON manually, I’ll let Go’s built-in JSON handling do the work in other areas.
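As an illustration of what that could look like, here is a minimal sketch that lets encoding/json fill a typed struct. The type tpvRecord and the function parseTPV are hypothetical names, and the struct covers only the fields discussed in this post. Pointer fields stay nil when a key is absent, which replaces the two-value map lookups.

```go
package main

import "encoding/json"

// tpvRecord is a hypothetical struct covering only the TPV fields
// used in this post. Pointer fields are nil when the key is absent.
type tpvRecord struct {
	Class  string   `json:"class"`
	Lat    float64  `json:"lat"`
	Lon    float64  `json:"lon"`
	AltHAE *float64 `json:"altHAE"`
	AltMSL *float64 `json:"altMSL"`
	Alt    *float64 `json:"alt"`
}

// altitude applies the same fallback order as the map-based code:
// altHAE, then altMSL, then alt, then the 9999.0 placeholder.
func (r tpvRecord) altitude() float64 {
	for _, p := range []*float64{r.AltHAE, r.AltMSL, r.Alt} {
		if p != nil {
			return *p
		}
	}
	return 9999.0
}

// parseTPV decodes one TPV object. A value of the wrong JSON type
// produces an error from json.Unmarshal instead of a panic.
func parseTPV(data []byte) (tpvRecord, error) {
	var r tpvRecord
	err := json.Unmarshal(data, &r)
	return r, err
}
```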