FreeRTOS-使用互斥量(mutex)保护共享数据
1 为什么要保护共享数据
我们直接以一个测试用例,来演示,当任务并发访问某个共享的全局变量时,如果不对数据访问加以保护,会产生什么样的结果。
首先,我们将工程配置文件FreeRTOSConfig.h中的configTICK_RATE_HZ
设置大一点:
1 |
这会让内核产生Tick
中断频率更高,任务间地切换会更频繁,更容易暴露出多任务并发访问共享数据导致的错误。
我们定义一个任务:该任务就是对全局变量累加 NUM
次,然后输出结果。如下所示:
1 | int32_t g_variable = 0; |
使用上面的函数,创建3个任务,3个任务对g_variable
共累加 3*NUM
次,因此g_variable
最终的结果应该是3*NUM
。main 函数如下:
1 | int main(void) { |
烧录运行后应该可以看到如下输出:当所有累加操作完成后,输出符合期望,即:g_variable == 3*NUM
1 | start FreeRTOS |
现在,我们将累加次数NUM
调高:
1 |
编译后,烧录运行。此时可以观察到,当所有累加操作完成后,输出的结果不符合期望:即:g_variable != 3*NUM
1 | start FreeRTOS |
为什么会产生这样的结果?我们利用开发工具来查看下g_variable += 1
这条 C 语句,所对应的汇编代码(keil工具点击Debug进入调试,点击菜单栏的view->Disassembly Window可以看到汇编代码):将编译优化等级设置为-O0,关闭优化。开启编译器优化后的代码,一般会有指令重排,看起来会比较乱,没有下图的好解释。
从上图,可以看到 g_variable += 1
这条语句被翻译成了5 调汇编。
MOVW 和 MOVT 两条指令的最终结果是将寄存器 r1 设置为常量: 0x20000524
在keil的编译输出文件 xxx.map 中,搜索 g_variable
,可以看到它的地址:就是 0x20000524
1 | g_variable 0x20000524 Data 4 main.o(.bss.g_variable) |
即,MOVW 和 MOVT 两条指令的作用是将 g_variable
的地址加载到 r1 寄存器中。
接着 LDR 就是从 r1中值所指示的地址处读取数据(即读取g_variable
)并放入 r0 寄存器中。
之后 ADDS 指令,将 r0 寄存器中的值加1
最后 STR 指令,就 r0 中的值,写到 r1值所指示的地址处(即更新g_variable
)。
综上,C 语言的一个更新变量操作g_variable += 1
。
在汇编层面,并不是一条指令就能完成,实际是由多条指令完成的:
- 将
g_variable
的值,从内存加载到到寄存器中 - 寄存器中的值加 1
- 寄存器中的数据写回内存
而内核的任务调度,可能会在上述三条指令的任意一条后发生。那么,就可能会发生任务A
正在更新变量g_variable
的值,但还未完成更新时,内核进行了任务切换,让另一个任务B
开始运行。而任务B
刚好也在更新这个值,那么会发生错误。如下图所示:
如上图所示,当task_a
将全局变量g_variable
的值加载到了寄存器后,还未更新数据并写回。此时,发生切换,内核会先将当前的“现场”(即一些硬件寄存器中的值)保存在task_a
自己的栈中,然后开始运行task_b
,task_b
也会更新g_variable的值
。过了一段时间,再次发生任务切换,任务task_a
恢复运行,内核会先将task_a
栈中保存的“现场”数据恢复(这样task_a
的运行状态就会和被打断前一致),之后执行 加 1 操作,以及写回操作。 此时,写回变量g_variable
的值,是task_a
被中断前记录的旧值 + 1,而不是task_b
更新后的值 + 1。 最终,导致了数据结果不符合预期。
这个例子中,共享资源只是一个 全局整型变量。有时,我们可能需要自定义一种结构体数据,并且定义了一个该类型的共享变量。如下所示:
1 | struct MyData{ |
我们定义了一个 结构体类型变量MyData var;
,该变量会被多个任务并发读/写。假设,每次写它时,需要写它的三个域;每次读时,也要读三个域的值。如下所示:
1 | // write |
对于这种情况,在 C 语言层面上的,写和读 就已经是多条语句了。所以不仅写需要操作需要保护(要么不执行,要么一次三个域都写完),读操作也需要进行保护(要么不读,要么三个域一起读完),否则读到一半,切换到另一个也会修改该变量的任务,等一段时间后又切换回被打断的这个任务,那么读到的三个域,就是部分属于旧值,部分属于新值(切换的那个任务修改的)。
2 如何保护对共享数据的访问
了解了问题产生原因,那么如何解决这种问题? 如果对共享资源的访问操作(如g_variable += 1
)是“原子操作”。即,要么执行了,要么没执行,不出现执行到一半被打断的情况。那就能保证数据的最终结果是符合预期的。
因此,当有多个任务并发访问共享数据时,我们需要让访问操作成为“原子操作”,要么不执行,执行就一定执行完。 或者说构建一个临界区,将访问操作保护起来,只要有一个任务在临界区中访问数据,该任务退出临界区前,其它任务就不能进入临界区。如下图所示:
FreeRTOS实现临界区的办法很多:
2.1 临时关闭中断实现临界区
在FreeRTOS-硬件中断嵌套模型和内核临界区一文中,我们介绍了FreeRTOS通过临时关闭中断的方式,实现了多任务可以安全地并发访问内核资源。例如,当我们创建多个任务,当这些任务均会向同一个消息队列中发送/提取 消息时(这会涉及到消息队列内部数据的变动,如果不加保护,多任务并发访问时肯定会破坏内部数据状态),我们直接使用FreeRTOS的发送/提取消息 API,不需要考虑多任务/中断处理函数并发访问消息队列时,如何保证消息队列内部数据结构不被破坏,因为内核在这些 API 内部已经实现了临界区,将对消息队列的访问过程进行了保护,使得消息队列内部数据状态不会被因任务并发访问而被破坏。
内核的实现临界区的方式,就是临时屏蔽中断(进入临界区),这样可以避免内核任务切换,以及其它中断的抢占,实现对共享数据的互斥访问。当访问完成后,再取消屏蔽(退出临界区)。
但关闭中断的方式,过于粗糙。毕竟中断都是用来响应重要事件,临时屏蔽中断很可能会导致一些重要事件的响应被延迟。因此,非必要的情况下,不应该使用这种方式。
2.2 临时关闭内核任务调度
另一种实现临界区的方式,是通过vTaskSuspendAll
临时关闭内核的任务调用(进入临界区),共享资源访问结束后,再通过xTaskResumeAll
恢复内核任务调度(退出临界区)。
不过, vTaskSuspendAll
只关闭了内核任务调度,避免了任务间的切换。但是,中断此时仍是正常使能的。
如果只是多个任务会并发的访问共享资源,通过临时关闭内核任务调度功能来实现临界区是可行的,任务在访问数据期间不会切换,也就不会出现任务访问共享数据一半时,内核切换到其它任务(该任务也访问共享数据)的情况。
任务调度的挂起操作虽然开销很低(更新内部标记变量),但是任务调度的恢复操作开销可能会很高。因为,在调度关闭过程中,大量内核tick
中断中的内核任务调度相关工作都被延迟到了 任务调度恢复的时候再执行。
2.3 使用互斥量
对于应用开发来说,互斥量(mutex)是实现临界区最合适工具。FreeRTOS 提供的互斥量,实际上是一种特殊的二值信号量。FreeRTOS的互斥量,就是在二值信号量的基础上增加了优先级继承特性,优先级继承特性使得互斥量相比普通的二值信号量,能最小化优先级反转带来的负面影响,更适用来实现临界区,保护数据的访问过程。
我们可以将互斥量理解成一把钥匙,每次访问共享资源前(进入临界区),必须需要先拿到钥匙,否则就不允许访问,任务可以原地阻塞等到可以拿到钥匙,或者返回拿不到钥匙的错误信息。 一旦拿到钥匙(进入了临界区中),就可以安全的访问共享资源了(因为此时不会有其它人拿到钥匙),等访问结束,将钥匙还回去,这样后续的其它访问请求就能拿到钥匙,继而访问数据,如下图所示:
可以发现,使用互斥量实现临界区来保护数据的访问过程,依赖于开发者保持一致的协议。即所有访问请求,必须拿到钥匙后才能执行后续访问,访问完成需要还回钥匙。如果,和你协作的开发者不遵守这个约定,不请求获取钥匙就直接去访问共享数据,那么数据的访问过程就不再是受保护的了。
由于互斥量本身就是属于一种特殊地二值信号量。因此,内核实现互斥量地原理和实现信号量地原理是一样地,都是基于内核消息队列来实现的(更详细的原理解释,可以参考文章二值信号量的第二节内容)。
创建一个互斥量,实际是创建一个大小为 1 的消息队列来,并且初始状态下,该消息队列中就存储了一个“消息”(只是内部的一个计数表示存在一个消息,没有实际存储)。
获取钥匙,就是从该消息队列中提取消息,如果没有消息(钥匙被别人拿走了),就原地阻塞,或者返回错误信息。
返还钥匙,就是向该消息队列中发送一个消息。此时如果刚好存在其它任务在等待“钥匙”,就能立刻获取到钥匙了。
在FreeRTOS中,互斥量的操作API 有部分是和信号量公用的(毕竟互斥量本身就是一种特殊的二值信号量)。
使用互斥量,首先需要在工程配置文件FreeRTOSConfig.h 中添加宏定义:
1 |
创建互斥量的 API 为:
1 | SemaphoreHandle_t xSemaphoreCreateMutex( void ); |
- 返回值为NULL,表示创建失败。否则,返回的就是用来识别所创建的互斥量的句柄。
获取互斥量(拿到钥匙)的 API为: 和获取信号量的API为同一个
1 | BaseType_t xSemaphoreTake( SemaphoreHandle_t xSemaphore, TickType_t xTicksToWait ); |
- xSemaphore:互斥量句柄,创建API 的返回值。
- xTicksToWait:如果当前“钥匙”已经被人拿走了,任务想原地阻塞,等待“钥匙”被其它访问者还回来时。该值就是设置最长等待多久。如果为0,表示不等待,立刻返回结果。
- 返回值:
- pdPASS-成功获取到“钥匙”,可以安全访问数据了。
- pdFAIL-没有获取到“钥匙”,说明有其它访问者正在访问,此时任务不应该访问共享数据。
返还互斥量(返还钥匙)的 API为:和设置信号量的API 为同一个
1 | BaseType_t xSemaphoreGive( SemaphoreHandle_t xSemaphore ); |
需要注意的是:互斥量不能在中断环境中使用(普通的信号量是可以的)! 这是由于互斥量含有的优先级继承特性(参考文章:优先级反转),是和当前持有该互斥量的任务的优先级关联的。 而中断是没有任务优先级概念的!
现在,我们来使用互斥量,解决文章开始例子中多任务对全局变量 g_variable
并发访问造成的结果不符合期望的问题。
我们使用互斥量实现临界区,将g_variable += 1
操作保护起来,如下所示:
1 | int32_t g_variable = 0; |
main 函数代码如下:
1 | int main(void) { |
编译后烧录运行,结果如下:可以发现最终结果是服务期望的了
1 | start FreeRTOS |
运行的时候,可以发现一个问题,就是运行时间变长了。
这是因为三个任务都在频繁的竞争“钥匙”,耗费了大量时间。并且由于 任务a 获得“钥匙”后,如果内核切换到了 任务b,上面的代码中由于任务b 拿不到“钥匙”,一直在空转,直到内核再次切换任务。这也浪费了很多时间。
将 if (pdPASS == xSemaphoreTake(mutex, 0))
改为 if (pdPASS == xSemaphoreTake(mutex, pdMS_TO_TICKS(100)))
,可以提高运行速度。因为,设置了等待时间,那么任务拿不到“钥匙”时就会阻塞而不是一直空转,这样持有“钥匙”的任务就能尽快恢复运行,继而返还“钥匙”,让其它任务能获得“钥匙”并访问数据。
多线程(FreeRTOS中为多任务)的数据竞争问题(并发或并行访问共享数据),不管是在PC平台中,还是在嵌入式平台中,都是一个很复杂的主题,实际开发中会有很多涉及效率和数据安全的问题,例如:
- 临界区应该尽可能的小:在实际开发中,当使用互斥量实现临界区对数据访问过程进行保护时,应该让临界区尽可能地“小”(临界区内执行的耗时尽可能地短),这样可以减少多任务因为竞争“钥匙”造成的运行效率损失, 特别是在多核系统中,临界区的存在会使得多线程并行转成串行(执行临界区保护的那段数据访问过程)。所以,临界区越小越好。
- 尽量不使用递归互斥量:互斥量一旦被一个任务持有了,其它任务就不可能再获取到,只有持有互斥量的任务释放了互斥量,其它任务才能获取到。 并且,当任务a 已经持有互斥量b 时,任务a 不能再次去获取互斥量 b。因为互斥量b 已经被获取了,不管它是被谁获取的。
但是,可递归互斥量(有一套独立的API),允许任务a 再持有信号量b 的情况下,再次获取信号量b。但实际开发中,我们应该避免使用可递归互斥量,它会增加复杂性,语义也没有一般互斥量那么明确(任何时候只能被一个任务持有一次)。如果你需要使用可递归互斥量,首先应该考虑的是软件结构能否优化?尽量避免使用它。 - 小心死锁问题:这是个教科书里都会介绍的基本问题,但开发中仍旧很容易碰到,网上有很多针对死锁问题的资料。
- 中断处理函数需要访问共享数据怎么办?:前文提到,中断中不能使用互斥量(中断没有任务优先级的概念,而互斥量会和持有它的任务的优先级关联)。因此,涉及到中断服务中访问共享数据,可以创建一个额外的任务,使用信号量同步中断事件给该任务,并在该任务中执行访问共享数据的实际操作(等于将访问操作延后了,延后到中断退出后,在一个普通任务中再执行数据访问)。或者使用FreeRTOS提供的延后执行机制将。可以参考文章:FreeRTOS-延后执行机制
ps:需要注意文章代码中的日志输出函数,产品代码中如果需要使用的话,需要考虑线程安全性(多任务安全性),因为中断/任务切换可能发生在另一个任务正在输出日志但还未输出完的时候,这就可能造成日志错乱
FreeRTOS交流QQ群-663806972