为什么会有这个文章?在嵌入式的环境下,有需要执行多个并行的任务的需求,但是 RTOS 又太重了,比如控制 LED 闪烁的时候,同时控制蜂鸣器,同时又要解析串口数据,又有一堆需要等待运行的任务,如 GPS 对时,配置通信模块,但是远距离通信模块的速率又不能太快,容易爆缓存,发送一条以后需要等待一定的时间,如果使用简单的 HAL_Delay 会导致 CPU 空转,影响其他任务的执行,这个时候需要一种轻量级的多任务工具,在不同任务之间切换,Delay 的时候释放出 CPU 资源给其他任务使用,这就是协程。

简介

协程(coroutine)顾名思义就是“协作的例程”(co-operative routines)。跟具有操作系统概念的线程不一样,协程是在用户空间利用程序语言的语法语义就能实现逻辑上类似多任务的编程技巧。实际上协程的概念比线程还要早,按照 Knuth 的说法“子例程是协程的特例”,一个子例程就是一次子函数调用,那么实际上协程就是类函数一样的程序组件,你可以在一个线程里面轻松创建数十万个协程,就像数十万次函数调用一样。只不过子例程只有一个调用入口起始点,返回之后就结束了,而协程入口既可以是起始点,又可以从上一个返回点继续执行,也就是说协程之间可以通过 yield 方式转移执行权,对称(symmetric)、平级地调用对方,而不是像例程那样上下级调用关系。当然 Knuth 的“特例”指的是协程也可以模拟例程那样实现上下级调用关系,这就叫非对称协程(asymmetric coroutines)。

switch-case

首先思考一个问题,我们现在的需求是在单个任务中,在 delay 的时候,要将 cpu 释放出来,下次再进来的时候,可以恢复运行到现在的位置。在有操作系统的情况下,可以通过保存上下文(比如程序计数器)保存在堆栈上,然后任务调度器再去恢复别的任务,进行上下文切换,遗憾的是似乎只有更底层的汇编语言才能做到这一点。协程虽然也可以模仿这个去实现,大概意思是:传统意义上的函数调用是怎样的,协程也尝试去模仿、改造、封装。但是在嵌入式的单线程情况下,这虽然实现了协程的上下文切换,相较之下前者在应用上会产生相当的不确定性(比如不好封装),不希望引入太多的不确定性,稳定才是第一要素。

那么,什么样的结构能够实现随时退出,并且回到当前位置呢?就像 Python 里面的 yield 一样

1
2
3
def function():
for i in range(10):
yield i

连续对它调用 10 次,它能分别返回 0 到 9。该怎样实现呢?可以利用 goto 语句,如果在函数中加入一个状态变量,就可以这样实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
int function(void) {
static int i, state = 0;
switch (state) {
case 0: goto LABEL0;
case 1: goto LABEL1;
}
LABEL0: /* start of function */
for (i = 0; i < 10; i++) {
state = 1; /* so we will come back to LABEL1 */
return i;
LABEL1:; /* resume control straight after the return */
}
}

但是仔细想想,其实可以不用 switch 语句来决定要跳转到哪里去执行,而是直接利用 switch 语句本身来实现跳转:

1
2
3
4
5
6
7
8
9
10
11
int function(void) {
static int i, state = 0;
switch (state) {
case 0: /* start of function */
for (i = 0; i < 10; i++) {
state = 1; /* so we will come back to "case 1" */
return i;
case 1:; /* resume control straight after the return */
}
}
}

到这里肯定很多人都一头雾水了,这 switch 还能这么用?写了十几年的 c 语言白写了?其实说白了 C 语言就是脱胎于汇编语言的,switch-case 跟 if-else 一样,无非就是汇编的条件跳转指令的另类实现而已。
还可以用 LINE 宏使其更加一般化:

1
2
3
4
5
6
7
8
9
10
11
int function(void) {
static int i, state = 0;
switch (state) {
case 0: /* start of function */
for (i = 0; i < 10; i++) {
state = __LINE__ + 2; /* so we will come back to "case __LINE__" */
return i;
case __LINE__:; /* resume control straight after the return */
}
}
}

进一步使用宏提炼出一种范式,封装成组件:

1
2
3
4
5
6
7
8
9
10
#define Begin() static int state=0; switch(state) { case 0:
#define Yield(x) do { state=__LINE__; return x; case __LINE__:; } while (0)
#define End() }
int function(void) {
static int i;
Begin();
for (i = 0; i < 10; i++)
Yield(i);
End();
}

怎么样,看起来像不像发明了一种全新的语言?实际上我们利用了 switch-case 的分支跳转特性,以及预编译的 LINE 宏,实现了一种隐式状态机,最终实现了“yield 语义”。每次进入 function 以后,都会跳转到 Yield 处执行,然后释放出 CPU 的权限。

超级轻量级的无栈协程实现

下面的教程来自于一位 ARM 工程师、天才黑客 Simon Tatham 的开源 C 协程库 protothreads,这是一个全部用 ANSI C 写成的库,就是说,实现已经不能再精简了,几乎就是原语级别。事实上 protothreads 整个库不需要链接加载,因为所有源码都是头文件,类似于 STL 这样不依赖任何第三方库,在任何平台上可移植;总共也就 5 个头文件,有效代码量不足 100 行;API 都是宏定义的,所以不存在调用开销;最后,每个协程的空间开销是 2 个字节(是的,你没有看错,就是一个 short 单位的“栈”!)当然这种精简是要以使用上的局限为代价的,接下来的分析会说明这一点。

Protothreads 的原语和组件

1
2
3
4
#define LC_INIT(s) s = 0
#define LC_RESUME(s) switch (s) { case 0:
#define LC_SET(s) s = __LINE__; case __LINE__:
#define LC_END(s) }

但这种“原语”有个难以察觉的缺陷:就是你无法在 LC_RESUME 和 LC_END (或者包含它们的组件)之间的代码中使用 switch-case 语句,因为这会引起外围的 switch 跳转错误!为此,protothreads 又实现了基于 GNU C 的调度“原语”。在 GNU C 下还有一种语法糖叫做标签指针,就是在一个 label 前面加 &&(不是地址的地址,是 GNU 自定义的符号),可以用 void 指针类型保存,然后 goto 跳转:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef void * lc_t
#define LC_INIT(s) s = NULL
#define LC_RESUME(s) \
do { \
if (s != NULL) { \
goto *s; \
}
} while (0)
#define LC_CONCAT2(s1, s2) s1##s2
#define LC_CONCAT(s1, s2) LC_CONCAT2(s1, s2)
#define LC_SET(s) \
do { \
LC_CONCAT(LC_LABEL, __LINE__): \
(s) = &&LC_CONCAT(LC_LABEL, __LINE__); \
} while (0)

好了,有了前面的基础知识,理解这些“原语”就是小菜一叠,下面看看如何建立“组件”,同时也是 protothreads API,我们先定义四个退出码作为协程的调度状态机:

1
2
3
4
#define PT_WAITING 0
#define PT_YIELDED 1
#define PT_EXITED 2
#define PT_ENDED 3

下面这些 API 可直接在应用程序中调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/* 初始化一个协程,也即初始化状态变量 */
#define PT_INIT(pt) LC_INIT((pt)->lc)

/* 声明一个函数,返回值为 char 即退出码,表示函数体内使用了 proto thread,(个人觉得有些多此一举) */
#define PT_THREAD(name_args) char name_args

/* 协程入口点, PT_YIELD_FLAG=0表示出让,=1表示不出让,放在 switch 语句前面,下次调用的时候可以跳转到上次出让点继续执行 */
#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc)

/* 协程退出点,至此一个协程算是终止了,清空所有上下文和标志 */
#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
PT_INIT(pt); return PT_ENDED; }

/* 协程出让点,如果此时协程状态变量 lc 已经变为 __LINE__ 跳转过来的,那么 PT_YIELD_FLAG = 1,表示从出让点继续执行。 */
#define PT_YIELD(pt) \
do { \
PT_YIELD_FLAG = 0; \
LC_SET((pt)->lc); \
if(PT_YIELD_FLAG == 0) { \
return PT_YIELDED; \
} \
} while(0)

/* 附加出让条件 */
#define PT_YIELD_UNTIL(pt, cond) \
do { \
PT_YIELD_FLAG = 0; \
LC_SET((pt)->lc); \
if((PT_YIELD_FLAG == 0) || !(cond)) { \
return PT_YIELDED; \
} \
} while(0)

/* 协程阻塞点(blocking),本质上等同于 PT_YIELD_UNTIL,只不过退出码是 PT_WAITING,用来模拟信号量同步 */
#define PT_WAIT_UNTIL(pt, condition) \
do { \
LC_SET((pt)->lc); \
if(!(condition)) { \
return PT_WAITING; \
} \
} while(0)

/* 同 PT_WAIT_UNTIL 条件反转 */
#define PT_WAIT_WHILE(pt, cond) PT_WAIT_UNTIL((pt), !(cond))

/* 协程调度,调用协程 f 并检查它的退出码,直到协程终止返回 0,否则返回 1。 */
#define PT_SCHEDULE(f) ((f) < PT_EXITED)

/* 这用于非对称协程,调用者是主协程,pt 是和子协程 thread (可以是多个)关联的上下文句柄,主协程阻塞自己调度子协程,直到所有子协程终止 */
#define PT_WAIT_THREAD(pt, thread) PT_WAIT_WHILE((pt), PT_SCHEDULE(thread))

/* 用于协程嵌套调度,child 是子协程的上下文句柄 */
#define PT_SPAWN(pt, child, thread) \
do { \
PT_INIT((child)); \
PT_WAIT_THREAD((pt), (thread)); \
} while(0)

用户还可以根据自己的需求随意扩展组件,比如实现信号量,你会发现脱离了操作系统环境下的信号量竟是如此简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
struct pt_sem {
unsigned int count;
};

#define PT_SEM_INIT(s, c) (s)->count = c

#define PT_SEM_WAIT(pt, s) \
do { \
PT_WAIT_UNTIL(pt, (s)->count > 0); \
--(s)->count; \
} while(0)

#define PT_SEM_SIGNAL(pt, s) ++(s)->count

使用示例

在嵌入式项目中,如 stm32 的 hal 项目中使用,只需要把 pt 的几个头文件加到 includePath 中,然后#include"pt.h"就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include "pt.h"
#include "main.h"

static int
protothread1(struct pt *pt)
{
/* A protothread function must begin with PT_BEGIN() which takes a
pointer to a struct pt. */
PT_BEGIN(pt);

/* We loop forever here. */
while (1)
{
delay_ticks = HAL_GetTick() + 1000;
PT_WAIT_UNTIL(pt, HAL_GetTick() > delay_ticks);
HAL_GPIO_Troggle(LED_Port,LED_Pin);
}

PT_END(pt);
}

static int
protothread2(struct pt *pt)
{
/* A protothread function must begin with PT_BEGIN() which takes a
pointer to a struct pt. */
PT_BEGIN(pt);

/* We loop forever here. */
while (1)
{
delay_ticks = HAL_GetTick() + 1000;
PT_WAIT_UNTIL(pt, HAL_GetTick() > delay_ticks);
HAL_GPIO_Troggle(BEEP_Port,BEEP_Pin);
}

PT_END(pt);
}

/**
* Finally, we have the main loop. Here is where the protothreads are
* initialized and scheduled. First, however, we define the
* protothread state variables pt1 and pt2, which hold the state of
* the two protothreads.
*/
static struct pt pt1, pt2;
int
main(void)
{
/* Initialize the protothread state variables with PT_INIT(). */
PT_INIT(&pt1);
PT_INIT(&pt2);

/*
* Then we schedule the two protothreads by repeatedly calling their
* protothread functions and passing a pointer to the protothread
* state variables as arguments.
*/
while(1) {
protothread1(&pt1);
protothread2(&pt2);
}
}