文章

Nim中的异步编程

上一篇文章中,我们对各种多任务进行了初步了解,在这篇文章中,我们将了解最简单的多任务形式,即异步执行。正如上一篇文章所讨论的,异步执行是我们的程序告诉硬件做某事,然后在等待硬件完成操作的同时做其他事的一种方式。例如,当我们要将文件读入内存或进行网络通信时,异步执行就非常有用。与在CPU上运行计算相比,这两项任务的速度都非常慢,而在等待它们完成的过程中,我们可以做很多事情。正如入门指南中提到的,异步执行只有在程序受IO约束时才能真正帮助我们加快速度,但对于这些任务来说,异步执行几乎是必不可少的。

Nim的异步调度分发器

Nims异步执行的实现是在一个库中完成的,该库分为几个部分。其核心是标准库中的asyncdispatch模块。它的工作原理是创建一个全局调度器(技术上是每个线程一个),负责运行在其中注册的过程。这些过程会用{async}语法进行标记,稍后我们会进一步探讨。在调用其他异步存储过程时,它们会使用{await}关键字等待该过程完成。这就意味着,在调用await时,存储过程会告诉调度程序,它已暂时执行完毕,可以自由执行其他操作,直到等待的操作完成。这种多任务处理被称为 “协作式多任务处理”,因为每个执行单元都会主动告诉调度程序何时可以改变执行内容。只有当我们主动告诉执行单元我们允许它改变时,执行单元才能改变。让我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncdispatch # 它为我们提供了async语义和派发器
import times, strutils # 这是为了提供定时输出
 
let start = epochTime()
 
proc ticker() {.async.} =
  ## 这个简单的程序会显示输出10次“tick ”,每次间隔100毫秒。
  ## 每次滴答中,我们用它来显示其他程序之间的时间间隔。
  for i in 1..10:
    await sleepAsync(100)
    echo "tick ",
         i*100, "ms ",
         split($((epochTime() - start)*1000), '.')[0], "ms (real)"
 
proc delayedEcho(message: string, wait: int) {.async.} =
  ## 在显示 `message` 之前,只需等待 `wait` 毫秒。
  await sleepAsync(wait)
  echo message
 
let
  delayedEchoFuture = delayedEcho("Hello world", 550)
  tickerFuture = ticker()
 
waitFor tickerFuture and delayedEchoFuture
echo delayedEchoFuture.finished

运行时,将输出:

1
2
3
4
5
6
7
8
9
10
11
12
tick 100ms 100ms (real)
tick 200ms 200ms (real)
tick 300ms 300ms (real)
tick 400ms 401ms (real)
tick 500ms 501ms (real)
Hello world
tick 600ms 601ms (real)
tick 700ms 702ms (real)
tick 800ms 802ms (real)
tick 900ms 902ms (real)
tick 1000ms 1003ms (real)
true

这里发生的情况是,调用delayedEchoticker会返回一个Future[void]对象。这将立即发生,无需阻塞等待。该对象封装了调用的返回类型(在本例中为void,因为我们没有返回任何内容),并为我们提供了一个引用,我们可以用它来询问调度程序是否完成了调用。这就是最后一行发生的事情,我们在这一行检查delayedEchofuture对象是否已完成。但是,future对象本身并不能得到执行,我们需要实际运行调度程序,才能运行注册到调度程序中的代码。请记住,所有这些仍在单线程执行中运行。运行调度程序的方法有很多,但在本例中是通过调用waitFor来实现的。当我们运行waitFor时,调度程序将循环运行,直到给定的future完成,然后返回该future的包含的值。在本例中,我们使用and过程来接收两个future并创建一个新的future,当两个给定的future都完成时,这个新的future也就完成了。当这个future完成时,程序也就完成了。如果我们只等待delayedEchoFuture,那么“Hello world”之后的“tick”信息将不会显示,因为当delayedEchoFuture完成时,派发器循环停止后,该过程将不会继续执行。

滴答滴答,重复执行

就像在ticker中看到的那样,这样的Future在其生命周期中可以多次将其执行权交还给调度程序。如果我们看一下下面的示例(从一个非常类似的go协程示例修改而来),就可以看到在这种情况下,调度程序是如何在任务之间来回切换的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncdispatch # 它为我们提供了async语义和派发器
import times, strutils
 
let start = epochTime()
template timestampEcho(x: varargs[string, `$`]): untyped =
  for s in x:
    stdout.write s
  echo " ", split($(1000*(epochTime() - start)), '.')[0], "ms"
 
proc numbers() {.async.} =
  for i in 1..5:
    await sleepAsync(250)
    timestampEcho i
 
proc letters() {.async.} =
  for i in 'a'..'e':
    await sleepAsync(400)
    timestampEcho i
 
var
  n = numbers()
  l = letters()
 
waitFor sleepAsync(2500)
echo "main terminated"

这个程序会输出杂乱无章的字母和数字,但如果我们看一下时间戳,就能知道发生了什么:

1
2
3
4
5
6
7
8
9
10
11
1 250ms
a 400ms
2 501ms
3 751ms
b 801ms
4 1002ms
c 1202ms
5 1252ms
d 1603ms
e 2003ms
main terminated

如果我们仔细观察我们的两个异步任务,并绘制出它们的睡眠时间间隔,我们就能看到调度程序是如何交错执行并很好地绘制出来的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
         0ms     250ms   500ms   750ms   1000ms  1250ms
           ┌───────┬───────┬───────┬───────┬───────┐
numbers    │       │       │       │       │       │
           └───────┴───────┴───────┴───────┴───────┘
                   1       2       3       4       5

         0ms          400ms        800ms        1200ms       1600ms       2000ms
           ┌────────────┬────────────┬────────────┬────────────┬────────────┐
characters │            │            │            │            │            │
           └────────────┴────────────┴────────────┴────────────┴────────────┘
                        a            b            c            d            e

         0ms                                                                               2500ms
           ┌─────────────────────────────────────────────────────────────────────────────────┐
main       │                                                                                 │
           └─────────────────────────────────────────────────────────────────────────────────┘
                                                                                     main terminated

                      400ms        800ms        1200ms       1600ms       2000ms
         0ms     250ms   500ms   750ms   1000ms  1250ms                                    2500ms
           ┌───────┬────┬──┬───────┬─┬─────┬──────┬┬───────────┬────────────┬────────────────┐
executed   │       │    │  │       │ │     │      ││           │            │                │
           └───────┴────┴──┴───────┴─┴─────┴──────┴┴───────────┴────────────┴────────────────┘
                   1    a  2       3 b     4      c5           d            e        main terminated

时间在流逝!

您可能已经注意到,实际时间正在慢慢偏离“预期”时间。例如,在上一个示例中,当我们读到最后一个字母时,时间偏离了 3毫秒。这是由两个不同的原因造成的:首先,每个sleepAsync当然都是相对于它被调用的时间而言的。因此,由于我们的代码需要一些时间来执行,这些语句之间的累计执行时间就会开始显示出来。如果我们想要更精确的步进,我们可以在开始循环之前设置所有计时器,使它们都相对于程序开始的时间:

1
2
3
4
5
6
7
proc numbers() {.async.} =
  let sleeps = [sleepAsync(250), sleepAsync(500), sleepAsync(750),
                sleepAsync(1000), sleepAsync(1250)]
  for i in 1..5:
    let fut = sleeps[i-1]
    await fut
    timestampEcho i

或者,我们也可以根据延迟时间动态计算睡眠时间:

1
2
3
4
5
6
proc numbers() {.async.} =
  var fsleep = epochTime()
  for i in 1..5:
    let d = (epochTime() - fsleep)*1000 - (i.float-1)*250
    await sleepAsync(250 - d)
    timestampEcho i

但是,您可能仍然会看到预期时间和睡眠周期时间之间存在一些差异。这是由于异步执行提供的协作式多任务处理的另一个影响。由于我们依赖于每个执行片段主动将控制权交还给调度程序,因此我们永远无法确定需要多长时间才能将执行交还给调度程序。如果一个任务的执行时间超过了任务执行前的剩余时间,我们就会发现执行延迟了。调度程序本身也会花费一些时间来执行任务,这就是我们在上例中看到的情况。如果我们看一下上一个添加了动态休眠的示例,并在numbersletters程序的循环中添加sleep(100)语句(来自os模块,而不是asyncdispatch 提供的sleepAsync)来模拟一些工作,我们就会发现一些字母和数字是在错误的时间打印的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
proc numbers() {.async.} =
  var fsleep = epochTime()
  for i in 1..5:
    let d = (epochTime() - fsleep)*1000 - (i.float-1)*250
    await sleepAsync(250 - d)
    timestampEcho i
    sleep(100)
 
proc letters() {.async.} =
  var fsleep = epochTime()
  for i in 'a'..'e':
    let d = (epochTime() - fsleep)*1000 - (i.ord - 'a'.ord).float*400
    await sleepAsync(400 - d)
    timestampEcho i
    sleep(100)
1
2
3
4
5
6
7
8
9
10
11
1 250ms
a 400ms
2 501ms
3 750ms
b 850ms
4 1001ms
c 1200ms
5 1300ms
d 1601ms
e 2000ms
main terminated

如果我们看一下这里的时间安排,再绘制一下地图,就会明白为什么会出现这种情况。当一个任务的执行(用粗体方格标记)与另一个任务的开始重叠时,它就会被延迟,直到第一个任务完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
         0ms     250ms   500ms   750ms   1000ms  1250ms
           ┌───────┲━━┱────┲━━┱────┲━━┱────┲━━┱────┲━━┓
numbers    │       ┃  ┃    ┃  ┃    ┃  ┃    ┃  ┃    ┃  ┃
           └───────┺━━┹────┺━━┹────┺━━┹────┺━━┹────┺━━┛
                   1       2       3       4       5

         0ms          400ms        800ms        1200ms       1600ms       2000ms
           ┌────────────┲━━┱─────────┲━━┱─────────┲━━┱─────────┲━━┱─────────┲━━┓
characters │            ┃  ┃         ┃  ┃         ┃  ┃         ┃  ┃         ┃  ┃
           └────────────┺━━┹─────────┺━━┹─────────┺━━┹─────────┺━━┹─────────┺━━┛
                        a            b            c            d            e

         0ms                                                                               2500ms
           ┌─────────────────────────────────────────────────────────────────────────────────┐
main       │                                                                                 │
           └─────────────────────────────────────────────────────────────────────────────────┘
                                                                                     main terminated

                      400ms          850ms      1200ms       1600ms       2000ms
         0ms     250ms   500ms   750ms   1000ms    1300ms                                  2500ms
           ┌───────┲━━┱─┲━━┳━━┱────┲━━┳━━┱─┲━━┱───┲━━┳━━┱──────┲━━┱─────────┲━━┱─────────────┐
executed   │       ┃  ┃ ┃  ┃  ┃    ┃  ┃  ┃ ┃  ┃   ┃  ┃  ┃      ┃  ┃         ┃  ┃             │
           └───────┺━━┹─┺━━┻━━┹────┺━━┻━━┹─┺━━┹───┺━━┻━━┹──────┺━━┹─────────┺━━┹─────────────┘
                   1    a  2       3  b    4      c  5         d            e        main terminated

因此,我们可以看到,执行3时,b会偏离目标50毫秒;执行c时,5同样会偏离目标50毫秒。任务2被设置为紧随任务a 之后启动,因此如果任务a的实际工作进展缓慢,它可能会稍有延迟,但在本例中很难察觉。出现这种情况的原因是,os.sleep过程会让整个执行线程进入休眠状态,这对于调度程序来说,与任务需要很长时间才能完成是没有区别的。这意味着当睡眠计时器耗尽时,调度程序无法切换到另一个计划任务,因为它并没有运行。因此,sleepAsync过程更应该被视为一个“至少睡眠这么长时间”的过程。这种现象被称为 “执行饥饿”,因为耗时过长的任务实际上“饥饿”了调度程序和其他任务的执行时间。在协作式多任务处理中,除了确保我们的任务行为良好(毕竟它们应该 “合作”)外,我们对此无能为力。如果你知道某个过程会做很多工作,并想尝试手动将执行权交给其他人,你可以添加一些简单的await sleepAsync(0)语句,让调度程序不时运行其他任务。举例来说,如果我们添加10条sleep(10)语句和await sleepAsync(0) 语句,而不是单条sleep(100),我们的任务延迟时间就不会超过10毫秒。

结束语

正如我们所见,异步编程是加快代码速度的重要工具。不过,它也有自己的优点和缺点。异步编程易于使用,因为它清楚地知道我们的代码会在哪些地方让位于其他任务。但它只能用于加速IO绑定的代码,因为我们仍然只能在单线程上运行。事实上,这是一种合作方式,这也意味着长时间运行的任务可能会意外延迟其他任务的执行。在下一篇文章中,我们将探讨另一种多任务处理方式,即抢占式多任务处理,它可以避免这一问题,并允许我们加快计算绑定代码的速度,但它也有自己的一系列问题。

附录A,实施细节:

上面的文章试图解释如何在Nim中使用异步编程,但如果你对它的工作原理感到好奇,我还附上了这一小部分内容。即使你只是对使用异步编程感兴趣,这部分内容也值得一读,因为了解了异步编程的底层机制,你就更容易推理出其中的原理。

因此,异步任务的工作方式是{.async.}pragma将过程重写为迭代器,并添加所需的逻辑,以便向派发器注册future对象。迭代器在Nim中主要用于迭代事物。但如果你写过迭代器,你就会知道它们能做过程能做的任何事情,但允许将控制权交还给调用者,同时保留其作用域。为了更好地理解异步执行的工作原理,让我们来看看如何创建这样一个迭代器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
iterator ourIterator(): int {.closure.} =
  var i = 1
  yield i
  while i <= 3:
    yield i
    i += 1
 
var x = ourIterator # Create an instance of the iterator
echo x() # Call the iterator
echo x()
echo x()
echo x()
echo x.finished
echo x()
echo x.finished

打印输出为:

1
2
3
4
5
6
7
1
1
2
3
false
0
true

这里有几件事需要注意:ourIterator被标记为闭包,这意味着我们可以创建实例并任意调用它,而不是以这种方式创建的迭代器是inline的,其行为更像模板,因为它们将在编译时被重写。另外,我们也可以从过程中返回一个迭代器,这样就隐含地使其成为一个闭包,并使我们能够在创建实例时传递参数。事实上,这样做更接近async宏的做法。我们还在迭代器中加入了两个yield语句,我们可以随意添加,并将它们放在任何我们喜欢的地方。它们将返回给定的值,下一次调用迭代器时,它将从上次执行yield的地方继续执行。这就是迭代器的魅力所在,它允许我们在保持代码范围的情况下跳进跳出。我们还可以看到对system.finished的两次调用,当迭代器的最后一次运行命中完成或返回语句,system.dinished返回true,反之返回false。这意味着最后的值无效(如这里的值为 0 所示),再调用迭代器就没有任何意义了。异步宏利用这一点来判断任务是否已完成,然后将最后一个有效值复制到future并执行分配给它的任何回调。

现在你可能已经开始了解Nim是如何实现异步执行的了。当我们在存储过程中添加{.async.}pragma 时,它将被重写为一个向全局调度程序注册迭代器的过程。然后,我们可以告诉该调度程序运行,它将运行其中的迭代器。这些迭代器每次执行await操作时都会调用yield。它们产生的值就是它们想要等待完成的future。当我们等待它的future结果时,调度程序将继续运行该任务,直到未来完成并将控制权传回给我们的迭代器。调度程序还支持定时器,这就是我们上面使用的sleepAsync的工作原理。这些定时器被简单地保存在一个列表中,调度程序将检查是否有定时器到期执行。

本文由作者按照 CC BY 4.0 进行授权