Collecting and plotting live data with Go
My recent talk, Practical Go
Programming,
describes the construction of the URL shortener program
goto, and concludes with a graphical demonstration
of the program being tested under load. It tests goto running as a single
master, and with one, two, and three slaves in front of one master. Another Go
program plots the number of Get (redirect) and Put (shorten) operations
served by each process on a line graph. In this post I will describe how those
statistics are gathered, collated, and displayed.
First off, I should let you know that I did this in a hurry. The night before I first delivered the talk (at OSDC in Melbourne) I realised it should end with something visual, some display that would show just how capable and efficient this simple Go program is. So after just under an hour of hacking I had produced the stat package, the corresponding stat server, and the stress-testing program bench. This disclaimer serves both as an excuse for the quality of the code and as a testament to Go’s utility as a language.
bench
The bench program simply fires off repeated HTTP requests to one or more goto servers. When bench starts it launches several goroutines:
func main() {
    for i := 0; i < 20; i++ {
        go loop(get, getDelay)
    }
    for i := 0; i < 2; i++ {
        go loop(post, postDelay)
    }
}
func loop(fn func(), delay int64) {
    for {
        fn()
        time.Sleep(delay)
    }
}

The get function makes a URL redirect request to the goto server, and post
makes a URL shortening request. To make the GET requests to valid short URLs,
bench records each short URL returned by post and provides them to get as
needed. This is coordinated through two channels, newURL and randURL:
var (
    newURL  = make(chan string)
    randURL = make(chan string)
)

The post function sends each shortened URL it receives from goto to the
newURL channel:
func post() {
    url := "http://master/add"
    r, err := http.PostForm(url, map[string]string{"url": fooUrl})
    if err != nil {
        log.Println("post:", err)
        return
    }
    defer r.Body.Close()
    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
        log.Println("post:", err)
        return
    }
    newURL <- string(b)
}

While get receives random short URLs from randURL:
func get() {
    url := <-randURL
    r, err := http.Get(url)
    if err != nil {
        log.Println("get:", err)
        return
    }
    r.Body.Close() // discard the response
}

The keeper function runs in its own goroutine and maintains a slice of short
URLs. When keeper receives a value from newURL it adds the value to the
slice. At the same time, it attempts to send URLs picked randomly from the
slice to randURL.
func keeper() {
    var urls []string
    urls = append(urls, <-newURL)
    for {
        r := urls[rand.Intn(len(urls))] // choose random url
        select {
        case u := <-newURL:
            urls = append(urls, u)
        case randURL <- r:
            // random url sent
        }
    }
}

This is a nice example of Go’s select statement: keeper attempts to receive and send data at the same instant, and proceeds with whichever operation becomes ready first.
There’s a little more to the bench program, such as configuration through command-line flags, but I’ll leave their discovery as an exercise for the reader.
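As a rough illustration of what that configuration might look like, the request delays could be exposed as flags along these lines (the flag names and defaults here are guesses, not necessarily what bench actually uses):

var (
    getDelay  int64
    postDelay int64
)

func init() {
    // delays are int64 nanosecond values, matching loop's signature
    flag.Int64Var(&getDelay, "get", 100e6, "delay between redirect requests (ns)")
    flag.Int64Var(&postDelay, "post", 1e9, "delay between shorten requests (ns)")
}

With a setup like this, main just needs to call flag.Parse() before launching its goroutines.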
stat
The stat package enables any Go program to count events that occur during a specified time period (in this case, each second) and report them to a statistics server via RPC. It exposes a simple interface:
func Monitor(addr string)

var (
    In      = make(chan string, 100)
    Process = "default"
)

To use stat, a process should launch a new goroutine and call Monitor, its
first argument being the address of the statistics server.
The stat.Process variable can be set to a string that describes the process,
which will eventually be used to name the data series on the line chart.
In the case of goto, I set it to the process' HTTP listen port.
For each event to be counted, the process sends an identifying string to
stat.In. For example, each time goto handles a Get request it sends
the string "get" to In.
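To make this concrete, here is a minimal sketch of how goto might wire in the stat package. The *statServer and *listenAddr flags are assumptions for the sake of illustration; the real goto code may differ:

func main() {
    flag.Parse()
    stat.Process = *listenAddr   // names this process's data series on the chart
    go stat.Monitor(*statServer) // report counts to the stat server
    // ... goto's usual setup: register handlers, http.ListenAndServe, etc.
}

Then the handler that serves redirects does

    stat.In <- "get" // count one redirect served

and the handler that shortens URLs does the same with its own label. Because In is buffered, these sends normally return immediately and don’t slow down the handlers.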
The Monitor function opens an RPC connection to the server, and then receives
events from In and counts them, while periodically sending updates to the
stats server. It uses a map of counter values to keep track of the data for
each series, and a time.Ticker to trigger server updates.
type counter struct {
    total, period, cycles int64
}

func Monitor(addr string) {
    counters := make(map[string]*counter)
    client, err := rpc.DialHTTP("tcp", addr)
    if err != nil {
        log.Fatal(err)
    }
    t := time.NewTicker(period)
    for {
        select {
        case <-t.C:
            update(client, counters)
        case s := <-In:
            c, ok := counters[s]
            if !ok {
                c = &counter{}
                counters[s] = c
            }
            c.period++
        }
    }
}

The update function constructs a Point for each series, and sends it to the
stats server via a Server.Update RPC request.
type Point struct {
    Process string
    Series  string
    Value   int64
}

func update(client *rpc.Client, counters map[string]*counter) {
    for series, c := range counters {
        c.total += c.period
        c.cycles++
        p := Point{Process, series, c.period}
        err := client.Call("Server.Update", &p, &struct{}{})
        if err != nil {
            log.Println("stat update:", err)
        }
        c.period = 0
    }
}

The stat package exports the Point struct (by naming it with a capital
letter) so that the stat server itself can use it.
stat/server
The stat server is an
RPC server that exposes a single method, Server.Update, and
an HTTP server that serves the static HTML and JavaScript used to display the
chart, as well as the live chart data as JSON.
The Server type contains a map that holds slices of coordinates keyed by
series name:
type Server struct {
    series map[string][][2]int64
    start  int64 // store points relative to start time
    mu     sync.Mutex
}

func NewServer() *Server {
    return &Server{
        series: make(map[string][][2]int64),
        start:  time.Nanoseconds(),
    }
}

Its Update method accepts updates via RPC from client programs using the stat
library:
func (s *Server) Update(args *stat.Point, r *struct{}) os.Error {
    s.mu.Lock()
    defer s.mu.Unlock()
    // append point to series
    key := args.Process + " " + args.Series
    second := (time.Nanoseconds() - s.start) / 1e9
    point := [2]int64{second, args.Value}
    s.series[key] = append(s.series[key], point)
    // trim series to maxLen
    if sk := s.series[key]; len(sk) > *maxLen {
        s.series[key] = sk[len(sk)-*maxLen:]
    }
    return nil
}

Not the cleanest piece of code I’ve written, but in essence it just tracks data
series keyed by the Process and Series provided by the client, and only
maintains the last maxLen points (configurable by a command-line flag).
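The *maxLen dereference works because maxLen is a flag pointer. Its declaration isn’t shown here; something along these lines would do, with the flag name and default being guesses:

var maxLen = flag.Int("maxlen", 60, "maximum number of points to keep per series")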
The ServeHTTP method simply serves the current data as a JSON blob, allowing
us to use a *Server as an http.Handler.
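The blob is just the series map encoded directly, so the response looks something like this (the process names, series labels, and values are purely illustrative):

{
    "8081 get": [[1, 120], [2, 134], [3, 129]],
    "8081 put": [[1, 10], [2, 11], [3, 9]]
}

Each key is a "process series" pair and each point is a [second, count] pair, which is the shape the front-end code shown later consumes.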
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    s.mu.Lock()
    defer s.mu.Unlock()
    w.SetHeader("Content-Type", "application/json")
    json.NewEncoder(w).Encode(s.series)
}

The server’s main function sets all this up, as well as the Static handler
for serving the HTML and JavaScript.
func main() {
    flag.Parse()
    server := NewServer()
    rpc.Register(server)
    rpc.HandleHTTP()
    http.HandleFunc("/", Static)
    http.Handle("/get", server)
    http.ListenAndServe(*listenAddr, nil)
}

The front-end uses the Prototype and
Flotr libraries to draw the graph.
When the document loads, it kicks off a timer to make Ajax requests to /get
every second:
document.observe('dom:loaded', function() {
    setInterval(function() {
        new Ajax.Request('/get', { onSuccess: draw })
    }, 1000)
});

The draw function then massages that data into the appropriate format to be
plotted by Flotr:
function draw(data) {
    var series = [];
    Object.keys(data.responseJSON).each(function(key) {
        series.push({
            label: key,
            data: data.responseJSON[key]
        })
    });
    var f = Flotr.draw($('container'), series);
}

And that, in essence, is all there is to it.
The stat package and server are pretty limited right now, but I would be interested in extending them further. If you use this code I’d be curious to hear about it.