#346 Santa Claus problem | Fan concurrency

cgrinds Tue 26 Aug 2008

The Santa Claus problem is a concurrency problem that's meant to be more interesting than the dining philosophers.

Here's the problem:

Santa Claus sleeps at the North pole until awakened by either all of the nine reindeer, or by a group of three out of ten elves. He performs one of two indivisible actions: - If awakened by the group of reindeer, Santa harnesses them to a sleigh, delivers toys, and finally unharnesses the reindeer who then go on vacation. - If awakened by a group of elves, Santa shows them into his office, consults with them on toy R&D, and finally shows them out so they can return to work constructing toys. > A waiting group of reindeer must be served by Santa before a waiting group of elves. Since Santa’s time is extremely valuable, marshalling the reindeer or elves into a group must not be done by Santa.

I'm interested in any feedback folks have.

  1. Is this idiomatic Fan?
  2. Is this representative of how Fan concurrency is used? In particular I don't care for subclassing Thread, I could have created another class that contained a thread but it wouldn't have bought me much after I make everything constant.
  3. The way Erlang delivers messages is quite nice - In Fan, currently the only way to interact with a thread's message queue is by removing the first message. I found it challenging/hacky the way I ended up requeuing a message that I wasn't ready to handle at the moment but wanted to handle latter. (Line 48 and 101)

Here's a solution that's somewhat inspired by Erlang's elegant solution.

** Fan Solution of Santa problem v1
** Inspired by http://www.cs.otago.ac.nz/staffpriv/ok/santa/
** Chris Grindstaff

class SantaProblem {
  static Void main() {
      SantaProblem().go
  }
  Worker[] makeWorkers(Secretary secretary, Str name, Int number) {
    workers := Worker[,]
    number.times |Int i| {workers.add(Worker(secretary, "$name-$i").start)}
    return workers
  }
  Void go() {
    santa := Santa().start
    edna := santa.edna.start
    robin := santa.robin.start

    elves := makeWorkers(edna, "elf", 10)
    reindeers := makeWorkers(robin, "reindeer", 9)
    santa.join
  }
}


const class Santa : Thread {
  const Secretary edna
  const Secretary robin
  new make() : super.make("santa") {
    edna = Secretary("edna", this, 3)
    robin = Secretary("robin", this, 9)
  }
  override Obj run() {
    loop |Obj msg->Obj| {
      //echo("santa: received $msg")
      if(msg is Str) {
        working := locals.get("working") as Int
        working--
        if(working == 0) {
          locals.remove("working")
          locals.remove("secretary")->sendAsync(this)
        } else {
          locals.set("working", working)
        }
        return msg
      }
      if(locals.containsKey("working")) {
        //requeue this msg since the previous workers aren't finished
        return sendLater(100ms, msg)
      }
      packed := msg as List
      from := packed[0] as Secretary
      workers := (Worker[])packed[1]
      locals.set("working", workers.size)
      locals.set("secretary", from)
      workers.each |Worker w| {w.sendAsync(this)}
      return this
      }
    return null
  }
}

const class Worker : Thread {
  const Secretary secretary
  new make(Secretary secretary, Str name) : super.make(name) {
    this.secretary = secretary
  }
  override Obj run() {
    secretary.sendSync(this)
    loop |Obj msg->Obj| {
      // random delay
      sleep(Duration.fromStr("${Int.random(1..3)}sec"))
      //echo("$name received: $msg")
      echo("$name doing my work")
      gate_keeper := msg as Thread
      gate_keeper.sendAsync("done from $name")
      //once the worker is done, let the secretary know we're ready for more work
      //sleep(Duration.fromStr("${Int.random(1..3)}sec"))
      secretary.sendAsync(this)
      return msg
    }
    return this
  }
}

const class Secretary : Thread {
  const Santa santa
  const Int needed
  new make(Str name, Santa santa, Int needed) : super.make(name) {
    this.santa = santa
    this.needed = needed
  }
  override Obj run() {
    loop |Obj msg->Obj| {
      //echo("$name: received $msg")
      if(msg is Santa) {
        locals.remove("dontBotherSanta")
        return this
      }
      if(locals.containsKey("dontBotherSanta")) {
        //requeue
        return sendLater(100ms, msg)
      }
      list := locals.get("workersWaiting") as List
      if(list == null) list = [,]
      if(list.size == needed - 1) {
        list = [,].addAll(list).add(msg).toImmutable
        locals.set("dontBotherSanta", true)
        echo("$name -> $santa $list")
        santa.sendAsync([this, list].toImmutable)
        locals.remove("workersWaiting")
      } else {
        locals.set("workersWaiting", [,].addAll(list).add(msg).toImmutable)
      }
      return msg
    }
    return this
  }
}

brian Tue 26 Aug 2008

Chris,

This is pretty interesting. Just let me say that I don't consider Fan's concurrency model finished - in fact I still think it needs a lot of work. So I would ask you to tell me what changes/features would make Fan concurrency easy or more powerful?

Do you have any suggestions for more flexible dequeuing? We could let you pass in an message matching function (but then we might need to switch from a push loop API to pull API like receive).

I tend to find myself using thread locals where I in Java I might use a static field. I've even proposed a special syntax. But I agree it is a big icky so I haven't done it yet.

Where you are using thread locals in Secretary though I would typically use normal local variable - the function passed to Thread.loop doesn't have to be immutable, so it can capture state. I typically use this pattern:

Void run() { loop(&process(State())) }
Void process(State s) {}

Other than that I would consider that your solution is idiomatic Fan in as much that you've probably written as much concurrent Fan as anyone on the planet :)

cgrinds Wed 27 Aug 2008

Brian,

I'm inclined to look at what Erlang has done in this space. Erlang combines async message passing and pattern matching in a very interesting way. Specifically in Erlang - a message is not removed from the queue until it matches a pattern.

Fan doesn't really support pattern matching in the way that Erlang and Scala do so I'm not sure how well this paradigm will fit, although as you said Brian, a message matching function might be close enough. Scala and Erlang both use a pull API like receive.

In Scala, messages always include the sender, which is a nice way to reduce some boilerplate.

brian Wed 27 Aug 2008

Chris, I agree with your assessment - we need a proposal for a new API which would

  1. include the sending thread
  2. would allow some type of pattern matching (probably via function versus a new language feature)

If anyone want to propose the new API go for it, otherwise I will try to give it some thought later this week.

cgrinds Wed 3 Sep 2008

Brian, I'd hoped to get a chance to propose something here but haven't been able to yet. Have you gotten to it yet?

Off the top of my head, I think a receive is needed. Her are some issues that come to mind:

  • It needs to block until a message from the thread's queue is matched.
  • It needs to be nestable, like this scala example:
class Counter extends Actor {
  override def act(): Unit = loop(0)

  def loop(value: int): unit = {
    receive {
      case Incr() => loop(value + 1)
      case Dec() => loop(value - 1)
      case Value(a) => a ! IntValue(value); loop(value)
      case Lock(a) => a ! IntValue(value)
        receive { case UnLock() => loop(value) }
      case _ => loop(value)
    }
  }
}

To use the receive API a programmer wants to bind a message matcher to a block of code.

  • the basic API needs to do 2 things
    1. determine the first message in the thread's queue that matches
    2. apply or call a function that corresponds with the matched function

without pattern matching we need a way to bind a message matcher to a block of code and a way to create a set of these.

brian Wed 3 Sep 2008

I haven't given it any thought yet - I've had my head down, cranking on fwt and flux. Hopefully I'll get to a good spot this month to pause that project, and take some time to go thru my todo list.

Thinking on the fly here, I think it will come down to two options:

We could use normal library APIs to create a match/dispatch mechanism. For example a poor man's matching routine could be done with 2 dimensional function list:

matcher := 
[
  [ |Obj m->Bool| { return m is Str }, &processStr],
  [ |Obj m->Bool| { return m is Int }, &processInt],
  [ |Obj m->Bool| { return true }, &processUnknown]
]  

That works, but might be a bit ugly, so then we need to consider special language support, or potentially just a library class which might be easier to use. This issue also gets back into the tuple discussion.

cgrinds Wed 3 Sep 2008

Yeah the double blocks are the best I'd come up with without special language support. And yeah, it is ugly. :-)

I believe message based concurrency is very important, so the syntax for this should be as natural as possible. Not to mention the practical matter that in Fan today, message passing concurrency is our only option for multi-threaded Fan apps.

I don't like this any better, too verbose.

matcher :=
[
  MsgMatch {
    match = |Obj msg->Bool| {return msg is Incr}
    func = |Obj msg->Obj| {loop(value + 1)}
  }
]

I'll read up on the tuple discussion, I think I missed that discussion.

JohnDG Thu 4 Sep 2008

As much as you're not gonna like to hear it, unified with blocks present a clean solution. :-)

override Obj defineMatches() {
    matchRule {
       when (it is Incr) {
          // Do stuff here on 'it', which is the Msg object
          
       }
    }
}

How does this work? Briefly, matchRule is a method on Thread accepting a single-arg closure -- a unified with block. The UWB references the implicit parameter it, which is itself a message object.

void matchRule(|Obj -> Void| rule) {
    rules.addRule(rule)
}

when is also a method on Thread, which accepts a parameter of type Bool, and another single-arg closure.

void when(Bool condition, |Obj -> Void| handler) {
    if (condition) {
        handler.call(currentMessage)

        handled = true
    }
}

When a new message arrived, it is processed the matchers, in the order they are defined, until a match is found, whereupon the corresponding code block is executed.

loop |Obj msg->Obj| {
    handled        = false
    currentMessage = msg;

    rules.each ||Obj -> Void| rule| {
        if (!handled) {
            rule.call(msg)
        }
        else {
            // Fan has no way to break here
        }
    }
}

(Likely there are some syntax and/or logical errors here.)

f00biebletch Thu 4 Sep 2008

Sorry to go tangential, I haven't played with the threading API much yet, but have done enough erlang. The beauty with the erlang receive construct is that the same pattern matching construct applies to case, if, and function pattern matching, so it's a natural, pervasive idiom. There is real beauty in something like

fib(0) -> 0;
fib(1) -> 1;
fib(N) -> fib(N-1)+fib(N-2).

even though it's not tail recursive. Similarly, a fibonacci process can do something like

receive
  {fib, Pid, 0} ->
    Pid ! {answer, 0};
  {fib, Pid, 1} ->
    Pid ! {answer, 1};
  {fib, Pid, N} ->
    Pid ! {answer, fib(N)} %% utterly contrived example, my apologies
end

There is consistency and simplicity rather than yet another API to learn (well, excluding OTP of course) as you find in the Scala actor implementation.

I'm not sure if this is something you necessarily want to strive for, but, given the multicore revolution (which is why I got into erlang in the first place), it would be great to have something utterly accessible and simple.

I apologize in advance for coming a bit out of right field and merely dumping a lame erlang example, but I really love the simplicity and consistency of their implementation.

Login or Signup to reply.