Collecting and plotting live data with Go
My recent talk, Practical Go
Programming,
describes the construction of the URL shortener program
goto, and concludes with a graphical demonstration
of the program being tested under load. It tests goto running as a single
master, and then with one, two, and three slaves in front of one master. Another Go
program plots the number of Get
(redirect) and Put
(shorten) operations
served by each process on a line graph. In this post I will describe how those
statistics are gathered, collated, and displayed.
First off, I should let you know that I did this in a hurry. The night before I first delivered the talk (at OSDC in Melbourne) I realised it should end with something visual, some display that would show just how capable and efficient this simple Go program is. So after just under an hour of hacking I had produced the stat package, the corresponding stat server, and the stress-testing program bench. This disclaimer serves both as an excuse for the quality of the code and as a testament to Go’s utility as a language.
bench
The bench program simply fires off repeated HTTP requests to one or more goto servers. When bench starts it launches several goroutines:
func main() {
    for i := 0; i < 20; i++ {
        go loop(get, getDelay)
    }
    for i := 0; i < 2; i++ {
        go loop(post, postDelay)
    }
}

func loop(fn func(), delay int64) {
    for {
        fn()
        time.Sleep(delay)
    }
}
The get function makes a URL redirect request to the goto server, and post makes a URL shortening request. To make the GET requests to valid short URLs, bench records each short URL returned by post and provides them to get as needed. This is coordinated through two channels, newURL and randURL:
var (
    newURL  = make(chan string)
    randURL = make(chan string)
)
The post
function sends each shortened URL it receives from goto to the
newURL
channel:
func post() {
    url := "http://master/add"
    r, err := http.PostForm(url, map[string]string{"url": fooUrl})
    if err != nil {
        log.Println("post:", err)
        return
    }
    defer r.Body.Close()
    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
        log.Println("post:", err)
        return
    }
    newURL <- string(b)
}
While get receives random short URLs from randURL:
func get() {
    url := <-randURL
    r, err := http.Get(url)
    if err != nil {
        log.Println("get:", err)
        return
    }
    r.Body.Close() // release the connection
}
The keeper function runs in its own goroutine and maintains a slice of short URLs. When keeper receives a value from newURL it adds the value to the slice. At the same time, it attempts to send URLs picked randomly from the slice to randURL.
func keeper() {
    var urls []string
    urls = append(urls, <-newURL)
    for {
        r := urls[rand.Intn(len(urls))] // choose random url
        select {
        case u := <-newURL:
            urls = append(urls, u)
        case randURL <- r: // random url sent
        }
    }
}
This is a nice example of Go’s select statement, where it attempts to receive and send data at the same instant, and proceeds with whichever operation is ready first.
There’s a little more to the bench program, such as configuration through command-line flags, but I’ll leave their discovery as an exercise for the reader.
stat
The stat package enables any Go program to count events that occur during a specified time period (in this case, each second) and report them to a statistics server via RPC. It exposes a simple interface:
func Monitor(addr string)

var (
    In      = make(chan string, 100)
    Process = "default"
)
To use stat, a process should launch a new goroutine and call Monitor, passing the address of the statistics server as its argument.
The stat.Process
variable can be set to a string that describes the process,
which will eventually be used to name the data series on the line chart.
In the case of goto, I set it to the process' HTTP listen port.
For each event to be counted, the process sends an identifying string to stat.In. For example, each time goto handles a Get request it sends the string "get" to In.
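To make this concrete, here is a rough sketch (in the pre-Go 1 style of the rest of this post) of how a process such as goto might hook into stat. The flag names, import path, and handler body are assumptions for illustration, not the actual goto source:

package main

import (
    "flag"
    "http" // "net/http" in modern Go
    "stat" // wherever the stat package lives
)

var (
    listenAddr = flag.String("http", ":8081", "HTTP listen address")
    statServer = flag.String("stats", "localhost:8000", "stat server address")
)

func redirect(w http.ResponseWriter, r *http.Request) {
    stat.In <- "get" // count this redirect
    // ... look up the short URL and issue the redirect ...
}

func main() {
    flag.Parse()
    stat.Process = *listenAddr   // name this process's series after its listen port
    go stat.Monitor(*statServer) // report counts to the stats server
    http.HandleFunc("/", redirect)
    http.ListenAndServe(*listenAddr, nil)
}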
The Monitor function opens an RPC connection to the server, and then receives events from In and counts them, while periodically sending updates to the stats server. It uses a map of counter values to keep track of the data for each series, and a time.Ticker to trigger server updates.
type counter struct {
    total, period, cycles int64
}

func Monitor(addr string) {
    counters := make(map[string]*counter)
    client, err := rpc.DialHTTP("tcp", addr)
    if err != nil {
        log.Fatal(err)
    }
    t := time.NewTicker(period)
    for {
        select {
        case <-t.C:
            update(client, counters)
        case s := <-In:
            c, ok := counters[s]
            if !ok {
                c = &counter{}
                counters[s] = c
            }
            c.period++
        }
    }
}
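The period constant isn't shown in the snippet above; since updates are reported each second, its declaration is presumably something like this (an assumption, not the published source):

const period = 1e9 // one second; pre-Go 1 Ticker intervals are int64 nanoseconds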
The update function constructs a Point for each series, and sends it to the stats server via a Server.Update RPC request.
type Point struct {
    Process string
    Series  string
    Value   int64
}

func update(client *rpc.Client, counters map[string]*counter) {
    for series, c := range counters {
        c.total += c.period
        c.cycles++
        p := Point{Process, series, c.period}
        err := client.Call("Server.Update", &p, &struct{}{})
        if err != nil {
            log.Println("stat update:", err)
        }
        c.period = 0
    }
}
The stat package exports the Point
struct (by naming it with a capital
letter) so that the stat server itself can use it.
stat/server
The stat server is an RPC server that exposes one method, Server.Update, and an HTTP server that serves some static HTML and JavaScript to display the chart, and the live chart data as JSON.
The Server
type contains a map that holds slices of coordinates keyed by
series name:
type Server struct {
    series map[string][][2]int64
    start  int64 // store points relative to start time
    mu     sync.Mutex
}

func NewServer() *Server {
    return &Server{
        series: make(map[string][][2]int64),
        start:  time.Nanoseconds(),
    }
}
Its Update
method accepts updates via RPC from client programs using the stat
library:
func (s *Server) Update(args *stat.Point, r *struct{}) os.Error {
    s.mu.Lock()
    defer s.mu.Unlock()
    // append point to series
    key := args.Process + " " + args.Series
    second := (time.Nanoseconds() - s.start) / 1e9
    point := [2]int64{second, args.Value}
    s.series[key] = append(s.series[key], point)
    // trim series to maxLen
    if sk := s.series[key]; len(sk) > *maxLen {
        s.series[key] = sk[len(sk)-*maxLen:]
    }
    return nil
}
Not the cleanest piece of code I’ve written, but in essence it just tracks data
series keyed by the Process
and Series
provided by the client, and only
maintains the last maxLen
points (configurable by a command-line flag).
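Neither maxLen nor listenAddr (used by main below) is declared in the snippets shown here; a minimal sketch of those flag declarations might look like this (the flag names and default values are assumptions):

var (
    listenAddr = flag.String("http", ":8090", "HTTP and RPC listen address")
    maxLen     = flag.Int("maxlen", 60, "maximum number of points to keep per series")
)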
The ServeHTTP method simply serves the current data as a JSON blob, allowing us to use a *Server as an http.Handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    s.mu.Lock()
    defer s.mu.Unlock()
    w.SetHeader("Content-Type", "application/json")
    json.NewEncoder(w).Encode(s.series)
}
The server’s main
function sets all this up, as well as the Static
handler
for serving the HTML and JavaScript.
func main() {
    flag.Parse()
    server := NewServer()
    rpc.Register(server)
    rpc.HandleHTTP()
    http.HandleFunc("/", Static)
    http.Handle("/get", server)
    http.ListenAndServe(*listenAddr, nil)
}
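The Static handler isn't reproduced here; assuming the HTML and JavaScript files sit alongside the binary, a minimal version might be:

func Static(w http.ResponseWriter, r *http.Request) {
    // serve index.html at the root, and other files by name
    name := "index.html"
    if r.URL.Path != "/" {
        name = r.URL.Path[1:]
    }
    http.ServeFile(w, r, name)
}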
The front-end uses the Prototype and
Flotr libraries to draw the graph.
When the document loads, it kicks off a timer to make Ajax requests to /get
every second:
document.observe('dom:loaded', function() {
    setInterval(function() {
        new Ajax.Request('/get', { onSuccess: draw })
    }, 1000)
});
The draw
function then massages that data into the appropriate format to be
plotted by Flotr:
function draw(data) {
    var series = [];
    Object.keys(data.responseJSON).each(function(key) {
        series.push({ label: key, data: data.responseJSON[key] })
    });
    var f = Flotr.draw($('container'), series);
}
And that, in essence, is all there is to it.
The stat package and server are pretty limited right now, but I would be interested in extending them further. If you use this code I’d be curious to hear about it.