#1693 Modeling Evented actors

kaushik Fri 4 Nov 2011

This is a little bit long, so please bear with me :)

I am trying to model an evented "actor per request" design in Fantom. Here's a simplified version of the problem I am running into:

I have an actor pool with a maximum of 1 thread, and I create 10 actors that all share this one pool. Only one of these 10 actors needs to do a big chunk of work that takes considerable time, say Actor.sleep(10sec); the others finish fairly quickly. So you can imagine that if the request for the costly actor arrives first, all the other requests have to wait the full 10sec before they can finish their work, since we have only one thread.

So as a solution I split the 10sec task into ten 1sec tasks: the actor sleeps for 1sec and then sends a message to itself, to give the other pending actors a chance to jump in and complete their work in between. The code is below. However, even though the other actors' requests arrive before actor 1's follow-up messages, the current model seems to prefer completing the current actor's messages first. If you execute the code below you can see that the first 10sec is still spent entirely on actor 1, in spite of the work being split across different messages. Let me know what I am missing.

using concurrent

class ActorTest
{
  Void main()
  {
    pool := ActorPool() { maxThreads = 1 }

    // create 10 actors, all sharing the single-threaded pool
    (1..10).each { MyActor(it, pool).send(null) }

    // keep the main thread alive
    Env.cur.in.read
  }
}

const class MyActor : Actor
{
  const Int num

  new make(Int num, ActorPool pool) : super(pool) { this.num = num }

  override Obj? receive(Obj? msg)
  {
    // every actor except actor 1 finishes quickly
    if (num > 1)
    {
      echo("a simple echo for actor $num")
      return null
    }

    // this is actor 1: sleep 1sec at a time, 10 times, tracking the count in Actor.locals
    Int? noOfTimesSlept := Actor.locals["times-slept"]
    if (noOfTimesSlept == null) noOfTimesSlept = 0
    noOfTimesSlept++
    Actor.locals["times-slept"] = noOfTimesSlept
    if (noOfTimesSlept > 10) return null

    echo("Sleeping 1sec for actor $num")
    Actor.sleep(1sec)
    this.send(null)  // queue the next 1sec slice on our own queue
    return null
  }
}

brian Fri 4 Nov 2011

@kaushik

I think your fundamental problem is that you are trying to limit the pool to one thread. That forces a bottleneck behind a single "hog" actor. As a general rule you want 3 to 4 times more threads than the expected number of "hogs", so that things get scheduled efficiently.
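For the test above, that just means giving the pool a few more threads than the expected number of hogs; the exact count below is only illustrative:

// the only change to the test above: a few threads, so the quick
// actors are not stuck behind the single 10sec hog
pool := ActorPool() { maxThreads = 4 }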

What makes actors efficient is that they aren't truly preemptively multi-tasked; rather, we use a thread pool to give us a balance between cooperative and preemptive multi-tasking.

The specific issue you are probably running into is how I do scheduling. If you look at Actor.java in _work(), you will see that an actor will process up to 100 pending messages before yielding the thread to another actor. This is to avoid excessive context switches between actors (they aren't super expensive, but not free either).

qualidafial Fri 4 Nov 2011

@kaushik

You are trying to connect multiple units of work from inside the running actor. You will have better performance if you chain those units of work external to the actor, e.g.:

result := MyActor(1, pool).send(null)
(2..10).each {
  result = MyActor(it, pool).sendWhenDone(result, null)
}
result.get // block on calling thread

Edit: := vs =

qualidafial Fri 4 Nov 2011

While we're on the subject, it would be nice to have a method Actor#sendResultWhenDone(Future).

Without this, we have to send the future as the message to the next actor:

resultA := ActorA(pool).send(a)
resultB := ActorB(pool).sendWhenDone(resultA, resultA)

const class ActorB : Actor
{
  new make(ActorPool pool) : super(pool) {}

  override Obj? receive(Obj? msg)
  {
    Future f := msg
    unwrappedMsg := f.get
    // do stuff with the unwrapped message
    return null
  }
}

This means that actor B has to expect a Future as its message, instead of whatever more natural type the situation demands.

I propose adding a new method sendResultWhenDone, which sends the result of resultA.get as the message to actor B:

Future sendResultWhenDone(Future f)

Thus:

resultA := ActorA(pool).send(a)
resultB := ActorB(pool).sendResultWhenDone(resultA)

const class ActorB : Actor
{
  new make(ActorPool pool) : super(pool) {}

  override Obj? receive(Obj? msg)
  {
    // do stuff with message
    return null
  }
}
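In the meantime, one way to keep a natural message type in the subclass is to push the unwrapping into a small base class. This is only a sketch, and UnwrapActor / receiveResult are made-up names:

using concurrent

const abstract class UnwrapActor : Actor
{
  new make(ActorPool pool) : super(pool) {}

  override Obj? receive(Obj? msg)
  {
    // if the message is a Future, hand its result to the subclass instead
    unwrapped := msg is Future ? ((Future)msg).get : msg
    return receiveResult(unwrapped)
  }

  ** subclasses implement this instead of receive
  abstract Obj? receiveResult(Obj? msg)
}

ActorB would then extend UnwrapActor and override receiveResult, while the sendWhenDone(resultA, resultA) call stays as it is.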

kaushik Sat 5 Nov 2011

Thanks guys. Sorry for the late reply, it was bedtime on the other side of the world :)

The specific issue you are probably running into is how I do scheduling. If you look at Actor.java in _work(), you will see that an actor will process up to 100 pending messages before yielding the thread to another actor.

Yes. Is that configurable? Let me describe the actual problem I am running into. The way wisp works, it uses a one-message-per-request-per-actor model, and the actor pool has a hard limit of 100 threads. That means wisp can support a maximum of 100 concurrent connections. So try this:

  1. Write a service (A) that does something big, say sending an x-GB file to the client or doing 10,000 db reads (you never know what kind of reports they'll ask for :)
  2. Write another service (B) that just writes "hello world" to the screen

Have 100 people (or apache bench) start downloading using service A; now all 100 threads are in use. Open a browser and start a request for B: it has to wait until all the requests for service A have completed.

Isn't this exactly the kind of problem that evented programming is trying to solve? Typically the clients of service A are willing to wait: if it usually takes 1hr for them to download that file, they won't mind if it takes 1hr and 10 minutes or even 2 hours. However, clients who want to see the "hello world" home page will think the system is down if it takes 1hr to see the page instead of the usual 10ms.

Now the solution I was thinking of is: a) instead of writing a 100GB file to each client in one go, keep writing say 4MB at a time and yield to check whether any other requests have come in

b) Have a provision in wisp to not close the socket at the end of one message, and have the application (or framework) close it later. This is really a separate request.

With Fantom's actors and closures, modeling this is very simple.
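For example, here is a minimal sketch of idea (a), with plain file streams standing in for wisp's request/response objects (which would also need the socket hand-off from (b)); the actor name, chunk size, and file paths are all made up for illustration:

using concurrent

const class ChunkCopyActor : Actor
{
  new make(ActorPool pool) : super(pool) {}

  override Obj? receive(Obj? msg)
  {
    // per-request mutable state lives in Actor.locals
    inStream  := Actor.locals["in"]  as InStream
    outStream := Actor.locals["out"] as OutStream
    if (inStream == null)
    {
      // first message: open the streams and stash them
      inStream  = File(`/tmp/big-input.bin`).in
      outStream = File(`/tmp/big-output.bin`).out
      Actor.locals["in"]  = inStream
      Actor.locals["out"] = outStream
    }

    // copy a single 4MB chunk per message
    buf := Buf(4*1024*1024)
    if (inStream.readBuf(buf, 4*1024*1024) != null)
    {
      outStream.writeBuf(buf.flip)
      this.send(null)  // re-queue ourselves so other actors can run in between
    }
    else
    {
      // done: clean up
      inStream.close
      outStream.close
      Actor.locals.remove("in")
      Actor.locals.remove("out")
    }
    return null
  }
}

Whether another actor really gets a turn between chunks still depends on how many messages an actor drains per sweep, which is why the throughput tuning discussed below matters.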

You are trying to connect multiple units of work from inside the running actor. You will have better performance if you chain those units of work external to the actor,

I could use an external actor, but the reason I want to keep everything related to one request in one actor is simple: you can carry your mutable state in Actor.locals, which is extremely convenient.

Thanks, I'd love to hear any other solutions to this.

brian Sat 5 Nov 2011

Okay, if we are talking about WispService, then I think the discussion is slightly different.

First off, we absolutely should have an option to tune the maxThreads of WispService, and it should have a higher default than 100. I pushed a change adding a new WispService.maxThreads field that defaults to 500. changeset.
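With that in place, bumping the limit from a script should look something like this; HelloMod is just a stand-in WebMod to make the example self-contained:

using web
using wisp

// trivial stand-in WebMod, only here so the example runs on its own
const class HelloMod : WebMod
{
  override Void onGet()
  {
    res.headers["Content-Type"] = "text/plain"
    res.out.print("hello world")
  }
}

class Main
{
  Void main()
  {
    // raise the request-actor limit from the old hard-coded 100
    WispService { port = 8080; root = HelloMod(); maxThreads = 500 }.start
  }
}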

Longer term, I don't think the main issue is how actors work, but rather how async I/O works (in conjunction with actors). I've avoided this issue up until now because simple thread-based HTTP processing really is good enough for most applications. But where it will break down quickly is when we go to add WebSockets to Wisp (or use older hacks like Comet). At that point we will have to design a real async I/O API that plays nicely with Actors.

kaushik Sat 5 Nov 2011

I pushed a change adding a new WispService.maxThreads field that defaults to 500. changeset.

Thanks, this should be a good temporary fix.

Could we also have an option to adjust the throughput for every actor? (I think Scala and Akka have an option for this.) While processing 100 messages in one go might be a good default, it isn't always right and it reduces fairness in some cases.

kaushik Sat 5 Nov 2011

At that point we will have to design a real async I/O API that plays nicely with Actors.

It'll be great when we have this, but until then, I think we can still design a (fairly) highly scalable system, with WebSockets or long polling over sync I/O, if:

  • A thread is not attached to a request (and wisp has support for this)
  • Every (big) request does its work in chunks, giving a fair chance to the others

In addition, I could actually have a big computation that I want to do in parts. Also, I am assuming the async I/O API cannot cover everything, like, for example, queries to a db which does not have a decent async JDBC driver (which no db has, btw).

So I'd like to ask for the following features, if they'd work:

  1. Ability to set the throughput of the wisp actor (the number of messages a wisp actor will process in one sweep) [or any actor in general] depending on what I am doing. It'd be great if we could also adjust this at runtime depending on the load.
  2. Ability to tell the wisp actor not to close the socket at the end of the "process", so I can choose to close it later for long requests.

brian Fri 11 Nov 2011

Ability to set the throughput of the wisp actor (the number of messages a wisp actor will process in one sweep) [or any actor in general] depending on what I am doing. It'd be great if we could also adjust this at runtime depending on the load.

I think this is sort of a low-level tuning hook that really exposes what I would consider implementation details of the actor framework. But I agree that it is probably important to let you tune that when you really need to. So what I did was create a NoDoc hook on ActorPool for you - changeset.

Ability to tell the wisp actor not to close the socket at the end of the "process", so I can choose to close it later for long requests.

I think on the surface this might be similar to the previous issue - sort of a pretty low-level hook. We could do something similar - a hook marked as NoDoc. But I'm not sure exactly what you are thinking here. Are you subclassing WispService? How exactly would you get this all wired up to work?

qualidafial Fri 11 Nov 2011

What about a config option to set the default threshold?

Edit: Clarification: I'm talking about a setting in /etc/concurrent/config.props

kaushik Fri 11 Nov 2011

Thanks brian. The Actor change should be good enough for now. Let me actually make the change I am thinking of in wisp, put it in production, and then we can discuss the actual set of changes that might be required.
