lua程序设计(6) - 协同程序

协同程序(Coroutine)类似于多线程情况下的线程:是一种程序的运行绪,它有自己的栈,自己的局部变量,自己的当前指令指针(Instruction Pointer),但与其他协同程序共享全局变量以几乎所有其他资源。线程和协同程序的主要区别在于:在概念上讲,在多处理器情况下,拥有多线程的程序并发运行多个线程;而协同程序是通过协作来完成,在任一指定时刻只有一个协同程序在运行,一个正在运行的协同程序只在它显式要求被挂起时才会被挂起。

a所有与协同程序有关的函数都在coroutine表中。用于创建新的协同程序的create函数,只有一个参数:一个函数,即协同程序将要执行的代码。若一切顺利,create函数将返回一个thread类型的值,这就是一个新创建的协同程序。通常情况下,create的参数是一个匿名函数:
co = coroutine.create(function ()
print(“hi”)
end)
print(co) –> thread: 0x8071d98
协同程序有三个状态:挂起(Suspended)、运行(Running)、停止(Dead)。当我们成功创建协同程序时,其初始状态为挂起,即此时协同程序并不自动运行其代码。可以用status函数测试协同程序所处的状态:
print(coroutine.status(co)) –> suspended
函数coroutine.resume通过使协同程序的状态由挂起变为运行,可以启动或重新启动一个协同程序,或者说唤醒一个协同程序
coroutine.resume(co) –> hi
本例中,协同程序打印出“hi”并终止程序,之后便进入无法再次被唤醒的终止状态:
print(coroutine.status(co)) –> dead

当目前为止,协同看起来只是一种复杂的调用函数的方式,真正的强大之处体现在yield函数,它可以将正在运行的代码挂起以便再次被唤醒。先看一个简单的例子:
co = coroutine.create(function()
for i = 1, 10 do
print(“co”, i)
coroutine.yield()
end
end)
当我们唤醒这个协同程序,它将开始执行代码,直至遇到第一个yield处,这时它被挂起:
coroutine.resume(co) –> co 1
如果这时检查其状态,我们可以看到这个协同程序已经被挂起,因此它能够再次被唤醒:
print(coroutine.status(co)) –> suspended
站在协同程序的角度来看:当协同程序被挂起的时候,它将进入yield函数的内部循环,当我们唤醒被挂起的协同程序时,一开始对yield的调用指令将返回(即yield函数退出其内部循环),继而执行yield调用之后的代码,直到再次遇到yield或程序结束:
coroutine.resume(co) –> co 2
coroutine.resume(co) –> co 3

coroutine.resume(co) –> co 10
coroutine.resume(co) – prints nothing
上例中最后一次调用resume函数时,协同程序已完成循环并返回,因此协同程序处于终止状态。如果仍然试图唤醒它,那么它将返回空值和相应的错误信息:
print(coroutine.resume(co))
–> false cannot resume dead coroutine
一个有用的Lua应用是利用成对出现的resume和yield函数来相互(双方分别为调用resume和yield的代码段)交换数据。第一个例子中只有resume,没有yield,resume函数接受额外的信息作为协同程序的参数:
co = coroutine.create(function(a, b, c)
print(“co”, a, b, c)
end)
coroutine.resume(co, 1, 2, 3) –> co 1 2 3
第二个例子中,调用resume后得到的返回值包括:用以指示协同程序无错运行的true值,和传递给对应的yield函数的所有参数:
co = coroutine.create(function(a, b)
coroutine.yield(a + b, a - b)
end)
print(coroutine.resume(co, 20, 10)) –> true 30 10
同样地,传递给对应的resume函数的参数,也会被传递给yield函数:
co = coroutine.create(function()
print(“co”, coroutine.yield())
end)
coroutine.resume(co)
coroutine.resume(co, 4, 5) –> co 4 5
最终,当协同程序结束时,其所有的返回值,也会传给对应的resume函数:
co = coroutine.create(function()
return 6, 7
end)
print(coroutine.resume(co)) –> true 6 7
我们很少在一个协同程序中同时使用多个特性,但每一种都有其用处。

现在已大体了解了协同程序的基础内容,在我们继续学习之前,先澄清某些概念。Lua的协同被称为非对称协同程序(Asymmetric Coroutines),指挂起一个正在运行的协同程序的函数与唤醒一个被挂起的协同程序的函数是不同的,而有些语言提供了对称协同程序(Symmetric Coroutines),它们使用一个函数负责分配协同程序的程序运行控制权。
有人称非对称的协同程序为“半协同程序”(因为它们并非对称,也并不是真正的合作关系),而另一些人使用同样的术语表示“受限制的协同程序”,这意味着:一个受限制的协同程序只有运行在任何非其他函数内部的时候才能将自己挂起(放弃程序运行控制权),也就是说,在该协同程序的运行栈内部没有后续调用的时候才能放弃控制权。换句话说,只有在半协同程序的内部才能使用yield函数将自己挂起,Python中的生成器(Generator)就是这种类型的半协同程序。
与对称协同和非对称协同的区别不同的是,协同程序与生成器的区别更大:生成器相对比较简单,它不能完成真正的协同程序所能完成的一些任务。Lua提供了真正的非对称的协同程序,而且以非对称的协同机制为基础可以很容易地实现对称的协同程序:每一次程序运行控制权的转移都在执行resume之后接着执行yield函数将自己挂起。

示例1:管道和过滤器
function receive(prod)
local status, value = coroutine.resume(prod)
return value
end
function send(x)
coroutine.yield(x)
end
function producer()
return coroutine.create(function()
while true do
local x = io.read() – produce new value
send(x)
end
end)
end
function filter(prod)
return coroutine.create(function()
local line = 1
while true do
local x = receive(prod) – get new value
x = string.format(“%5d %s”, line, x)
send(x) – send it to consumer
line = line + 1
end
end)
end
function consumer(prod)
while true do
local x = receive(prod) – get new value
io.write(x, “\n”) – consume new value
end
end
最后需要做的仅仅是,创建所需要的组件(生产者)、将其与过滤器连接并启动消费者:
p = producer()
f = filter(p)
consumer(f)
或者:
consumer(filter(producer()))
看完上面这个例子你可能很自然地想到UNIX的管道,协同程序是一种非抢占式的多线程。管道方式下,每个任务在独立的进程中运行,而在协同程序方式下,每个任务运行于独立的协同程序中。管道在写(生产者)与读(消费者)之间提供了一个缓冲,因此两者之间的相对速度并不影响正常的读写这在管道的上下文中这是非常重要的,因为在进程间的进行切换(由于读写速度不一样,经常需要从速度快的进程切换到速度慢的进程以防止慢速的进程被淹没)的代价是很高的。对于协同程序的方式,任务间的切换代价要小很多,其代价几乎等同于函数调用的代价,因此程序可以在读写之间进行无间的切换。

示例2:批量下载
我们已经了解如何下载单个文件,下面回到如何下载多个文件这个问题上来。一种简单的方法是每次下载一个文件直到所有文件都被下载,但是这种依次下载的方式速度太慢:必须在一个文件下载完之后才能开始下载后续的文件,而在一个请求发送之后绝大部分时间都是在等待数据的到达,也就是说,大部分时间都花费在receive函数的运行上(此时receive处于等待数据接收的阻塞状态)。如果可以同时下载多个文件,速度将得到很大的提高:当一个连接没有数据到达时,可以从另一个连接读取数据。很显然,协同程序为这种并发下载提供了便利的支持,我们为每一个下载任务创建一个线程,当某一线程没有数据到达时,它将控制权交给一个分配器,分配器将唤醒其他线程尝试读取数据。
为了使用协同程序的方式,首先需要将上述下载文件的代码封装到一个函数内:
function download(host, file)
local c = assert(socket.connect(host, 80))
local count = 0 – counts number of bytes read
c:send(“GET “ .. file .. “ HTTP/1.0\r\n\r\n”)
while true do
local s, status = receive(c)
count = count + string.len(s)
if status == “closed” then
break
end
end
c:close()
print(file, count)
end
文件内容为何无关大体,因此上述函数只是计算文件尺寸,而没有将文件内容显示到标准输出上(多个线程读取多个文件时,输出会混杂在一起)。在新的代码中,我们使用一个辅助receive通过远程连接来接收数据,采用依次接收数据的方式时,代码如下:
function receive(connection)
return connection:receive(2 ^ 10)
end
如果采用并发接收数据的方式,函数在接收数据时不能被阻塞,在没有数据可取应该自行挂起,其代码如下:
function receive(connection)
connection:timeout(0) – do not block
local s, status = connection:receive(2 ^ 10)
if status == “timeout” then
coroutine.yield(connection)
end
return s, status
end
执行timeout(0)使得该连接上的任何操作都不会被阻塞,当返回的操作状态为“timeout”时意味着操作超时返回,此时,线程自行挂起。使用非假值作为yield的参数将通知分配器仍有线程在执行任务(稍后还将看到需要超时连接的分配器)。注意即使是超时返回,该连接依然返回它已经接收的数据,因此receive总是返回s给它的调用者。

下面的函数保证每一个下载任务都运行于独立的线程内:

threads = {} – list of all live threads
function get(host, file)
– create coroutine
local co = coroutine.create(function()
download(host, file)
end)
– insert it in the list
table.insert(threads, co)
end
代码中的threads表为分配器保存着所有处于活动状态的线程。
分配器的代码很简单,它遍历并调用每一个线程。当然,它也必须在线程完成其任务后将其从任务列表内移除,当任务列表为空时它将终止循环:
function dispatcher()
while true do
local n = table.getn(threads)
if n == 0 then break end – no more threads to run
for i = 1, n do
local status, res = coroutine.resume(threads[i])
if not res then – thread finished its task?
table.remove(threads, i)
break
end
end
end
end
最后,需要在主程序中创建所需的线程并调用分配器,假设需要从W3C下载四个文件,主程序如下:
host = “www.w3c.org"
get(host, “/TR/html401/html40.txt”)
get(host, “/TR/2002/REC-xhtml1-20020801/xhtml1.pdf”)
get(host, “/TR/REC-html32.html”)
get(host, “/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt”)
dispatcher() – main loop
采用协同程序的方式,在我的机器上花费了6秒下载完这四个文件,而依次下载的方式花费了15秒,超过两倍的时间。
尽管新的方式速度提高了,但还不是很理想,这种方式在至少有一个线程有数据可读取时,运行得还不错,如果所有线程都处于等待数据的状态时,分配器将进入忙等待状态,逐个检查各个线程,结果发现每一个线程都没有任何数据可读,由此导致了,协同程序实现的代码比依次下载的方式多花费了近30倍的CPU资源。
为了避免这种情况,我们可以使用LuaSocket库中的select函数,在程序能够从阻塞状态向唤醒状态切换前,一直处于被阻塞状态,也就是说,在程序无法摆脱阻塞状态时,不会花费CPU时间来查询其状态。为了实现这种能力,只需要稍稍修改已有的分配器即可,新版本的分配器代码如下:

function dispatcher()
while true do
local n = table.getn(threads)
if n == 0 then break end – no more threads to run
local connections = {}
for i = 1, n do
local status, res = coroutine.resume(threads[i])
if not res then – thread finished its task?
table.remove(threads, i)
break
else – timeout
table.insert(connections, res)
end
end
if table.getn(connections) == n then
socket.select(connections)
end
end
end
在内层循环中,分配器将所有超时的连接记录到connections表,记住receive将连接传递给yield,因此resume才能够将其返回。当所有的连接都超时的时候,分配器调用select函数,等待其中任何一个连接的状态改变。修改版本的运行速度与最初的版本相当,另外,因为它并不忙等待处于阻塞状态的连接,因此使用的CPU的也只是比依次下载的方式多了少许。