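1. Concurrency with Actors
Scala actors provide event-based, lightweight threads. To create an actor, call the actor method of the scala.actors.Actor companion object. It takes a function value (closure) as its argument, and the actor starts running as soon as it is created. Use the !() method to send a message to an actor, and the receive() method to take a message from the current actor's mailbox. receive() also takes a closure as its argument, usually a set of case clauses that pattern-match on the incoming message.
As an example, suppose we need to decide whether a given number is a perfect number. A perfect number is a positive integer whose divisors, including the number itself, add up to twice the number; for example, 1 + 2 + 3 + 6 = 12 = 2 × 6.
A sequential implementation: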
def sumOfFactors(number: Int) = {
  (0 /: (1 to number)) { (sum, i) => if (number % i == 0) sum + i else sum }
}

def isPerfect(candidate: Int) = 2 * candidate == sumOfFactors(candidate)

println("6 is perfect? " + isPerfect(6))
println("33550336 is perfect? " + isPerfect(33550336))
println("33550337 is perfect? " + isPerfect(33550337))
The concurrent implementation splits the range from 1 to candidate into partitions. It hands the summation for each partition to a separate actor:
import scala.actors.Actor._

class FasterPerfectNumberFinder {
  def sumOfFactorsInRange(lower: Int, upper: Int, number: Int) = {
    (0 /: (lower to upper)) { (sum, i) => if (number % i == 0) sum + i else sum }
  }

  def isPerfectConcurrent(candidate: Int) = {
    val RANGE = 1000000
    val numberOfPartitions = (candidate.toDouble / RANGE).ceil.toInt
    val caller = self

    for (i <- 0 until numberOfPartitions) {
      val lower = i * RANGE + 1
      val upper = candidate min ((i + 1) * RANGE)
      // each partition is summed by its own actor, which sends the partial sum back
      actor { caller ! sumOfFactorsInRange(lower, upper, candidate) }
    }

    // collect exactly one partial sum per partition
    val sum = (0 /: (0 until numberOfPartitions)) { (partialSum, i) =>
      receive {
        case sumInRange: Int => partialSum + sumInRange
      }
    }

    2 * candidate == sum
  }

  println("6 is perfect? " + isPerfectConcurrent(6))
  println("33550336 is perfect? " + isPerfectConcurrent(33550336))
  println("33550337 is perfect? " + isPerfectConcurrent(33550337))
}

object FasterPerfectNumberFinder extends App {
  new FasterPerfectNumberFinder()
}
The output:
6 is perfect? true
33550336 is perfect? true
33550337 is perfect? false
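For comparison, the same partition-and-sum idea can be sketched with scala.concurrent.Future from the standard library instead of scala.actors. scala.actors was deprecated in Scala 2.10 and is not part of current Scala versions. This is a swapped-in technique, not the code above. The object name PartitionedFactors and the rangeSize parameter are illustrative assumptions.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PartitionedFactors {
  // Splits 1..candidate into consecutive ranges of at most rangeSize numbers,
  // using the same lower/upper arithmetic as isPerfectConcurrent.
  def partitions(candidate: Int, rangeSize: Int): Seq[(Int, Int)] = {
    val count = math.ceil(candidate.toDouble / rangeSize).toInt
    (0 until count).map { i =>
      val lower = i * rangeSize + 1
      val upper = math.min(candidate, (i + 1) * rangeSize)
      (lower, upper)
    }
  }

  def sumOfFactorsInRange(lower: Int, upper: Int, number: Int): Int =
    (lower to upper).foldLeft(0) { (sum, i) => if (number % i == 0) sum + i else sum }

  // One Future per partition stands in for one actor per partition;
  // Future.sequence plays the role of the loop that receives the partial sums.
  def isPerfectConcurrent(candidate: Int, rangeSize: Int = 1000000): Boolean = {
    val partialSums = partitions(candidate, rangeSize).map { case (lower, upper) =>
      Future(sumOfFactorsInRange(lower, upper, candidate))
    }
    val total = Await.result(Future.sequence(partialSums), 1.minute).sum
    2 * candidate == total
  }
}
```

For instance, PartitionedFactors.isPerfectConcurrent(28, 10) splits 1..28 into (1, 10), (11, 20) and (21, 28). It reports true, since 1 + 2 + 4 + 7 + 14 + 28 = 56.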
The following program compares the running time of the two approaches:
import scala.actors.Actor._

class FindPerfectNumberOverRange {
  // sequential implementation
  def sumOfFactors(number: Int) = {
    (0 /: (1 to number)) { (sum, i) => if (number % i == 0) sum + i else sum }
  }

  def isPerfect(candidate: Int) = 2 * candidate == sumOfFactors(candidate)

  // concurrent implementation
  def sumOfFactorsInRange(lower: Int, upper: Int, number: Int) = {
    (0 /: (lower to upper)) { (sum, i) => if (number % i == 0) sum + i else sum }
  }

  def isPerfectConcurrent(candidate: Int) = {
    val RANGE = 1000000
    val numberOfPartitions = (candidate.toDouble / RANGE).ceil.toInt
    val caller = self

    for (i <- 0 until numberOfPartitions) {
      val lower = i * RANGE + 1
      val upper = candidate min ((i + 1) * RANGE)
      actor { caller ! sumOfFactorsInRange(lower, upper, candidate) }
    }

    val sum = (0 /: (0 until numberOfPartitions)) { (partialSum, i) =>
      receive {
        case sumInRange: Int => partialSum + sumInRange
      }
    }

    2 * candidate == sum
  }

  // measure how long each approach takes over a range of candidates
  def countPerfectNumbersInRange(start: Int, end: Int, isPerfectFinder: Int => Boolean) = {
    val startTime = System.nanoTime()
    val numberOfPerfectNumbers = (0 /: (start to end)) { (count, candidate) =>
      if (isPerfectFinder(candidate)) count + 1 else count
    }
    val endTime = System.nanoTime()
    println("Found " + numberOfPerfectNumbers + " perfect numbers in given range, took " +
      (endTime - startTime) / 1000000000.0 + " secs")
  }
}

object FindPerfectNumberOverRange extends App {
  val fpn = new FindPerfectNumberOverRange()
  val startNumber = 33550300
  val endNumber = 33550400
  fpn.countPerfectNumbersInRange(startNumber, endNumber, fpn.isPerfect)
  fpn.countPerfectNumbersInRange(startNumber, endNumber, fpn.isPerfectConcurrent)
}
The output:
Found 1 perfect numbers in given range, took 53.505288657 secs
Found 1 perfect numbers in given range, took 35.739131734 secs
2. Message Passing
Now let's look at how a message travels from one actor to another.
import scala.actors.Actor._

class MessagePassing {
  var startTime: Long = 0
  val caller = self

  val engrossedActor = actor {
    println("Number of messages received so far? " + mailboxSize)
    caller ! "send"
    Thread.sleep(3000)
    println("Number of messages received while I was busy? " + mailboxSize)
    receive {
      case msg =>
        val receivedTime = System.currentTimeMillis() - startTime
        println("Received message " + msg + "after " + receivedTime + " ms")
    }
    caller ! "received"
  }

  receive { case _ => }   // wait until engrossedActor is running

  println("Sending Message ")
  startTime = System.currentTimeMillis()
  engrossedActor ! "hello buddy"
  val endTime = System.currentTimeMillis() - startTime
  printf("Took less than %dms to send message\n", endTime)

  receive { case _ => }   // wait until engrossedActor has finished
}

object MessagePassing extends App {
  new MessagePassing()
}
The output:
Number of messages received so far? 0
Sending Message 
Took less than 0ms to send message
Number of messages received while I was busy? 0
Received message hello buddyafter 2997 ms
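The output shows that sending does not block, and that the receiving actor is not interrupted by an incoming message. The message simply waits in the mailbox until the actor calls receive().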
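The mailbox behavior described here can be modeled, very loosely, with a java.util.concurrent.LinkedBlockingQueue. This is a toy sketch for intuition, not how scala.actors implements mailboxes. The Mailbox class and the MailboxDemo object are hypothetical names.

```scala
import java.util.concurrent.LinkedBlockingQueue

// A toy mailbox: send() never blocks because the queue is unbounded,
// while receive() blocks until a message is available.
final class Mailbox[A] {
  private val queue = new LinkedBlockingQueue[A]()
  def send(msg: A): Unit = queue.put(msg)
  def receive(): A = queue.take()
  def size: Int = queue.size()
}

object MailboxDemo {
  // Returns how many messages were waiting when the busy receiver first looked,
  // plus the messages in the order it received them.
  def run(): (Int, List[String]) = {
    val box = new Mailbox[String]
    var waitingWhileBusy = -1
    var received = List.empty[String]
    val receiver = new Thread(() => {
      Thread.sleep(200)                 // busy: not calling receive() yet
      waitingWhileBusy = box.size       // messages sent meanwhile are queued
      received = List(box.receive(), box.receive())
    })
    receiver.start()
    box.send("hello")                   // returns immediately; the receiver is asleep
    box.send("buddy")
    receiver.join()                     // join() makes the receiver's writes visible here
    (waitingWhileBusy, received)
  }
}
```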
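Sending and receiving messages asynchronously is good practice, because it makes the most of concurrency. If you do want to send a message and wait synchronously for the response, use the !?() method. It blocks until the target actor replies, which opens the door to deadlock: one failed actor can cause other actors to fail, and then the whole application fails. So if you must use this method, at least use the variant that takes a timeout, like this: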
package com.cn.gao

import scala.actors._
import Actor._

class AskFortune {
  val fortuneTeller = actor {
    for (i <- 1 to 4) {
      Thread.sleep(1000)
      receive {
        case _ => sender ! "your day will rock! " + i
        // case _ => reply("your day will rock! " + i) // same as above
      }
    }
  }

  println(fortuneTeller !? (2000, "what's ahead"))
  println(fortuneTeller !? (500, "what's ahead"))

  val aPrinter = actor {
    receive { case msg => println("Ah, fortune message for you-" + msg) }
  }

  // ask fortuneTeller to deliver its reply to aPrinter instead of to us
  fortuneTeller.send("What's up", aPrinter)
  fortuneTeller ! "How's my future?"

  Thread.sleep(3000)
  receive { case msg: String => println("Received " + msg) }

  println("Let's get that lost message")
  receive { case !(channel, msg) => println("Received belated message " + msg) }
}

object AskFortune extends App {
  new AskFortune()
}
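If the actor replies before the timeout expires, !?() returns the result wrapped in Some. Otherwise it returns None, so the method's return type is Option[Any]. In the code above, sender refers to the actor that sent the most recent message. The reply to the second request, which timed out, is not lost. It arrives later in the main actor's mailbox wrapped in a !(channel, msg) message, and the last receive matches that pattern. The output:
Some(your day will rock! 1)
None
Ah, fortune message for you-your day will rock! 3
Received your day will rock! 4
Let's get that lost message
Received belated message your day will rock! 2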
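The Option-returning timeout of !?() can be mimicked with the standard library's Future and Await. In this sketch, ask and slowFortune are hypothetical helpers, not part of scala.actors. One difference is worth noting: when the timeout expires, the late result is simply discarded here. In the actor version, the late reply still arrives in the caller's mailbox.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AskWithTimeout {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical helper with the same shape as `target !? (timeout, msg)`:
  // Some(reply) if the reply arrives within timeoutMs, None otherwise.
  def ask[A](timeoutMs: Long)(reply: => A): Option[A] = {
    val pending = Future(reply)
    try Some(Await.result(pending, timeoutMs.millis))
    catch { case _: TimeoutException => None }
  }

  // A responder that needs about 300 ms, like the sleeping fortuneTeller.
  def slowFortune(i: Int): String = {
    Thread.sleep(300)
    "your day will rock! " + i
  }
}
```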
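3. The Actor Class
You may want explicit control over when an actor starts, or you may want to store more information in the actor. In that case, create an object that mixes in the Actor trait. That's right: a Scala Actor is just a trait, and you can mix it in wherever you like. Here is an example: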
AnsweringService.scala
package com.cn.gao

import scala.actors._
import Actor._

class AnsweringService(val folks: String*) extends Actor {
  def act(): Unit = {
    while (true) {
      receive {
        case (caller: Actor, name: String, msg: String) =>
          caller ! (
            if (folks.contains(name))
              String.format("Hey it's %s got message %s", name, msg)
            else
              String.format("Hey there's no one with the name %s here", name)
          )
        case "ping" => println("ping!")
        case "quit" =>
          println("existing actor")
          exit()
      }
    }
  }
}

object AnsweringService extends App {
  val answeringService1 = new AnsweringService("Sara", "Kara", "John")

  // sent before start(): these messages wait in the mailbox
  answeringService1 ! (self, "Sara", "In town")
  answeringService1 ! (self, "Kara", "Go shopping?")

  answeringService1.start()

  answeringService1 ! (self, "John", "Bug fixed?")
  answeringService1 ! (self, "Bill", "What's up")

  for (i <- 1 to 4) { receive { case msg => println(msg) } }

  answeringService1 ! "ping"
  answeringService1 ! "quit"
  answeringService1 ! "ping"   // arrives after the actor has exited

  Thread.sleep(2000)
  println("The last ping was not processed")
}
The output:
Hey it's Sara got message In town
Hey it's Kara got message Go shopping?
Hey it's John got message Bug fixed?
Hey there's no one with the name Bill here
ping!
existing actor
The last ping was not processed
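First we send the actor a few tuple messages. They are not processed right away because the actor has not been started yet; they are queued for later processing. Then we call start() and send a few more messages. Once start() has been called, a separate thread invokes the actor's act() method, and all the messages sent so far begin to be processed. Finally, we loop to receive the replies to the four messages.
Calling exit() stops the actor. exit() works by throwing an exception that ends the actor's current execution, so it is best called from within act(). This is also why the final ping above is never processed: by then the actor has already exited.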
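The lifecycle described above can be sketched without scala.actors, using a blocking queue and a plain Thread. Messages are buffered before start(), processed on a separate thread once started, and processing stops on a quit message. MiniActor and MiniActorDemo are hypothetical names. Here a "quit" message simply ends the loop; it does not throw an exception the way exit() does.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}

// Hypothetical minimal actor: messages sent before start() are buffered,
// start() runs act() on a dedicated thread, and "quit" ends the loop,
// so anything sent after "quit" is never processed.
class MiniActor(handle: Any => Unit) {
  private val mailbox = new LinkedBlockingQueue[Any]()
  private val thread = new Thread(() => act())

  def !(msg: Any): Unit = mailbox.put(msg)
  def start(): Unit = thread.start()
  def join(): Unit = thread.join()

  private def act(): Unit = {
    var running = true
    while (running) {
      mailbox.take() match {
        case "quit" => running = false
        case msg    => handle(msg)
      }
    }
  }
}

object MiniActorDemo {
  // Returns the messages the actor actually handled, in order.
  def run(): List[Any] = {
    val processed = new ConcurrentLinkedQueue[Any]()
    val service = new MiniActor(msg => { processed.add(msg); () })
    service ! "Sara"          // buffered: the actor has not started yet
    service ! "Kara"
    service.start()           // act() now drains the buffered messages
    service ! "John"
    service ! "quit"
    service ! "ping"          // sent after "quit": stays in the mailbox
    service.join()
    processed.toArray.toList
  }
}
```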
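4. The actor Method
If you don't really care about starting actors explicitly, use the actor() method. To pass data between actors, use the !() and receive() methods. We'll start with an example that uses the actor method and then refactor it to make it concurrent.
The method isPrime() tells whether a given number is prime. For illustration, I've added some print statements to it: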
package com.cn.gao

import scala.actors._
import Actor._

class PrimeTeller {
  def isPrime(number: Int) = {
    println("Going to find if " + number + " is prime")
    var result = true
    if (number == 2 || number == 3) result = true
    for (i <- 2 to math.sqrt(number.toDouble).floor.toInt; if result) {
      if (number % i == 0) result = false
    }
    println("done finding if " + number + " is prime")
    result
  }
}
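If we call the code above directly, the caller blocks until it gets the answer. Below, the responsibility for calling this method is delegated to an actor. The actor determines whether a number is prime and sends the answer back to the caller as an asynchronous reply.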
package com.cn.gao

import scala.actors._
import Actor._

object PrimeTeller extends App {
  def isPrime(number: Int) = {
    println("Going to find if " + number + " is prime")
    var result = true
    if (number == 2 || number == 3) result = true
    for (i <- 2 to math.sqrt(number.toDouble).floor.toInt; if result) {
      if (number % i == 0) result = false
    }
    println("done finding if " + number + " is prime")
    result
  }

  val primeTeller = actor {
    var continue = true
    while (continue) {
      receive {
        case (caller: Actor, number: Int) => caller ! (number, isPrime(number))
        case "quit" => continue = false
      }
    }
  }

  primeTeller ! (self, 2)
  primeTeller ! (self, 131)
  primeTeller ! (self, 132)

  for (i <- 1 to 3) {
    receive {
      case (number, result) => println(number + "is prime? " + result)
    }
  }

  primeTeller ! "quit"
}
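primeTeller is a reference to an anonymous actor created with the actor() method. It loops until it receives a "quit" message. Besides the quit message, it accepts a tuple containing a caller and a number. When it receives such a tuple, it determines whether the number is prime and sends a message back to the caller.
The output:
Going to find if 2 is prime
done finding if 2 is prime
Going to find if 131 is prime
2is prime? true
done finding if 131 is prime
Going to find if 132 is prime
131is prime? true
done finding if 132 is prime
132is prime? false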
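The code above processes every number it receives, as the output shows. While the actor is busy deciding whether one number is prime, any further requests that arrive are queued. So even though execution has been delegated to an actor, it is still sequential.
Making this example concurrent is quite easy. In the case (caller: Actor, number: Int) branch of the primeTeller actor, don't call isPrime() directly. Instead, delegate that responsibility to another actor and let it send the reply to the caller: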
package com.cn.gao

import scala.actors._
import Actor._

object PrimeTeller extends App {
  def isPrime(number: Int) = {
    println("Going to find if " + number + " is prime")
    var result = true
    if (number == 2 || number == 3) result = true
    for (i <- 2 to math.sqrt(number.toDouble).floor.toInt; if result) {
      if (number % i == 0) result = false
    }
    println("done finding if " + number + " is prime")
    result
  }

  val primeTeller = actor {
    var continue = true
    while (continue) {
      receive {
        // sequential version:
        // case (caller: Actor, number: Int) => caller ! (number, isPrime(number))
        case (caller: Actor, number: Int) =>
          // hand the work to a fresh actor, which replies to the caller directly
          actor { caller ! (number, isPrime(number)) }
        case "quit" => continue = false
      }
    }
  }

  primeTeller ! (self, 2)
  primeTeller ! (self, 131)
  primeTeller ! (self, 132)

  for (i <- 1 to 3) {
    receive {
      case (number, result) => println(number + "is prime? " + result)
    }
  }

  primeTeller ! "quit"
}
Run the code again and you'll see that the requests are now processed concurrently:
Going to find if 2 is prime
Going to find if 131 is prime
Going to find if 132 is prime
done finding if 132 is prime
done finding if 2 is prime
done finding if 131 is prime
132is prime? false
131is prime? true
2is prime? true
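The same one-worker-per-request delegation can be sketched with scala.concurrent.Future in place of actor { ... }. This is a swapped-in technique for illustration, and ConcurrentPrimes and checkAll are hypothetical names. Note that this isPrime returns false for numbers below 2, whereas the version above reports them as prime.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConcurrentPrimes {
  // Same trial-division idea as isPrime() above, without the print statements;
  // numbers below 2 are reported as not prime.
  def isPrime(number: Int): Boolean =
    number >= 2 && (2 to math.sqrt(number.toDouble).toInt).forall(number % _ != 0)

  // Each request gets its own Future, the way the revised primeTeller spawns a
  // fresh actor per request; the answers may complete in any order.
  def checkAll(numbers: Seq[Int]): Map[Int, Boolean] = {
    val replies = numbers.map(n => Future(n -> isPrime(n)))
    Await.result(Future.sequence(replies), 10.seconds).toMap
  }
}
```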
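5. The receive and receiveWithin Methods
receive() takes a function value (closure) and returns whatever that closure produces when it handles the message. Here is an example that uses the result of receive():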
package com.cn.gao

import scala.actors.Actor._

object Receive extends App {
  val caller = self

  val accumulator = actor {
    var sum = 0
    var continue = true
    while (continue) {
      // receive returns the value of the case that handled the message
      sum += receive {
        case number: Int => number
        case "quit" =>
          continue = false
          0
      }
    }
    caller ! sum
  }

  accumulator ! 1
  accumulator ! 7
  accumulator ! 8
  accumulator ! "quit"

  receive { case result => println("Total is " + result) }
}
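The accumulator receives numbers and adds up the ones passed to it. When it is done, it sends back a message containing the total. The output of the code above: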
Total is 16
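A call to receive() blocks until a message actually arrives. If the expected reply never comes, we are in trouble, because we wait forever. receiveWithin() fixes this by taking a timeout parameter, as shown here: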
package com.cn.gao

import scala.actors._
import scala.actors.Actor._

object ReceiveWithin extends App {
  val caller = self

  val accumulator = actor {
    var sum = 0
    var continue = true
    while (continue) {
      sum += receiveWithin(1000) {
        case number: Int => number
        case TIMEOUT =>
          println("Time out! Will return result now")
          continue = false
          0
      }
    }
    caller ! sum
  }

  accumulator ! 1
  accumulator ! 7
  accumulator ! 8

  receiveWithin(2000) { case result => println("Total is " + result) }
}
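If nothing arrives within the given timeout, receiveWithin() receives a TIMEOUT message. If you don't pattern-match on it, an exception is thrown. In the code above, the TIMEOUT message is treated as the signal that accumulation is complete. The output:
Time out! Will return result now
Total is 16
We should prefer receiveWithin() over receive() to avoid liveness problems, such as waiting forever for a message that never comes.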
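The timeout-as-completion-signal pattern does not depend on actors. This sketch uses a java.util.concurrent.LinkedBlockingQueue as the mailbox and calls poll with a timeout. A null result from poll stands in for the TIMEOUT message. TimedAccumulator and its methods are hypothetical names.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object TimedAccumulator {
  // poll() returns null when nothing arrives within the timeout; here that null
  // plays the role of the TIMEOUT message in the receiveWithin() accumulator.
  def accumulate(mailbox: LinkedBlockingQueue[Any], timeoutMs: Long): Int = {
    var sum = 0
    var running = true
    while (running) {
      mailbox.poll(timeoutMs, TimeUnit.MILLISECONDS) match {
        case number: Int => sum += number
        case null        => running = false   // timed out: treat as "done"
        case _           => ()                // ignore anything else
      }
    }
    sum
  }

  // Another thread sends 1, 7 and 8; the calling thread accumulates them.
  def run(): Int = {
    val mailbox = new LinkedBlockingQueue[Any]()
    val sender = new Thread(() => List(1, 7, 8).foreach(n => mailbox.put(n)))
    sender.start()
    accumulate(mailbox, 500)
  }
}
```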
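receive() and receiveWithin() treat the function value as a partial function. Before running the code block, they check whether it handles the message. So an unexpected message is silently ignored: it is not processed and stays in the mailbox. If you want ignored messages to be visible, add a case _ => ... clause. The following example shows invalid messages being ignored: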
package com.cn.gao

import scala.actors._
import Actor._

object MessageIgnore extends App {
  val expectStringOrInteger = actor {
    for (i <- 1 to 4) {
      receiveWithin(1000) {
        case str: String => println("You said " + str)
        case num: Int => println("You gave " + num)
        case TIMEOUT => println("Time out!")
      }
    }
  }

  expectStringOrInteger ! "only constant is change"
  expectStringOrInteger ! 1024
  expectStringOrInteger ! 22.22          // a Double: matches no case
  expectStringOrInteger ! (self, 1024)   // a tuple: matches no case

  receiveWithin(3000) { case _ => }      // keep the main thread alive
}
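The code ends with a call to receiveWithin(). The program exits when the main thread exits, so this call keeps the program alive long enough for the actor to respond. The output shows that the actor processed the first two messages sent to it and ignored the last two, because they did not match the expected message patterns. The remaining iterations time out, since no more matching messages arrive. The output:
You said only constant is change
You gave 1024
Time out!
Time out!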
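The isDefinedAt check described above can be seen directly on a plain Scala PartialFunction, with no actors involved. In this sketch, collect keeps only the messages the handler is defined for. The name PartialHandlers is hypothetical.

```scala
object PartialHandlers {
  // The block passed to receive()/receiveWithin() is a PartialFunction;
  // isDefinedAt tells whether a message matches any of its cases.
  val expectStringOrInteger: PartialFunction[Any, String] = {
    case str: String => "You said " + str
    case num: Int    => "You gave " + num
  }

  // Messages the handler is not defined for are skipped rather than processed.
  def handleAll(messages: Seq[Any]): Seq[String] = messages.collect(expectStringOrInteger)
}
```

Here PartialHandlers.expectStringOrInteger.isDefinedAt(22.22) is false. Passing the four messages from the example above to handleAll yields only the two strings for "only constant is change" and 1024.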
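6. The react and reactWithin Methods
In each actor, calling receive() effectively requires a dedicated thread, and that thread is held until the actor finishes. In other words, even while waiting for messages, the program holds on to these threads, one per actor, which is clearly a waste of resources. Scala has to hold on to these threads because the control flow, as it executes, carries concrete state on the thread's call stack. If there were no state in the call sequence that had to be preserved and returned to, Scala could take practically any thread from a thread pool to process the message. That is exactly what react() does. Unlike receive(), react() does not return a result; in fact, it never returns from the call at all.
If you want to handle more messages after react() has handled the current one, call a method (often the same one, recursively) at the end of the message handler. Scala hands the execution of that call to any thread in the thread pool. Let's look at an example of this behavior: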
package com.cn.gao

import scala.actors.Actor._
import scala.actors._

object React extends App {
  def info(msg: String) = println(msg + " received by " + Thread.currentThread())

  def receiveMessage(id: Int): Unit = {
    for (i <- 1 to 2) {
      receiveWithin(20000) {
        case msg: String => info("receive: " + id + msg)
        case TIMEOUT =>
      }
    }
  }

  def reactMessage(id: Int): Unit = {
    react {
      case msg: String =>
        info("react: " + id + msg)
        reactMessage(id)   // handle the next message, possibly on another pool thread
    }
  }

  val actors = Array(
    actor { info("react: 1 actor created"); reactMessage(1) },
    actor { info("react: 2 actor created"); reactMessage(2) },
    actor { info("receive: 3 actor created"); receiveMessage(3) },
    actor { info("receive: 4 actor created"); receiveMessage(4) }
  )

  Thread.sleep(1000)
  for (i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) }
  Thread.sleep(2000)
  for (i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) }
}
The output of the code above:
react: 1 actor created received by Thread[ForkJoinPool-1-worker-5,5,main]
react: 2 actor created received by Thread[ForkJoinPool-1-worker-3,5,main]
receive: 3 actor created received by Thread[ForkJoinPool-1-worker-1,5,main]
receive: 4 actor created received by Thread[ForkJoinPool-1-worker-7,5,main]
react: 1 hello received by Thread[ForkJoinPool-1-worker-3,5,main]
react: 2 hello received by Thread[ForkJoinPool-1-worker-3,5,main]
receive: 3 hello received by Thread[ForkJoinPool-1-worker-1,5,main]
receive: 4 hello received by Thread[ForkJoinPool-1-worker-7,5,main]
react: 1 hello received by Thread[ForkJoinPool-1-worker-5,5,main]
react: 2 hello received by Thread[ForkJoinPool-1-worker-5,5,main]
receive: 3 hello received by Thread[ForkJoinPool-1-worker-1,5,main]
receive: 4 hello received by Thread[ForkJoinPool-1-worker-7,5,main]
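Actors that use receiveWithin() have thread affinity: they keep using the same thread assigned to them, as the output above shows.
Actors that use react(), on the other hand, are free to swap threads with one another and can be served by any available thread.
In other words, actors that use react() have no thread affinity. They give up their thread and continue with subsequent messages on a new thread, which may happen to be the same one. This is friendlier to resources, especially when messages are handled quickly. So we encourage using react() instead of receive().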
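The resource argument can be illustrated with a fixed-size thread pool from java.util.concurrent: many message handlers, spread over many logical actors, run on a couple of shared threads. This sketch is only an analogy, and it assumes that each handled message becomes one pool task. Unlike real actors, it does not guarantee that one actor's messages are handled one at a time. SharedPool is a hypothetical name.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicInteger

object SharedPool {
  // Submits one task per (actor, message) pair to a pool of poolSize threads.
  // Returns (number of handled messages, number of distinct threads used).
  def run(actors: Int, messagesEach: Int, poolSize: Int): (Int, Int) = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val threadsUsed = ConcurrentHashMap.newKeySet[String]()
    val handled = new AtomicInteger(0)
    val done = new CountDownLatch(actors * messagesEach)
    for (_ <- 1 to actors; _ <- 1 to messagesEach) {
      pool.execute(() => {
        threadsUsed.add(Thread.currentThread().getName)
        handled.incrementAndGet()
        done.countDown()
      })
    }
    done.await()
    pool.shutdown()
    (handled.get, threadsUsed.size)
  }
}
```

With 100 logical actors receiving 10 messages each and poolSize = 2, all 1000 handlers run, yet at most two threads are ever used.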
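Like receiveWithin(), reactWithin() times out if no message arrives within the timeout period. In that case, a case TIMEOUT clause lets you take whatever action you want, including exiting the method. As an example of reactWithin(), let's redo the accumulator we built earlier with receiveWithin(), this time using reactWithin():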
package com.cn.gao

import scala.actors._
import scala.actors.Actor._

object ReactWithin extends App {
  val caller = self

  def accumulate(sum: Int): Unit = {
    reactWithin(500) {
      case number: Int => accumulate(sum + number)
      case TIMEOUT =>
        println("Timed out! Will send result now")
        caller ! sum
    }
    println("This will not be called...")
  }

  val accumulator = actor { accumulate(0) }

  accumulator ! 1
  accumulator ! 7
  accumulator ! 8

  receiveWithin(10000) { case result => println("Total is " + result) }
}
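The output of the code above:
Timed out! Will send result now
Total is 16
Compared with the receiveWithin() version, this solution is more elegant, because it does not hold on to any thread while waiting for messages.
One last thing to remember about react() and reactWithin(): these two methods never actually return from the call. Internally, Scala handles this by having them throw an exception. As a result, any code placed after them is never executed, such as the print statement at the end of accumulate(). So don't write anything after a call to either method.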
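To see why code after such a call never runs, here is a simulation in plain Scala. A method whose result type is Nothing saves its handler and then throws a control exception. This is a hypothetical stand-in, loosely modeled on the description above, and not the actual scala.actors implementation. NeverReturns, fakeReact and Suspend are illustrative names.

```scala
import scala.util.control.ControlThrowable

object NeverReturns {
  // Hypothetical stand-in for react(): it saves the handler for later and then
  // throws, so the call never returns normally (its result type is Nothing).
  final class Suspend extends ControlThrowable
  private var saved: Option[PartialFunction[Any, Unit]] = None

  def fakeReact(handler: PartialFunction[Any, Unit]): Nothing = {
    saved = Some(handler)
    throw new Suspend
  }

  def body(log: StringBuilder): Unit = {
    fakeReact { case msg => log.append("got " + msg); () }
    log.append("after react")    // never reached, like the println after reactWithin()
  }

  // Runs body until it "suspends", then delivers one message to the saved handler,
  // loosely like a pool thread picking up the actor when a message arrives.
  def run(): String = {
    val log = new StringBuilder
    try body(log) catch { case _: Suspend => () }
    saved.foreach(handler => handler("hello"))
    log.toString
  }
}
```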
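7. loop and loopWhile
Two things get in the way of making full use of react() and reactWithin(). The first is the recursive calls: with multiple case clauses, you typically have to repeat the call in each one. The second is that there seems to be no good way to break out of the method. The answer to the first concern is the loop() method of the Actor singleton object; the answer to the second is the loopWhile() method.
Instead of calling the method recursively inside reactWithin(), you can place the reactWithin() call inside a call to loop(). When the thread executing loop() reaches the reactWithin() call, it gives up control. When a message arrives, any thread can continue by executing the appropriate case clause. When the case clause finishes, execution returns to the top of the loop() block, and this continues indefinitely. loopWhile() is similar, but it keeps looping only as long as the condition it is given holds. Because loopWhile() takes care of the looping, you can keep local state outside the loop and access it inside reactWithin(). This gives us the best of both worlds: we can handle state the way we did with receiveWithin() while still using pooled threads the way reactWithin() does. Let's look at an example that uses reactWithin() inside loopWhile().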
package com.cn.gao

import scala.actors._
import Actor._

object Loop extends App {
  val caller = self

  val accumulator = actor {
    var continue = true
    var sum = 0

    loopWhile(continue) {
      reactWithin(500) {
        case number: Int => sum += number
        case TIMEOUT =>
          continue = false
          caller ! sum
      }
    }
  }

  accumulator ! 1
  accumulator ! 7
  accumulator ! 8

  receiveWithin(1000) { case result => println("Total is " + result) }
}
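The code above contains no recursive calls, because loopWhile() handles the looping. To stop processing messages, we simply set a flag; loopWhile() then exits the loop and, with it, the actor. The output: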
Total is 16
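8. Controlling Thread Execution
We've seen how, with receive, each actor runs in its own thread, and how react lets actors share threads from a pool. Sometimes, though, we want more control. For example, after a long-running task finishes we may need to update the UI. The task should run in a separate thread, but the UI must then be updated in the main thread, because UI components are often not thread-safe. Using SingleThreadedScheduler, we can make Scala run actors in the main thread. Let's see how with an example: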
package com.cn.gao

import scala.actors._
import scala.actors.scheduler._
import Actor._

object InMainThread {
  def main(args: Array[String]): Unit = {
    if (args.length > 0 && args(0) == "Single") {
      println("Command-line argument Single found")
      Scheduler.impl = new SingleThreadedScheduler()
    }
    println("Main running in " + Thread.currentThread())
    actor { println("Actor1 running in " + Thread.currentThread()) }
    actor { println("Actor2 running in " + Thread.currentThread()) }
    receiveWithin(3000) { case _ => }
  }
}
The code above creates two actors. If we pass no command-line arguments, the actors run on pool threads separate from the main thread, and the output looks like this:
Main running in Thread[main,5,main]
Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main]
Actor2 running in Thread[ForkJoinPool-1-worker-5,5,main]
On the other hand, if we run the code as scala InMainThread.scala Single, we get a different result:
Command-line argument Single found
Main running in Thread[main,5,main]
Actor1 running in Thread[main,5,main]
Actor2 running in Thread[main,5,main]
Whenever an actor starts, Scala has the singleton object Scheduler run it. By setting Scheduler's impl, you can control the actor scheduling policy for the entire application.
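Choosing where code runs by swapping in a different scheduler has a close analogue in scala.concurrent: the ExecutionContext passed to a Future decides which thread runs it. This sketch is an analogy, not the scala.actors Scheduler API. callingThread is a hypothetical ExecutionContext that runs each task immediately on the submitting thread, roughly the way SingleThreadedScheduler kept actors on the main thread.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SchedulerChoice {
  // An ExecutionContext that runs every task on the thread that submits it.
  val callingThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  // Reports which thread ran a task under the given ExecutionContext.
  def threadUsedBy(ec: ExecutionContext): String = {
    val f = Future(Thread.currentThread().getName)(ec)
    Await.result(f, 5.seconds)
  }
}
```

Called from the main thread, threadUsedBy(SchedulerChoice.callingThread) returns the main thread's name, while threadUsedBy(ExecutionContext.global) returns the name of a pool thread.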
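This approach has far-reaching effects, because it controls the scheduling of every actor. Perhaps, however, we want some actors to run in the main thread while others run in threads of their own. We can do that by extending the Actor trait and overriding the scheduler() method. By default, this method returns the singleton Scheduler object for the actor being scheduled. Overriding it lets us control how an individual actor is scheduled, as shown here: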
package com.cn.gao

import scala.actors._
import scala.actors.scheduler._
import Actor._

object InMainThreadSelective extends App {
  trait SingleThreadActor extends Actor {
    override protected def scheduler() = new SingleThreadedScheduler()
  }

  class MyActor1 extends Actor {
    def act() = println("Actor1 running in " + Thread.currentThread())
  }

  class MyActor2 extends SingleThreadActor {
    def act() = println("Actor2 running in " + Thread.currentThread())
  }

  println("Main running in " + Thread.currentThread())
  new MyActor1().start()
  new MyActor2().start()
  actor { println("Actor 3 running in " + Thread.currentThread()) }

  receiveWithin(5000) { case _ => }
}
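The code above creates three actors: two extend the Actor trait, and one uses the more common actor() method. Overriding the protected scheduler method lets us control which thread MyActor2 runs on. When we run the code, the actors created with actor() and with MyActor1 run on pool threads, while the actor created from MyActor2 runs in the main thread:
Main running in Thread[main,5,main]
Actor2 running in Thread[main,5,main]
Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main]
Actor 3 running in Thread[ForkJoinPool-1-worker-3,5,main]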
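The per-actor override can be mirrored with a small trait whose start() delegates to an overridable executor member, in the same way MyActor2 overrides scheduler(). Worker, CallingThreadWorker and NamedWorker are hypothetical names built on scala.concurrent.ExecutionContext; they are not part of scala.actors.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import scala.concurrent.ExecutionContext

// Hypothetical mini framework: start() asks the worker's own executor to run act(),
// so a subclass can override executor, much as MyActor2 overrides scheduler().
trait Worker {
  protected def executor: ExecutionContext = ExecutionContext.global
  def act(): Unit
  def start(): Unit = executor.execute(() => act())
}

// Runs act() synchronously on whichever thread calls start().
trait CallingThreadWorker extends Worker {
  override protected def executor: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }
}

// Records the name of the thread that ran act(), then signals completion.
class NamedWorker(name: String, seen: ConcurrentHashMap[String, String], done: CountDownLatch)
    extends Worker {
  def act(): Unit = {
    seen.put(name, Thread.currentThread().getName)
    done.countDown()
  }
}

object SelectiveDemo {
  // Returns (thread of the pooled worker, thread of the calling-thread worker, caller's thread).
  def run(): (String, String, String) = {
    val seen = new ConcurrentHashMap[String, String]()
    val done = new CountDownLatch(2)
    new NamedWorker("pooled", seen, done).start()
    (new NamedWorker("on-caller", seen, done) with CallingThreadWorker).start()
    done.await()
    (seen.get("pooled"), seen.get("on-caller"), Thread.currentThread().getName)
  }
}
```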