Collecting and plotting live data with Go

My recent talk, Practical Go Programming, describes the construction of the URL shortener program goto, and concludes with a graphical demonstration of the program being tested under load. The demo tests goto running as a single master, and then with one, two, and three slaves in front of that master. Another Go program plots the number of Get (redirect) and Put (shorten) operations served by each process on a line graph. In this post I will describe how those statistics are gathered, collated, and displayed.

First off, I should let you know that I did this in a hurry. The night before I first delivered the talk (at OSDC in Melbourne) I realised it should end with something visual, some display that would show just how capable and efficient this simple Go program is. So after just under an hour of hacking I had produced the stat package, the corresponding stat server, and the stress-testing program bench. This disclaimer serves both as an excuse for the quality of the code and as a testament to Go’s utility as a language.

bench

The bench program simply fires off repeated HTTP requests to one or more goto servers. When bench starts it launches several goroutines:

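func main() {
    go keeper() // hands out short URLs to get (described below)
    for i := 0; i < 20; i++ {
        go loop(get, getDelay)
    }
    for i := 0; i < 2; i++ {
        go loop(post, postDelay)
    }
    select {} // block forever; returning from main would stop the goroutines
}

// loop calls fn repeatedly, pausing for delay between calls.
func loop(fn func(), delay time.Duration) {
    for {
        fn()
        time.Sleep(delay)
    }
}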

The get function makes a URL redirect request to the goto server, and post makes a URL shortening request. To make the GET requests to valid short URLs, bench records each short URL returned by post and provides them to get as needed. This is coordinated through two channels, newURL and randURL:

var (
    newURL  = make(chan string)
    randURL = make(chan string)
)

The post function sends each shortened URL it receives from goto to the newURL channel:

func post() {
    addURL := "http://master/add"
    r, err := http.PostForm(addURL, url.Values{"url": {fooUrl}})
    if err != nil {
        log.Println("post:", err)
        return
    }
    defer r.Body.Close()
    b, err := io.ReadAll(r.Body)
    if err != nil {
        log.Println("post:", err)
        return
    }
    newURL <- string(b) // hand the new short URL to keeper
}

While get receives random short URLs from randURL:

func get() {
    u := <-randURL
    r, err := http.Get(u)
    if err != nil {
        log.Println("get:", err)
        return
    }
    r.Body.Close() // release the connection so it can be reused
}

The keeper function runs in its own goroutine and maintains a slice of short URLs. When keeper receives a value from newURL it adds the value to the slice. At the same time, it attempts to send URLs picked randomly from the slice to randURL.

func keeper() {
    var urls []string
    urls = append(urls, <-newURL) // wait for one URL first: rand.Intn panics if len(urls) is 0
    for {
        r := urls[rand.Intn(len(urls))] // choose random url
        select {
        case u := <-newURL:
            urls = append(urls, u)
        case randURL <- r:
            // random url sent
        }
    }
}

This is a nice example of Go’s select statement: it waits to receive from newURL and to send to randURL at the same time, and performs whichever operation is ready first (if both are ready, it picks one at random).
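The keeper pattern can be exercised on its own. This sketch passes the channels in as parameters and adds a hypothetical drawN helper that feeds keeper some placeholder URLs and then collects random picks; it illustrates the select loop under those assumptions rather than reproducing bench exactly.

```go
package main

import (
    "fmt"
    "math/rand"
)

// keeper collects URLs from newURL and offers random ones on randURL,
// using the same select loop as bench's keeper.
func keeper(newURL <-chan string, randURL chan<- string) {
    var urls []string
    urls = append(urls, <-newURL) // never call rand.Intn(0)
    for {
        r := urls[rand.Intn(len(urls))] // choose a random URL to offer
        select {
        case u := <-newURL:
            urls = append(urls, u)
        case randURL <- r:
            // random URL sent
        }
    }
}

// drawN (hypothetical) starts a keeper, feeds it urls, then receives n picks.
// The keeper goroutine is left blocked in its select when drawN returns.
func drawN(urls []string, n int) []string {
    newURL := make(chan string)
    randURL := make(chan string)
    go keeper(newURL, randURL)
    for _, u := range urls {
        newURL <- u
    }
    picks := make([]string, n)
    for i := range picks {
        picks[i] = <-randURL
    }
    return picks
}

func main() {
    picks := drawN([]string{"http://goto.example/a", "http://goto.example/b"}, 5)
    fmt.Println(picks) // five picks, each one of the two URLs
}
```

While drawN is still sending URLs, nobody is receiving from randURL, so the send case is never ready and select always takes the receive; once drawN switches to receiving, the roles flip.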

There’s a little more to the bench program, such as configuration through command-line flags, but I’ll leave their discovery as an exercise for the reader.

stat

The stat package enables any Go program to count events that occur during a specified time period (in this case, each second) and report them to a statistics server via RPC. It exposes a simple interface:

func Monitor(addr string)

var (
    In      = make(chan string, 100)
    Process = "default"
)

To use stat, a process should call Monitor in a new goroutine, passing the address of the statistics server as its argument.

The stat.Process variable can be set to a string that describes the process, which will eventually be used to name the data series on the line chart. In the case of goto, I set it to the process' HTTP listen port.

For each event to be counted, the process sends an identifying string to stat.In. For example, each time goto handles a Get request it sends the string "get" to In.

The Monitor function opens an RPC connection to the server, then receives events from In and counts them, periodically sending updates to the stats server. It uses a map of counters, keyed by series name, to track the data for each series, and a time.Ticker to trigger server updates.

type counter struct {
    total, period, cycles int64
}

func Monitor(addr string) {
    counters := make(map[string]*counter)
    client, err := rpc.DialHTTP("tcp", addr)
    if err != nil {
        log.Fatal(err)
    }
    t := time.NewTicker(period)
    for {
        select {
        case <-t.C:
            update(client, counters)
        case s := <-In:
            c, ok := counters[s]
            if !ok {
                c = &counter{}
                counters[s] = c
            }
            c.period++
        }
    }
}

The update function constructs a Point for each series, sends it to the stats server via a Server.Update RPC request, and then resets that series’ count for the next period.

type Point struct {
    Process string
    Series  string
    Value   int64
}

func update(client *rpc.Client, counters map[string]*counter) {
    for series, c := range counters {
        c.total += c.period
        c.cycles++
        p := Point{Process, series, c.period}
        err := client.Call("Server.Update", &p, &struct{}{})
        if err != nil {
            log.Println("stat update:", err)
        }
        c.period = 0
    }
}

The stat package exports the Point struct (by naming it with a capital letter) so that the stat server itself can use it.

stat/server

The stat server is an RPC server that exposes one method, Server.Update, and an HTTP server that serves some static HTML and JavaScript to display the chart, and the live chart data as JSON.

The Server type contains a map that holds slices of coordinates keyed by series name:

type Server struct {
    series map[string][][2]int64
    start  int64 // server start time in Unix nanoseconds; points are stored relative to it
    mu     sync.Mutex
}

func NewServer() *Server {
    return &Server{
        series: make(map[string][][2]int64),
        start:  time.Now().UnixNano(),
    }
}

Its Update method accepts updates via RPC from client programs using the stat library:

func (s *Server) Update(args *stat.Point, r *struct{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    // append point to series; x is whole seconds since the server started
    key := args.Process + " " + args.Series
    second := (time.Now().UnixNano() - s.start) / 1e9
    point := [2]int64{second, args.Value}
    s.series[key] = append(s.series[key], point)

    // trim series to maxLen, storing the shortened slice back in the map
    if sk := s.series[key]; len(sk) > *maxLen {
        s.series[key] = sk[len(sk)-*maxLen:]
    }

    return nil
}

Not the cleanest piece of code I’ve written, but in essence it just tracks data series keyed by the Process and Series provided by the client, and only maintains the last maxLen points (configurable by a command-line flag).

The ServeHTTP method simply serves the current data as a JSON blob, allowing us to use a *Server as an http.Handler.

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    s.mu.Lock()
    defer s.mu.Unlock()
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(s.series)
}

The server’s main function sets all this up, as well as the Static handler for serving the HTML and JavaScript.

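func main() {
    flag.Parse()
    server := NewServer()
    if err := rpc.Register(server); err != nil { // exposes Server.Update
        log.Fatal(err)
    }
    rpc.HandleHTTP() // serves RPC on the default HTTP mux
    http.HandleFunc("/", Static)
    http.Handle("/get", server)
    log.Fatal(http.ListenAndServe(*listenAddr, nil))
}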

The front-end uses the Prototype and Flotr libraries to draw the graph. When the document loads, it kicks off a timer to make Ajax requests to /get every second:

document.observe('dom:loaded', function(){
    setInterval(function() {
        new Ajax.Request('/get', { onSuccess: draw })
    }, 1000)
});

The draw function then massages that data into the appropriate format to be plotted by Flotr:

function draw(data) {
    var series = [];
    Object.keys(data.responseJSON).each(function(key) {
        series.push({
            label: key,
            data: data.responseJSON[key]
        })
    });
    var f = Flotr.draw($('container'), series);
}

And that, in essence, is all there is to it.

The stat package and server are pretty limited right now, but I would be interested in extending them further. If you use this code I’d be curious to hear about it.


  • Mar 10 2011, 3:39 AM
    Alexander responded:
    I think, I see bugs:

    func (s *Server) Update(args *stat.Point, r *struct{}) os.Error {
    s.mu.Lock()
    defer s.mu.Unlock()

    // append point to series
    key := args.Process + " " + args.Series
    - second := (time.Nanoseconds() - s.start) / 100e6
    + second := (time.Nanoseconds() - s.start) / 1e9
    point := [2]int64{second, args.Value}
    s.series[key] = append(s.series[key], point)

    // trim series to maxLen
    if sk := s.series[key]; len(sk) > *maxLen {
    - sk = sk[len(sk)-*maxLen:]
    + s.series[key] = sk[len(sk)-*maxLen:]
    }

    return nil
    }

  • Mar 23 2011, 6:38 PM
    Jay Graves responded:
    in the 'loop' function in the 'bench' program shouldn't the sleep call use the 'delay' argument instead of the 'getDelay' constant? Typo?