#236 WideFinder-2 in Fan (multi-core version)

brian Mon 9 Jun 2008

Background

I finally have a Fan script which parallelizes nicely on the Sun Fire T2000. This version of the program runs over the full 42GB dataset in a bit under 18 minutes and weighs in at 114 LOC. If you strip the comments and put your opening braces on the same line, it is only 91 LOC.

real    17:48.4
user  3:27:37.4
sys   1:37:00.0

I'm sure there will be lots of speedier results, but hopefully this will turn out to be respectable performance. My real goal, though, is a simple, concise program. I have a version which runs a tad faster (best run 16:57), but if this were production code I'd go with this one, which is easier to understand and maintain.

What Doesn't Work

First, a couple of notes on what doesn't work. I'm running Fan on the JVM, so under the covers we are using Java's IO. A simple Fan program can read the bytes of the 42GB dataset in less than 5 minutes - so we aren't IO bound. The killer is converting bytes to Strings, but that is unavoidable (the price of a Unicode-enabled VM).

So the challenge is how to parallelize decoding bytes into Strings we can parse. I made a couple of attempts to divide the file up and give each thread its own region to read and parse, but having multiple threads with open file handles did not utilize the cores well.

Another design I tried was funneling all the parsed lines to a single totalizer thread responsible for managing the mapped counters. That is clean and simple because there is no merge step. But after adding some diagnostics, I saw the parsers spending a lot of time waiting due to flow control on the totalizer's message queue. I was able to get good performance by sending batches of thousands of lines to the totalizer, but that made the program more complicated - so I ditched that strategy. This version instead does the totalizing in the parser threads, with an extra merge step at the end.

Design

All thread concurrency is handled with message passing using Thread.sendAsync and Thread.sendSync. The basic flow of the program is below; a condensed excerpt from main follows the list:

  1. spawn a reader thread to read the file in 64KB chunks
  2. spawn n parser threads to parse those 64KB chunks in parallel and totalize the results
  3. wait for the reader to finish
  4. send null to the parser threads to collect their individual shards of results
  5. spawn a report thread per field (hits, 404s, etc) to merge the shards and format the results
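
Steps 1 through 3 look like this, pulled straight from main in the full listing (numParsers, readerRun, and parserRun are defined there); the gather and report steps are shown in their own sections below:

reader := Thread.make("reader", &readerRun).start           // step 1
parsers := Thread[,]
for (i:=0; i<numParsers; ++i)                               // step 2
  parsers.add(Thread.make(null, &parserRun(reader)).start)
reader.join                                                 // step 3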

Reader

The reader thread is responsible for reading the file as fast as possible. We loop over the file with random access, reading 64KB Bufs. After each read we trim the buffer back to the last newline and reposition the file pointer so the trailing bytes are re-read with the next chunk. Remember that this thread only reads bytes; it does no character decoding into strings.
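
Here is the chunking step in isolation - essentially the middle of readerRun in the listing below, with the charset setup elided: read up to 64KB, drop the partial trailing line, and back the file position up so the next read starts on a line boundary.

start := fp.pos
buf := Buf.make(chunkSize)               // raw bytes only, no decoding yet
fp.readBuf(buf, chunkSize)
while (buf[-1] !== '\n') buf.size--      // trim back to the last newline
fp.seek(start + buf.size)                // the trimmed tail is read again next time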

As we read buffers, ideally we'd just send them as async messages to the parser threads on a rotating basis. The problem is that we can quickly fill up the message queues with huge chunks of memory - so we need some flow control. The way I implemented it was to have the parsers send themselves to the reader's message queue when they are ready for more work. The reader waits for the next ready parser in its loop, reads the next chunk, and then sends it back to that parser as an async message. With this design we never enqueue more than one 64KB chunk on any parser's message queue.
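
Reduced to its core, the handshake looks like this (same calls as the full listing; the actual parsing is elided):

// parser side: send ourself to the reader to say "ready", then wait for chunks
static Void parserRun(Thread reader, Thread t)
{
  reader.sendAsync(t)                    // ready for the first chunk
  t.loop |Obj msg->Obj|
  {
    // ... parse the Buf in msg ...
    reader.sendAsync(t)                  // ready for the next chunk
    return null
  }
}

// reader side, inside its own loop: msg is whichever parser declared
// itself ready, so reply with the next chunk on that parser's queue
((Thread)msg).sendAsync(buf.seek(0))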

Parsers

The parsers are where we utilize the cores during the IO/totalization phase. Each parser thread is responsible for:

  1. waiting for 64KB chunks from the reader
  2. parsing those chunks into log records
  3. totalizing the records
  4. sending itself back to the reader as an async message for more work

The totals kept by a single parser thread are called a Shard. A shard contains the maps for the various log fields (hits, bytes, clients, etc). After the main thread joins with the reader thread, it sends null to each parser to get back its shard of results.
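
Both sides of that hand-off, pulled from the listing: the main thread makes a synchronous request per parser, and a null message is the parser's cue to reply with its shard.

// main thread, after reader.join
shards := Shard[,]
parsers.each |Thread t| { shards.add(t.sendSync(null)) }

// inside parserRun's loop
if (msg == null) { return shard }        // becomes the sendSync result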

I'm not sure what the optimal number of parsers is, but 30 parsers were able to parse the 218 million lines in 16min 43sec. This phase is by far where most of the runtime is spent, and a lot of it goes to mapping bytes to java.lang.Strings. That is understandable, since under the covers we have to copy from byte[] into char[] through a charset. Maybe something in NIO can read from a file directly into char[] - but I didn't investigate.

Report

Once we've collected all the shards, we spawn a report thread for each field: hits, bytes, clients, 404s, and refs. Each report thread's responsibilities:

  1. merge shards into one big Str:Int map
  2. reverse sort the map values to find the threshold (10th highest value)
  3. find all the key/value pairs with a value higher than threshold
  4. write the report to a string
  5. return the report string to the main thread to print

You could definitely do some clever things here to parallelize the merge/sort, but this phase consumes only 51sec of the total run, so I stuck with a simple, brute-force design.
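
The only mildly clever bit is the threshold: reverse-sorting the merged values puts the 10th-highest value at index 9, and every key at or above it makes the report (so ties at the cutoff are all included). From reportRun below:

threshold := merge.values.sortr[9]                              // 10th-highest value
top := merge.findAll |Int v->Bool| { return v >= threshold }    // keep everything at or above it
topKeys := top.keys.sortr |Str a, Str b->Int| { return top[a] <=> top[b] }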

Code

class WideFinder
{
  static const Int numParsers := 30
  static const Int chunkSize  := 65536

  Void main()
  {
    // spawn reader and parser threads
    reader := Thread.make("reader", &readerRun).start
    parsers := Thread[,]
    for (i:=0; i<numParsers; ++i)
      parsers.add(Thread.make(null, &parserRun(reader)).start)

    // wait for reader to finish, then gather shards from parsers
    reader.join
    shards := Shard[,]
    parsers.each |Thread t| { shards.add(t.sendSync(null)) }

    // spawn reports and then print each one
    reports :=
    [
      Thread.make(null, &reportRun(shards, "Top URIs by hit", "hits")).start,
      Thread.make(null, &reportRun(shards, "Top URIs by Megabytes", "bytes")).start,
      Thread.make(null, &reportRun(shards, "Top 404s", "s404s")).start,
      Thread.make(null, &reportRun(shards, "Top client addresses", "clients")).start,
      Thread.make(null, &reportRun(shards, "Top referrers", "refs")).start
    ]
    reports.each |Thread t| { echo(t.join) }
  }

  static Void readerRun(Thread t)
  {
    fp := Sys.args[0].toUri.toFile.open("r")
    cs := Charset.fromStr("ISO-8859-1")
    t.loop |Obj msg|
    {
      start := fp.pos
      buf := Buf.make(chunkSize) { charset = cs }
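      // a null read means we hit end of file: close up and stop the reader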
      if (fp.readBuf(buf, chunkSize) == null) { fp.close; t.stop; return }
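      // trim back to the last newline so no line is split across chunks,
      // then rewind the file so the trimmed bytes are read again next time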
      while (buf[-1] !== '\n') buf.size--
      fp.seek(start+buf.size);
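      // msg is a parser announcing it is ready; send it the rewound chunk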
      ((Thread)msg).sendAsync(buf.seek(0))
    }
  }

  static Void parserRun(Thread reader, Thread t)
  {
    shard := Shard.make
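    // flow control: announce we are ready by sending ourself to the reader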
    reader.sendAsync(t)
    t.loop |Obj msg->Obj|
    {
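      // null from the main thread is the cue to hand back this thread's shard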
      if (msg == null) { return shard }
      ((Buf)msg).eachLine |Str line|
      {
        try
        {
          toks := line.split(" ")
          if (toks[5] == "\"GET")
            shard.process(toks[0], toks[6], toks[8], toks[9], toks[10])
        }
        catch {}
      }
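      // ask the reader for more work (ignore errors if it already stopped at EOF)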
      try { reader.sendAsync(t) } catch {}
      return null
    }
  }

  static Str reportRun(Shard[] shards, Str label, Str field)
  {
    // create merged map of shards
    merge := Str:Int[:] { def = 0 }
    shards.each |Shard shard|
    {
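      // reflectively fetch the named counter map (hits, bytes, ...) from the shard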
      Str:Int map := shard.type.field(field).get(shard)
      map.each |Int v, Str k| { merge[k] += v }
    }

    // sort values to find top 10
    threshold := merge.values.sortr[9]
    top := merge.findAll |Int v->Bool| { return v >= threshold }
    topKeys := top.keys.sortr |Str a, Str b->Int| { return top[a] <=> top[b] }

    s := StrBuf.make.add("\n$label\n")
    topKeys.each |Str key|
    {
      val := (field == "bytes") ? top[key] / (1024*1024) : top[key]
      s.add("  ${val.toStr.justr(5)} $key\n")
    }
    return s.add("\n").toStr
  }
}

class Shard
{
  Str:Int hits    := Str:Int[:] { def = 0 }
  Str:Int bytes   := Str:Int[:] { def = 0 }
  Str:Int s404s   := Str:Int[:] { def = 0 }
  Str:Int clients := Str:Int[:] { def = 0 }
  Str:Int refs    := Str:Int[:] { def = 0 }
  const Regex re := Regex.fromStr(r"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")

  Void process(Str client, Str uri, Str status, Str size, Str ref)
  {
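    // 200s add to the byte count, 404s get their own tally, anything else but 304 is skipped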
    if (status == "200")  { try { bytes[uri] += size.toInt } catch {} }
    else if (status == "404") { s404s[uri]++; return }
    else if (status != "304") return

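    // only 200s and 304s reach here: count the hit if the URI looks like an ongoing article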
    if (uri.size < 32 || !re.matches(uri)) return
    hits[uri]++
    clients[client]++
    if (ref != "\"-\"" && !ref.contains("http://www.tbray.org/ongoing/"))
      refs[ref[1..-2]]++  // lose the quotes
  }
}

katox Wed 15 Oct 2008

I was curious about this one. I wondered why Fan lost about 4 seconds to Java. Unfortunately it's not obvious, as the sample Java program uses some kind of complex framework (how significant!).

Since there is some test data available, I just wanted to run the Fan sample. Unfortunately it doesn't work anymore. Even after updating the syntax and accounting for the Str.split API change, I still get an exception:

sys::IOErr: Not serializable: sys::MemBuf
  fanx.serial.ObjEncoder.writeObj (ObjEncoder.java:85)
  fan.sys.OutStream.writeObj (OutStream.java:247)
  fan.sys.OutStream.writeObj (OutStream.java:244)
  fan.sys.Namespace.safe (Namespace.java:86)
  fan.sys.Thread.sendAsync (Thread.java:590)
  wf::WideFinder.readerRun (Main.fan:42)
  fan.sys.Thread.dispatch (Thread.java:552)
  fan.sys.Thread.loop (Thread.java:539)
  wf::WideFinder.readerRun (Main.fan:35)
  sun.reflect.NativeMethodAccessorImpl.invoke0 (NativeMethodAccessorImpl.java)
  sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:39)
  sun.reflect.DelegatingMethodAccessorImpl.invoke   (DelegatingMethodAccessorImpl.java:25)
  java.lang.reflect.Method.invoke (Method.java:597)
  fan.sys.Method.invoke (Method.java:554)
  fan.sys.Method$MethodFunc.call1 (Method.java:270)
  fan.sys.Thread.run (Thread.java:511)
  fan.sys.Thread$Val.run (Thread.java:447)

Brian, is this temporary, due to the recent changes towards value-type support, or has something changed elsewhere?

brian Wed 15 Oct 2008

It is hard to compare my Fan solution to other Java-based solutions b/c the code is so different - I was going for a small number of lines rather than speed. But I suspect that even with the same code structure Fan will be slower, because everything is boxed (something we will rectify soon :-). Java as a whole fared poorly with its Unicode strings versus the byte-oriented solutions - a huge amount of time is spent decoding bytes into Unicode and performing Unicode-aware regex.

The specific issue you are seeing is because Fan needs some type of escape hatch to allow unsafe mutable message passing. See Issues from WideFinder for a more general discussion. As a temporary fix you can comment out the calls to Namespace.safe in "Thread.java".
