网志

烧焦器,第3部分:具有双缓冲和旋转壁炉的裸机并发

杰森·萨克斯(Jason Sachs) 2020年7月25日

This 是 a short article about one technique for communicating between asynchronous 流程 on 裸机嵌入式系统.

问:为什么多线程鸡肉过马路?

答:到另一边。得到

— Jason Whittington

那里 are many reasons why 并发 管理— at least if you’re using an 具有共享可变状态的命令式编程语言. Using 信息传递 between 是 olated 流程 (exemplified by languages such as Go 和 Erlang )是处理并发的一种方法…但是您需要一个支持消息传递作为原始操作的系统以及支持它的资源。

在资源受限的嵌入式系统上运行的任何人运行消息传递的概念,他们可能会看着您,好像您有两个头脑。您’需要某种具有足够空间来处理未决消息的硬件或软件队列。这种方法在高端嵌入式系统或通用PC上效果更好,您可以在其中利用操作系统的消息传递功能,并且所需的开销很小。 运行裸机且具有数千字节(或更少!)RAM而不是兆字节或千兆字节的典型8位或16位系统不会’无处不在使用队列—至少并非没有精心设计。

所以让’s set the stage for one alternative, a bare-metal system that has two 流程 that need to communicate.

迅速 和 ky

让’s say we’重新使用具有以下功能的微控制器:

  • 标称数量的RAM(例如4096字节)
  • 单核处理器,其访问方式可以通过以下方式建模: 顺序一致性
  • 模数转换器(ADC),可以将其某些引脚上的一些信号数字化
  • 某种定时机制可以定期触发ADC,例如以10kHz触发
  • 当ADC完成对这些信号的数字化处理时可以触发的中断

这些都是很常见的。除了没有ADC的低端8位处理器之外,您’找到这样的微控制器将毫无困难。 (如果ADC的自动触发不’如果有可用的话,稍微不太理想的后备方法是在进入定时器中断服务程序后以软件手动触发它。)

可能更难理解和验证的唯一方面是顺序一致性。这是一个用于处理器核心的简单计算模型,该处理器核心一个接一个地执行一系列指令,其中读取存储器的每个指令都可以查看任何先前写入存储器的指令的结果。这不是唯一的 计算记忆模型 存在,但它’是规则,而不是简单的嵌入式系统中的例外。在另一端是今天’的台式机和服务器处理器,它们不仅具有多个内核使用多级内存缓存访问共享内存,而且还具有 乱序执行。在那种系统中,内存访问的顺序保证放宽了,只有在使用时才获得更严格的保证。 屏障或栅栏 确保一次计算的结果 发生在之前 从另一个读。

我们的微控制器的固件将使用C编程,并且不使用操作系统。我们有两个过程(我用术语“processes” in the abstract sense, 没有 t in the sense of an OS like Linux that has some number of 流程 trying to be scheduled.) that we’ll call 迅速ky:

  • ky starts executing from main(), 和 after some initialization steps, its only job 是 to repeat the following tasks in order:
    • 通过某种串行通信端口(CAN或UART)接受命令
    • 处理它们(可能需要与Speedy协调)
    • 通过串行通信端口将结果发送回
  • 迅速 只要有ADC采样可用,就在一个称为10kHz的中断服务程序中执行,执行以下任务:
    • 获取ADC样本的结果
    • performing just enough signal processing to relieve ky from the burden of these computations
    • exchanging information with ky
    • 在某些情况下,通过PWM信号或DAC改变微控制器的输出

这种Speedy / ky系统并非人为设计:’s something I’在过去的25年中,我们一直与之合作,并在许多电机驱动器,数字电源转换器和其他工业系统中得到广泛应用。

因为这是单核处理器,所以在发生中断时:

  • ky’的执行停止(这可能不会立即发生;对于诸如此类的事情,通常需要花费一些指令周期 管道 清除)
  • 程序计数器(PC)保存在硬件中
  • ISR开始运行,并保存可能需要更改的所有CPU寄存器
  • 迅速 executes
  • ISR将所有CPU寄存器恢复到其保存状态
  • the CPU jumps back to the saved PC 和 continues ky’s execution.

从Poky的角度来看,Speedy在Poky的指令之间瞬间即刻执行’的固件。除了一个例外,Poky必须假设Speedy’执行可以随时发生,通常在最不方便的指令之间执行。一个例外是Poky可以在代码的关键部分暂时禁用ADC中断,并且在禁用ADC中断的同时,迅速’的执行被推迟。这意味着Poky不会’不必担心在关键部分快速更改RAM。但这意味着Speedy可能无法完全以10kHz的频率运行,特别是如果Poky长时间禁用中断。

这里的挑战是如何让Speedy和Poky相互交流。如果我们定义了一种机制,包括共享状态以及一些有关如何使用它的规则,那么我们可以做到。

阻塞和非阻塞并发

现在,在使用共享内存的计算机上管理并发的最安全方法是使用精心设计的库原语,例如 互斥体 。互斥锁是阻止并发的示例,其中两个进程通过等待获取互斥锁来争用资源。进程仅在获取互斥量后才访问共享内存,并且一旦完成,便将其释放,以便另一个进程可以访问。该技术可以安全地序列化对共享内存的访问。

In the case of 迅速 和 ky, though, using a 互斥体 may 没有 t work: 迅速 是 an example of a 硬实时过程,其中某些内容必须经常执行,并带有一定的最大延迟。在迅速’s case, we haven’t指定必须严格管理中断处理程序的执行时间,以便它可以运行并完成其处理,但是我们’d希望它是每100微秒,也许这意味着我们可以容忍10或20μ中断开始前的等待时间。这是你的事’d必须逐案研究,看看迟到的后果。 (例如,在电动机驱动器中,在将ADC输入数字化和更新PWM输出之间有一定的时间,迟到会导致对控制环路的干扰,从而导致听得见的噪声,振动甚至失去控制。)快速不能容忍,但是,是无限期的延迟。在互斥锁上进行阻塞不是在ISR中应该执行的操作;即使您可以以某种方式保证最大延迟,’一个红旗。 ISR应该仅包含非阻塞代码。

此外,尽管在ISR中使用非阻塞代码是必要的,但这还不够。快速不仅可以’t get stuck; it 必须 完成其所需的任务。如果Speedy检测到由于Poky正在写东西而无法访问共享内存,则Speedy’任务失败,它所能做的就是设置某种标志,以指出它无法完成其处理,从而有可能危害所有将来计算的正确性。

那里 are a number of 不用等待 data structures that could be used in cases where concurrent 流程 need to access shared memory —用于链接列表,队列和哈希映射之类的东西。那些在这里过大了。您可能会在通用计算机上的多线程软件中看到它们。

We’将要利用Speedy / ky系统的一些特性’存在于通用计算机中,并描述了一些简单的东西, 双缓冲 我称之为旋转壁炉。

旋转壁炉

图片如下:

毛茸茸 史酷比 are in a haunted mansion. They are trying to solve a mystery with their friends, but in the meantime they are hungry 和 a bit cold, so they plunder sandwich fixings from the pantry 和 bring them to another room where there are some regal burgundy velvet chairs in front of a 壁炉. 那里 是 a warm, crackling fire going, 和 a small table next to the 壁炉, so they each begin fixing a sandwich on the table. These are soon piled high with all the food they have found.

史酷比听到微弱的how叫声,跳到毛茸茸的’的手臂。他们走进大厅去调查,但是不要’什么都看不到。一阵石刮的声音,就像一个沉重的地穴盖滑开的声音,当它们回来时,桌子上光秃秃的:三明治和所有食物都没了。

毛茸茸的和史酷比搜索房间,寻找他们的三明治,当他们的背转过身时,又有刮擦的声音,这一次,他们露出了眼角,发誓他们看到桌子移动了一会儿。这次,一个大毛绒黑熊站在桌子旁边,动物标本师’的杰作,举起手臂和爪子,瞪着它们。毛茸茸的笑着。他在熊面前挥舞着手’s face. “It’s only a dummy,”他说,敲熊’的头骨。眼睛在熊内移动’的头,发出嘶哑的声音。“Zoinks!”大吼大叫。他和史酷比跑出房间,穿过大厅,胳膊和腿在走动时异常摆动。

史酷比跳上楼梯栏杆并滑落,随后沙吉紧追在身,随后他们降落在楼梯底部的一堆垃圾中。弗雷德,费尔玛和达芙妮不赞成地看着他们。“你们两个去哪了” says Fred. “We’在寻找这个谜的线索,” says Velma.

“We found a h-h-h… a h-haunted 壁炉!” says 毛茸茸 .

“Raunted rireplace!” says Scooby.

Fred, Velma, 和 Daphne are skeptical, but 毛茸茸 和 Scooby lead the way back upstairs 和 into the room with the 壁炉, where their sandwiches are sitting quietly on the table, just the way they’d left them.

“But…但是这里有只熊!”毛茸茸的说。史酷比吃了两个三明治。

弗雷德,维尔玛和达芙妮不服气。

在剧集中的晚些时候,达芙妮从一个洗衣槽掉落到另一个房间,房间里摆放着富丽堂皇的蓝色天鹅绒椅子,放在同一个壁炉和桌子前。该团伙的其余成员设法找到达芙妮,以及一个秘密杠杆,使壁炉从一个房间到另一个房间串联旋转。事实证明,有一对伪造者只是在与傻子和史酷比开玩笑,这是一种恶作剧,尽管他们所有的伪造机器都安全地藏在了小镇的另一边的一家废弃糖果厂中,但该团伙抓住了它们并发现了伪造品。 20美元的钞票,这使地方检察官可能有理由发出搜查糖果工厂的手令。“如果没有的话,我们也将一事无成。’为您爱管闲事的孩子们!”一名造假者说,当警察戴上手铐时。

Hypothetically speaking, that 是 . I could have sworn there was an episode of 史酷比 with a pair of rotating 壁炉 (with the Harlem Globetrotters?). Unfortunately, there does 没有 t seem to be such a thing. I seem to have gotten this mixed up with 凯文船长与双子滑雪小屋的那集 或许是《印第安纳·琼斯》和《最后的十字军东征》中的火景:

旋转 什么 ?!

As a 并发 protocol, the revolving 壁炉 是 a poor-man’s messaging system.

一个真实的 信息传递 architecture, with 是 olation between transmit 和 receive 流程, requires these elements:

  • 传输端的临时存储,以构造一条消息
  • 接收端的临时存储,以接收消息
  • some entity acting as an intermediary between transmit 和 receive 流程 (for example, an operating system, or in cases of bare-metal systems, a module containing functions 和 data structures for facilitating 信息传递 )
  • 允许发送端知道是否可以发送消息的某种方法
  • 允许发送端请求中介从发送器发送消息的某种方法’的临时存储,并知道何时传输完成
  • 允许接收端知道消息是否可以接收的某种方法
  • 允许接收端请求中介者将下一个可用消息传递到接收者的某种方法’的临时存储,并知道接收完成的时间
  • 中间人在其发送和接收之间保存消息的存储容量

这给中介带来了任何并发的负担。发送器和接收器本身解耦,在那里’没有共享状态,不需要互斥或使它们合作。

In the case of 迅速 和 ky, we don’t need all that.

迅速和Poky各自需要某种工作空间来准备他们的信息’打算互相分享。使用共享内存而不是消息传递,Poky需要自由地将信息写入适当的位置,并且只有在完成时才向Speedy发信号以读取信息。我们的“fireplaces”共享内存的两个部分可以在Speedy和Poky之间交换;我们还需要一些同步机制来通知事件(“ready to publish” or “要求更改所有权”),以及一些适当合作的规则。

可以使用单个共享变量和操作它的操作来实现同步机制,只要这些操作具有三个重要的属性即可:

  • 访问共享变量是原子的
  • the shared variable 是 considered 易挥发的
  • 禁止通过处理器或编译器对共享变量的访问与其他键操作进行重新排序。

这里’是棘手的地方,因为不幸的是我们没有’从C语言标准中获得很多帮助,而这是普通的普通程序员易于理解的。

Atomic 和 易挥发的: Another Visit to the Basement

易挥发的

The 易挥发的 property just requires use of the 易挥发的 keyword in C, which tells the compiler 没有 t to make any optimization based on an assumption that the compiler knows 什么 是 contained in a variable. For example:

int16_t ultimate_answer(void)
{
    易挥发的 int16_t x = 6*9;
    x = 42;
    return x;
}

Without the 易挥发的 qualifier on the local variable x, the compiler could just have ultimate_answer() return 42 to the caller. Instead, the compiler 是 forced to store 54 into x, then store 42 into x, 和 then read the content of x before returning that value to the caller.

Furthermore, access to different 易挥发的 variables cannot be reordered (although the compiler can reorder 没有 n-易挥发的 variables with respect to 易挥发的 variables); the C standard’声明这的方法是 N1256草案第5.1.2.3节)

访问易失性对象,修改对象,修改文件或调用函数 这些操作中的任何一个都是副作用,即状态的变化 执行环境。评价表达可能会产生副作用。在 执行序列中的某些指定点称为序列点,所有副作用 以前的评估应完整,且以后的评估无副作用 应该发生了。 (序列点的摘要在附件C中给出。)

(最近的草稿,例如N1570,已经使水变得混乱,并以普通读者不太清楚的方式表达出来。)

I’ve mentioned 易挥发的 在上一篇文章中,因此,如果您想更深入地了解它,请阅读它或其中之一:

定购

只要计算的关键顺序由C确定’s rules on 易挥发的 variables, there’s 没有 additional synchronization needed. Memory barriers (e.g. asm 易挥发的 ("" : : : "memory"); as outlined in John Regehr’s post) may be needed to constrain 没有 n-易挥发的 variables from being reordered.

原子性

对于原子性,我们只需要确保在一个不可分割的操作中读取或写入共享变量即可;如果Poky需要两步来完成某件事,而Speedy在这两步之间执行,则Speedy可能会在其共享状态下看到无效的值。 (例如,设置32位指针的16位一半。)

大多数(如果不是全部的话)处理器可以确保内存加载或存储的机器字大小是原子的:例如,在16位系统上,您可以在一条指令中加载或存储16位值在内存中,并且中断可以’t pop up in the middle of a load 和 store. Similarly, sometimes there are 原子 instructions for setting or clearing bits, or even for loading/storing two words at once. (Read the fine print, though; on a dsPIC33 device, there 是 a MOV.D instruction that takes 2 cycles 和 是 没有 t interruptable, but because the device has a 16-bit bus, it 是 possible for DMA to sneak in between the cycles 和 ruin the 原子ity, at least with respect to DMA .)

如果编译器没有’如果没有保证可以映射到这些原子指令之一的内建函数或内在函数,那么您可能需要依靠内联汇编才能使用它们。

那里’较弱的方法,那就是信任编译器,但是在这种情况下,负担就在于 信任但要验证 编译器正在执行您认为正在执行的操作。这很危险。

如果你’很幸运拥有现代C或C ++编译器— where “modern”表示C11 / C ++ 11或更高版本—有一些功能由 std::atomic library in C++, or <stdatomic.h> in C that can guarantee all three of these behaviors from the compiler:

  • 值将以不可分割的操作读取或更新
  • 对值的访问将以顺序一致的顺序进行
  • 禁止编译器根据值的假设进行优化

不幸的是,我’我对C11 / C ++ 11原子不够熟悉,无法对它们的正确使用提供很好的建议;您可以查看下面的小节 投机建议. 如果你不这样做’没有现代化的C或C ++编译器,您’我将不得不看看您的C编译器是否具有可以保证原子操作的内建函数或内在函数。

投机建议

本节包含有关C语言中原子支持的有限建议,但在某些方面存在疑问。(警告!由于我的工作环境有限,因此我从未使用过这些建议,因此请稍加盐味,并自行进行尽职调查。)

如果你r C compiler supports C11 ’s <stdatomic.h> (compilers are allowed to define __STDC_NO_ATOMICS__ to say that they can’不要被这个东西困扰,你’re on your own), then you 应该 be able to use any of the base types such as 原子_int or 原子_bool, or qualify any type with _Atomic() such as _Atomic(uint32_t). 这里 you just use the variables the way you want 和 the compiler will guarantee there aren’t数据争用,尽管即使您的C代码未指定使用锁,也可能需要在基础实现中使用锁才能保证这一点。例如:

#include <stdint.h>
#include <stdatomic.h>

_Atomic(uint32_t) shared_var1;
atomic_uint shared_count = 0;

// thread 1
void store_var1(uint32_t val)
{
    shared_var1 = val;
    ++shared_count;
}

// thread 2
uint32_t replace_old_with_44_maybe(unsigned int maxcount)
{
    uint32_t oldval = shared_var1;
    /* Warning: the code below doesn't perform the if-statement  和  body 
     * in an 原子 manner; for that, you would need to use 
     * a critical section or  互斥体 .
     */
    if (shared_count <= maxcount) 
    {
        shared_var1 = 44;
    }
    return oldval;
}

The accesses to shared_var1 might turn into a function call that allows only the currently-running thread to read or write it.

另外,您可以使用类似 原子_load原子_store 跟...共事“regular”原子方式的变量:

#include <stdint.h>
#include <stdatomic.h>

uint32_t shared_var1;
unsigned int shared_count = 0;

// thread 1
void store_var1(uint32_t val)
{
    原子_store(&shared_var1, val);
    原子_fetch_add(&shared_count, 1);  
    // 原子 version of ++shared_count
}

// thread 2
uint32_t replace_old_with_44_maybe(unsigned int maxcount)
{
    uint32_t oldval = 原子_load(&shared_var1);
    /* Warning: the code below doesn't perform the if-statement  和  body 
     * in an 原子 manner; for that, you would need to use 
     * a critical section or  互斥体 .
     */
    if (atomic_load(&shared_count) <= maxcount) 
    {
        原子_store(&shared_var1, 44);
    }
    return oldval;
}

仅通过这些功能访问共享变量之间才能保证顺序一致性。如果一个线程直接从这些变量之一读取或写入,则所有选择均关闭。

如果你r compiler does 没有 t support C11 or does 没有 t provide <stdatomic.h> support, then some compilers provide intrinsic substitutes. For example, GCC 4.7.0 (and later) has __atomic_load_n__atomic_store_n:

#include <stdint.h>

uint32_t shared_var1;
unsigned int shared_count = 0;

// thread 1
void store_var1(uint32_t val)
{
    __atomic_store_n(&shared_var1, val, __ATOMIC_SEQ_CST);
    __atomic_fetch_add(&shared_count, 1, __ATOMIC_SEQ_CST);  
    // 原子 version of ++shared_count
}

// thread 2
uint32_t replace_old_with_44_maybe(unsigned int maxcount)
{
    uint32_t oldval = __atomic_load_n(&shared_var1, __ATOMIC_SEQ_CST);
    /* Warning: the code below doesn't perform the if-statement  和  body 
     * in an 原子 manner; for that, you would need to use
     * a critical section or  互斥体 .
     */
    if (__atomic_load_n(&shared_count, __ATOMIC_SEQ_CST) <= maxcount) 
    {
        __atomic_store_n(&shared_var1, 44, __ATOMIC_SEQ_CST);
    }
    return oldval;
}

Clang appears to provide builtins like __c11_atomic_store__c11_atomic_load 但它’s unclear if there’在任何情况下,都应使用此功能代替标准的C11功能。

至于在具有顺序一致的内存访问的典型低端单核处理器上,什么样的代码实际上在后台运行,我们可以看一下 MSP430上的编译器资源管理器哪里<stdatomic.h> 是 supported. I wrote a handful of short functions with _a_ (atomic) 和 _v_ (volatile) variants to perform loads 和 stores of bool, uint16_t, 和 uint32_t values, along with a 16-bit increment:

#include <stdatomic.h>
#include <stdint.h>
#include <stdbool.h>

typedef _Atomic(uint16_t) 原子_uint16_t; 
typedef _Atomic(uint32_t) 原子_uint32_t;

void store_v_b(volatile bool *pshared, bool b) {
    *pshared = b;
}

void store_v_16(volatile uint16_t *pshared, uint16_t x) {
    *pshared = x;
}

void store_v_32(volatile uint32_t *pshared, uint32_t x) {
    *pshared = x;
}

void store_a_b(atomic_bool *pshared, bool b) {
    *pshared = b;
}

void store_a_16(atomic_uint16_t *pshared, uint16_t x) {
    *pshared = x;
}

void store_a_32(atomic_uint32_t *pshared, uint32_t x) {
    *pshared = x;
}

bool load_v_b(volatile bool *pshared) {
    return *pshared;
} 

uint16_t load_v_16(volatile uint16_t *pshared) {
    return *pshared;
}

uint16_t load_v_32(volatile uint32_t *pshared)
{
    return *pshared;
}

bool load_a_b(atomic_bool *pshared)
{
    return *pshared;
}

uint16_t load_a_16(atomic_uint16_t *pshared)
{
    return *pshared;
}

uint16_t load_a_32(atomic_uint32_t *pshared)
{
    return *pshared;
}

void inc_a_16(atomic_uint16_t *pshared)
{
    原子_fetch_add(pshared, 1);
}

void inc_v_16(volatile uint16_t *pshared)
{
    ++*pshared;
}

With MSP430 gcc 6.2.1 -O2, this compiles to

store_v_b:
        MOV.B   R13, @R12
        RET
store_v_16:
        MOV.W   R13, @R12
        RET
store_v_32:
        MOV.W   R13, @R12
        MOV.W   R14, 2(R12)
        RET
store_a_b:
        AND     #0xff, R13
        MOV.B   R13, @R12
        RET
store_a_16:
        MOV.W   R13, @R12
        RET
store_a_32:
        MOV.B   #5, R15
        CALL    #__atomic_store_4
        RET
load_v_b:
        MOV.B   @R12, R12
        RET
load_v_16:
        MOV.W   @R12, R12
        RET
load_v_32:
        MOV.W   2(R12), R13
        MOV.W   @R12, R12
        RET
load_a_b:
        MOV.B   @R12, R12
        RET
load_a_16:
        MOV.W   @R12, R12
        RET
load_a_32:
        MOV.B   #5, R13
        CALL    #__atomic_load_4
        RET
inc_a_16:
        MOV.B   #5, R14
        MOV.B   #1, R13
        CALL    #__atomic_fetch_add_2
        RET
inc_v_16:
        ADD.W   #1, @R12
        RET

您’ll 没有 te that in most cases these are trivial, lightweight implementations, even for the _Atomic variants. The exceptions are:

  • 32-bit 原子 load
  • 32-bit 原子 store
  • 16-bit 原子 increment

在这些情况下,编译器将调用一个库函数,该函数可能会禁用中断足够长的时间以使其能够运行一些关键指令。 (任何人都对MSP430编译器足够熟悉,可以说出什么’这些里面?)32位加载和存储是有意义的,但是我’我对16位的增量很感兴趣— you can see there 是 a single instruction ADD.W #1, @R12 that works in the 易挥发的 case. Either there 是 something about the CPU execution or memory models on the MSP430 that makes this insufficient for the guarantees needed by C11 原子s, or the compiler writers haven’t finished tweaking the compiler to reduce the 16-bit fetch_add case to the ADD.W instruction.

DIY 原子s with extended assembly

如果你不这样做’t have access to <stdatomic.h> or an appropriate builtin, 和 you REALLY want to try implementing 和 testing your own 原子 functions, you might try using extended assembly. 这里’一个例子,用 pyxc16:

import pyxc16

src = r'''
#include <stdint.h>
#include <stdbool.h>

#define DECLARE_ATOMIC_READWRITE(T, M) \
inline static void _util_atomic_##T##_write(volatile T *location, T value) \
{ \
    /* 原子 equivalent of *location = value; */                \
    asm 易挥发的(";! BEGIN _util_atomic_" #T "_write\n"          \
                 "\t" M " %[val], [%[loc]]\n"                    \
                 "\t;! END   _util_atomic_" #T "_write"          \
                 : /*  没有  outputs */                              \
                 : [val] "r" (value), [loc] "r" (location));     \
} \
\
inline static uint16_t _util_atomic_##T##_read(volatile uint16_t *location) \
{ \
    /* 原子 equivalent of return *location; */                 \
    uint16_t result;                                             \
    asm 易挥发的(";! BEGIN _util_atomic_" #T "_read\n"           \
                 "\t" M " [%[loc]], %[result]\n"                 \
                 "\t;! END   _util_atomic_" #T "_read"           \
                 : [result] "=r" (result)                        \
                 : [loc] "r" (location));                        \
    return result;                                               \
}

typedef 易挥发的 void *voidptr;
DECLARE_ATOMIC_READWRITE(uint16_t, "mov")
DECLARE_ATOMIC_READWRITE(voidptr, "mov")
DECLARE_ATOMIC_READWRITE(bool, "mov.b")

// just using 易挥发的
uint16_t test0(volatile uint16_t *px, uint16_t v, 
               易挥发的 voidptr *pv, 
               易挥发的 bool *pb)
{
    *px = v;
    *px = ++v;
    *++px = v+37;
    *pv = px;
    uint16_t y = *px;
    *pb = y > 100;
    
    return y;
}

uint16_t test1(uint16_t *px, uint16_t v, 
               voidptr *pv, 
               bool *pb)
{
    _util_atomic_uint16_t_write(px, v);
    _util_atomic_uint16_t_write(px, ++v);
    _util_atomic_uint16_t_write(++px, v+37);
    _util_atomic_voidptr_write(pv, px);
    uint16_t y = _util_atomic_uint16_t_read(px);
    _util_atomic_bool_write(pb, y > 100);
    
    return y;
}
'''
pyxc16.compile(src, '-O2', comment_filter=';!')
_test0:
	mov	w1,[w0]
	inc	w1,[w0]
	mov	#38,w4
	add	w1,w4,[++w0]
	mov	w0,[w2]
	mov	[w0],w0
	mov.b	#1,w1
	mov	#100,w2
	sub	w0,w2,[w15]
	bra	gtu,.L2
	clr.b	w1
.L2:
	mov.b	w1,[w3]
	return
_test1:
	;!	BEGIN _util_atomic_uint16_t_write
	mov	w1, [w0]
	;!	END   _util_atomic_uint16_t_write
	inc	w1,w1
	;!	BEGIN _util_atomic_uint16_t_write
	mov	w1, [w0]
	;!	END   _util_atomic_uint16_t_write
	inc2	w0,w0
	add	#37,w1
	;!	BEGIN _util_atomic_uint16_t_write
	mov	w1, [w0]
	;!	END   _util_atomic_uint16_t_write
	;!	BEGIN _util_atomic_voidptr_write
	mov	w0, [w2]
	;!	END   _util_atomic_voidptr_write
	;!	BEGIN _util_atomic_uint16_t_read
	mov	[w0], w0
	;!	END   _util_atomic_uint16_t_read
	mov.b	#1,w1
	mov	#100,w2
	sub	w0,w2,[w15]
	bra	gtu,.L6
	clr.b	w1
.L6:
	;!	BEGIN _util_atomic_bool_write
	mov.b	w1, [w3]
	;!	END   _util_atomic_bool_write
	return

The DECLARE_ATOMIC_READWRITE macro takes in a typename T 和 a move instruction M, 和 generates functions that utilize inline assembly with a single line of implementation (surrounded by BEGIN 和 END comments to tell where it comes from). The single line 是 the key — if you can implement a load or store on a dsPIC with a single MOV.B or MOV or MOV.D then it 是 uninterruptable, 和 does 没有 t suffer from any read-modify-write problems. (although again: MOV.D 是 a 2-cycle instruction due to the use of a 16-bit data bus; it’不会中断,但是DMA事务可能会在总线访问之间潜入。)

(这里 ’s why read-modify-write 是 a tricky 是 sue. The dsPIC33 architecture has pipelined execution where each instruction 是 handled with a fetch 和 execute phase. We have to be sure that an interrupt cannot sneak in between those fetch 和 execute stages for something like an INC (increment) instruction in indirect mode where contents of memory, specified by a pointer, are incremented, since it accesses memory both on the fetch 和 execute stages of the pipeline. The manuals on the dsPIC33 cores are mostly clear on 中断处理,即允许所有指令在中断发生之前完成。读取-修改-写入行为可能导致 停滞在指令管道中 为了防止下一条指令获取旧数据,但这是不间断的。如果您使用的是其他架构,则需要非常仔细地了解它如何执行代码。)

无论如何,你’ll 没有 te that the assembly generated for test0()test1() are almost the same — in test0() the compiler can save 2 instructions because it can optimize some of the generated code, whereas in test1() it 是 restricted to plunking down the inline assembly of the __util_atomic_uint16_t* functions as black boxes it cannot change. If I weren’t picky, I’d just use test0() because the compiler 应该 更好地了解并将16位分配视为原子操作。但是那里’至少在C标准方面,编译器不这样做。

仅重申一点:C11 和C ++ 11 原子<> 应该可以帮助您编写正确的程序。 如果可以使用可以为编译器提供更好,更安全的保证的东西,则不建议使用DIY方法。

如果你 really want to delve into the dark corners of memory models 和 the C11 /C++11 原子s, check out 香草萨特’s talks on “atomic<> Weapons” 进入多核处理器的行为 获取/释放语义,以及你如何’应该使用C11或C ++ 11原子编写C代码。

反正’是地下室里的单词。 Yeccch,我感觉有些不舒服。如果您需要有关这些主题的建议,请咨询熟悉C标准的专业人员。

旋转壁炉,示例1

OK,现在到一些实际代码!这里’s one example of a revolving 壁炉 implementation. We’这将使Speedy累积ADC样本的总和,以获得电流和电压。 ky将读取累积的总和并加以利用,并在需要时通过通信端口进行传输。

在此示例中,协议如下:

  • At any given time outside of a switch operation, ky 和 迅速 each have access to their own 壁炉, 和 are 没有 t allowed to access the contents of the other 壁炉.
  • Only ky 是 allowed to switch the 壁炉.
  • 迅速 必须 leave its 壁炉 contents in a 有效 state after each ISR completion.
/* 壁炉.h */

#include <stdint.h>
#include <stdbool.h>

typedef struct {
    int32_t voltage_sum;
    int32_t current_sum;
    uint16_t count;
} FIREPLACE;

typedef struct {
    FIREPLACE 壁炉[2];
    struct {
        易挥发的 FIREPLACE *speedy;
        易挥发的 FIREPLACE *poky;
    } access;
} RAW_SHARED_MEMORY; // w/o 易挥发的 -- do  没有 t use directly

typedef 易挥发的 RAW_SHARED_MEMORY SHARED_MEMORY;

inline static void 壁炉_init(volatile FIREPLACE *fp)
{
    fp->voltage_sum = 0;
    fp->current_sum = 0;
    fp->count = 0;
}

inline static void 壁炉_switch(SHARED_MEMORY *shmem)
{
    易挥发的 FIREPLACE *tmp = shmem->access.poky;
    shmem->access.poky = shmem->access.speedy;
    shmem->access.speedy = tmp;
}
/* poky.h */
typedef struct {
    int64_t voltage_sum;
    int64_t current_sum;
    uint32_t count;
} POKY_STATE;

inline static void poky_sum_init(POKY_STATE *pstate)
{
    pstate->voltage_sum = 0;
    pstate->current_sum = 0;
    pstate->count = 0;
}
/* poky.c */

#include "fireplace.h"
#include "poky.h"

void poky_init(POKY_STATE *pstate, SHARED_MEMORY *shmem)
{
    poky_sum_init(pstate);
    壁炉_init(&shmem->fireplaces[0]);
    壁炉_init(&shmem->fireplaces[1]);
    shmem->access.poky   = &shmem->fireplaces[0];
    shmem->access.speedy = &shmem->fireplaces[1];
}

void poky_step(POKY_STATE *pstate, SHARED_MEMORY *shmem)
{
    /* called during the main loop */
    壁炉_switch(shmem);
    易挥发的 FIREPLACE *mine = shmem->access.poky;
    if (mine->count > 0)
    {
        // 迅速 has accumulated samples! 
        // 让's accumulate that into an overall sum.
        pstate->voltage_sum += mine->voltage_sum;
        pstate->current_sum += mine->current_sum;
        pstate->count += mine->count;

        // Now zero out the accumulators 
        // so we can switch 壁炉 next time.
        壁炉_init(mine);
    }

    // If we get a message asking for the sums, send them  和  zero them out.
    if (should_we_transmit_sums())
    {
        transmit_sums(pstate->voltage_sum, pstate->current_sum, pstate->count);
        poky_sum_init(pstate);
    }
}
/* speedy.c */

#include "fireplace.h"

void speedy_step(SHARED_MEMORY *shmem)
{
    // read ADC
    int16_t current = read_adc(ADC_CURRENT);
    int16_t voltage = read_adc(ADC_VOLTAGE);

    易挥发的 FIREPLACE *mine = shmem->access.speedy;
    mine->voltage_sum += voltage;
    mine->current_sum += current;
    ++mine->count;
}

那里! We have our 壁炉 protocol. 迅速 doesn’t care, he just dumps ADC samples into accumulators in his 壁炉, because 迅速 can never be interrupted by ky.

ky可以被Speedy打断,但是因为Speedy不’t allowed to switch the 壁炉, it 是 safe for ky to read or write from ky’s own 壁炉.

Note that the 壁炉_switch function 是 n’t 原子 — 迅速 can interrupt between the read 和 writes to shmem->access — but this doesn’t matter; 迅速 always gets a consistent 壁炉 to use, 和 ky only accesses the 壁炉 before or after the switch.

The only other requirement 是 that ky 必须 switch the 壁炉 before the variables overflow, in other words, at least once every 65536 samples.

除此之外,Poky现在可以确保它具有每个ADC读数总数的汇总统计信息,而不会丢失任何样本或重复计算,即使

  • 它没有’t以ADC速率运行
  • 它没有’t run periodically
  • 迅速可以在任何地方打断它
  • 它可能在两次Speedy执行之间运行了好几次(有时主循环几乎无所事事,并且重复很快!)

旋转壁炉,示例2

这次,我们’ll have 迅速 switch the 壁炉 on a request from ky. The differences here are:

  • 迅速 没有 longer has to keep its 壁炉 content ready for immediate use by ky; instead, it can provide its data into the 壁炉 on demand when it’s time to switch
  • After requesting a switch, ky cannot access the 壁炉 until 迅速 completes the switch. (Once the request 是 made, 迅速 effectively owns both 壁炉.)
/* 壁炉.h */

#include <stdint.h>
#include <stdbool.h>

typedef struct {
    int32_t voltage_sum;
    int32_t current_sum;
    uint16_t count;
} FIREPLACE;

typedef struct {
    FIREPLACE 壁炉[2];
    struct {
        易挥发的 FIREPLACE *speedy;
        易挥发的 FIREPLACE *poky;
    } access;

    bool switch_request;  
    // only ky  是  allowed to set, 
    // only 迅速  是  allowed to clear
} RAW_SHARED_MEMORY; // w/o 易挥发的 -- do  没有 t use directly

typedef 易挥发的 RAW_SHARED_MEMORY SHARED_MEMORY;

inline static void 壁炉_init(volatile FIREPLACE *fp)
{
    fp->voltage_sum = 0;
    fp->current_sum = 0;
    fp->count = 0;
}

inline static void 壁炉_switch(SHARED_MEMORY *shmem)
{
    易挥发的 FIREPLACE *tmp = shmem->access.poky;
    shmem->access.poky = shmem->access.speedy;
    shmem->access.speedy = tmp;
}
/* poky.h */
typedef struct {
    int64_t voltage_sum;
    int64_t current_sum;
    uint32_t count;
} POKY_STATE;

inline static void poky_sum_init(POKY_STATE *pstate)
{
    pstate->voltage_sum = 0;
    pstate->current_sum = 0;
    pstate->count = 0;
}
/* poky.c */

#include "fireplace.h"
#include "poky.h"

void poky_init(POKY_STATE *pstate, SHARED_MEMORY *shmem)
{
    poky_sum_init(pstate);
    壁炉_init(&shmem->fireplaces[0]);
    壁炉_init(&shmem->fireplaces[1]);
    shmem->access.poky   = &shmem->fireplaces[0];
    shmem->access.speedy = &shmem->fireplaces[1];
    shmem->switch_request = false;
}

void poky_step(POKY_STATE *pstate, SHARED_MEMORY *shmem)
{
    /* called during the main loop */

    // Skip if 迅速 hasn't processed the switch request
    if (!shmem->switch_request)
    {
        易挥发的 FIREPLACE *mine = shmem->access.poky;
        if (mine->count > 0)
        {
            // 迅速 has accumulated samples! 
            // 让's accumulate that into an overall sum.
            pstate->voltage_sum += mine->voltage_sum;
            pstate->current_sum += mine->current_sum;
            pstate->count += mine->count;

            // Now zero out the accumulators 
            // so we can switch 壁炉 next time.
            壁炉_init(mine);
        }
        shmem->switch_request = true;
    }

    // If we get a message asking for the sums, send them  和  zero them out.
    if (should_we_transmit_sums())
    {
        transmit_sums(pstate->voltage_sum, pstate->current_sum, pstate->count);
        poky_sum_init(pstate);
    }
}
/* speedy.c */

#include "fireplace.h"

typedef struct {
    FIREPLACE private_fireplace;
} SPEEDY_STATE;

void speedy_init(SPEEDY_STATE *pstate)
{
    壁炉_init(&pstate->private_fireplace);
}

void speedy_step(SPEEDY_STATE *pstate, SHARED_MEMORY *shmem)
{
    // read ADC
    int16_t current = read_adc(ADC_CURRENT);
    int16_t voltage = read_adc(ADC_VOLTAGE);

    FIREPLACE *my_own = &pstate->private_fireplace;
    my_own->voltage_sum += voltage;
    my_own->current_sum += current;
    ++my_own->count;

    if (shmem->switch_request)
    {
        // Time to switch 壁炉! Put latest stats in the 壁炉
        易挥发的 FIREPLACE *mine = shmem->access.speedy;
        mine->voltage_sum = my_own->voltage_sum;
        mine->current_sum = my_own->current_sum;
        mine->count = my_own->count;
        壁炉_switch(shmem);
        shmem->switch_request = false;

        壁炉_init(my_own);
    }
}

With this method, 迅速 can do the accumulations in its own private 壁炉 that 是 n’t declared 易挥发的. When it’s time to switch 壁炉, that’s when 迅速 copies its accumulated statistics from its private 壁炉 into the shared 壁炉, 和 triggers the switch, clearing the switch_request flag.

From an execution time tradeoff, Example 2 may allow the compiler to achieve a slightly quicker ISR outside of 壁炉 switching (because it can optimize access to 迅速’s private 壁炉, which 是 n’t 易挥发的), but slower worst-case when switching 是 necessary (because it has to copy the data). Example 1 has consistent ISR timing 但它 may be slower on average (when switching 是 没有 t necessary).

完整的例子

我已经以 我的Github帐户上的MPLAB X项目. These also include a primitive testing facility where 迅速 executes from a timer interrupt 和 ky executes from the main loop, running some number iter_count_end of iterations which default to 500. Instead of using the real ADC, I provided a source of mock ADC readings from a 线性同余生成器 for a reproducible series of samples that I could repeat sequentially to check the correctness of the results. I ran both examples in the MPLAB X simulator with iter_count_end = 16384 和 got correct sums in both cases. Does this prove correctness? Absolutely 没有 t, but if I had found a bug it would have given me a chance to troubleshoot.

其他注意事项

唐’不要过度使用共享内存!

无论你’要使用这些方法之一或其他方法,请尽量减少共享内存中由多个线程访问的程序状态的数量。它’编写正确的程序要容易得多’不必担心共享变量。这不仅适用于您,还适用于编译器,当编译器可以假设只有一个线程访问内存区域时,它可以自由地创建优化的程序实现。所以如果你’即使在简单的Speedy / ky系统中,也可以使用线程之间的共享内存,并且您’re using 易挥发的 or _Atomic or some kind of synchronization mechanism, don’只是丢掉你所有的程序’s state into the 并发 bucket 和 use 易挥发的 or _Atomic everywhere. That’s overkill 和 you’可能会遭受不必要的性能损失。

单向与双向

这些示例说明了单向数据流:Speedy正在向Poky提供信息。那里’s 没有 reason it can’t be bidirectional —例如,Poky向Speedy发送电压命令(对于可变DC / DC转换器),而Speedy发送Poky电压和电流反馈。

In the bidirectional case, if you are really stingy about memory use, the 壁炉 can contain a union of the unidirectional data used, for example:

typedef struct {
    union {
        struct {
            int16_t voltage_command;
        } to_speedy;
        struct {
            int16_t voltage_feedback;
            int16_t current_feedback;
        } to_poky;
    } u;

    /* things that aren't unidirectional go here */
} FIREPLACE;

so that ky only reads from mine->u.to_poky 和 only writes to mine->u.to_speedy, whereas 迅速 only reads from mine->u.to_speedy 和 only writes to mine->u.to_poky. This adds a couple of requirements, so it’在大多数情况下,这可能不是一个好的解决方案,尤其是在仅交换几个字节的数据的情况下:

  • neither process can rely on the 壁炉 as a storage location for a persistent state variable —相反,它只能用于通讯
  • before switching 壁炉, one of the following conditions has to be true
    • the contents of both 壁炉 必须 be 有效 (which means each process has to update the content)
    • each 壁炉 contains a “valid” flag that indicates whether the data in the 壁炉 是 有效 or 没有 t, 和 both 流程 必须 set or clear the 有效 flag

原子度要求

我之前提到原子性。这两个示例中的原子性要求非常小:

  • 迅速’s code has 没有 原子ity requirements, since it can never be interrupted by ky.
  • ky’s代码仅对用于同步的数据具有原子性要求:
  • in example 1, this 是 the assignment to the pointer shmem->access.speedy in 壁炉_switch()
  • in example 2, this 是 reading 和 writing of the flag shmem->switch_request (it’s 硬 to imagine a bool read or write being 没有 n-atomic, but I try to make 没有 assumptions here)

Testing (Ay, 那里’s the Rub)

Concurrency mechanisms are extremely 硬 to test because of their 没有 ndeterministic behavior. 那里 might be one tiny error that 是 only encountered when 流程 A 和 B are executing their instructions in a very specific order relative to each other.

我将提出一个想法作为促进测试的一种方式。

The 壁炉 could contain a guard variable, for example:

  • ky sets the guard to 0xDEAD just before it starts writing to data in its 壁炉, 和 sets it back to 0x0000 just before it switches 壁炉 (or signals a 壁炉 switch request). This sets up an invariant that 迅速 应该 always see 0x0000 in its 壁炉 和 never anything else, 和 that the guard 是 set to 0xDEAD when ky 是 writing to ky’s side of the 壁炉.
  • 快速应检查警卫;如果看到非零值,则表示存在错误,应采取适当的措施。 (在偏执的情况下,触发系统范围的错误;出于调查目的,增加一个可以报告给开发人员以进行进一步调查的计数器)

Isn’这和双缓冲一样吗?

双缓冲是计算机图形学中的经典技术 两个进程共享一对缓冲区以更新显示。缓冲器代表显示像素的网格。一个过程负责将形状绘制到一个缓冲区上。另一个过程显示其他缓冲区的内容。这样,绘制过程就可以花一些时间,直到缓冲区完成并准备好移交给显示过程为止。为了更新显示,绘图过程将切换缓冲区。结果,您可以减少显示闪烁的机会—特别是如果使用类似 页面交换 .

双缓冲的另一个用途是 DMA ,有时也称为乒乓缓冲区,其中一些硬件过程(如ADC)用于用一系列样本逐渐填充两个缓冲区中的一个。当缓冲区已满时,将交换缓冲区,ADC会继续填充另一个缓冲区,而其他软件将处理第一个缓冲区的内容。

Structurally, the revolving 壁炉 technique 是 a form of 双缓冲, 但它 是 explicitly aimed at 并发 和 the 壁炉 are 没有 t really “buffers”由元素的均质数组组成,就像我刚才概述的用于双缓冲的两种情况一样。

为什么这比使用同步标志保护共享内存更好?

旋转 壁炉 allows each side to work on its data without worry of corruption or invalid data transfer; switching 壁炉 是 where the exchange occurs.

假设您要使用共享内存和一个非阻塞易失事件标志READY来将电压命令从Poky传输到Speedy:

  • For ky to update the command:
    • ky sets READY to FALSE
    • ky writes a new voltage command to shared memory
    • ky sets READY to TRUE
  • 为了让Speedy能够执行以下命令: 。快速检查
    • 如果READY为TRUE,则Speedy将命令复制到Speedy’s own internal state
    • 如果READY为FALSE,Speedy将忽略该命令,并依赖于其内部状态的最后一个副本

这在理论上是可行的,但在最坏的时序情况下,Speedy可能会反复看到READY = FALSE,并被迫处理陈旧的命令。

另一个方向需要类似的内容:

  • For ky to read feedback from shared memory:
    • ky sets READY to FALSE
    • ky reads 什么 it needs from shared memory
    • ky sets READY to TRUE
  • 为了让Speedy更新其反馈: 。快速检查
    • 如果READY为TRUE,Speedy将从其内部状态更新共享内存
    • 如果READY为FALSE,Speedy会默默地流下眼泪,因为它可以’不能做任何事情,必须等到下一个ISR再试一次

Again, in the worst case for timing, 迅速 might repeatedly see READY = FALSE, leaving ky with very stale data

旋转 壁炉 是 olates the sender 和 receiver so that they are 没有 t forced to share a single area of storage.

免责声明

最后,重要的免责声明:

并发很难。您可以自由使用这些机制,但是我没有提供铁定的证据证明它们可以免受竞争条件或其他并发危害的影响。我也不保证此处的示例代码没有错误。我在实际系统上使用过这种类型的机制,但是它们是专有的,我不得不在这里解释这些想法,而不是发布专有代码。

关于内存模型和并发问题,我对C标准和微控制器的保证和挑战的表述并不具有权威性。在这里报告这些细微之处之前,我已经尽力了调查这些细微之处。如果我有任何错误或误导性陈述,请引起我注意–我深表歉意,并将尽其所能纠正。

在将新的并发机制合并到您自己的系统中之前,请执行您自己的尽职调查。 (或唐’t,并依靠RTOS的机制来满足您的需求。)

包起来

我们了解了嵌入式控制系统中的一种常见模式,以定期执行的中断服务例程为例— aka 迅速 — 和 a main loop — aka ky —需要交换数据。 迅速可以打断Poky,但Poky永远不会打断Speedy。

我们探索了一种称为“旋转壁炉”的双缓冲变体,它允许Speedy和Poky之间的通信通过允许它们分别从单独的内存部分进行读取或写入(“fireplace”), with the exchange of data occurring when access to the memory sections are switched, typically by swapping pointers. This lets us focus our 并发 efforts on the switching mechanism; either 迅速 can switch the 壁炉 directly, or can raise a flag to signal that ky can switch the 壁炉.

我们谈到了支持旋转壁炉所需的一些并发要求:

  • 可通过顺序一致性建模的微控制器(无乱序执行)
  • the use of 易挥发的 to restrict the compiler from optimizing out reads or writes, 和 from reordering with respect to other reads 和 writes of 易挥发的 variables
  • the need for 原子 updates in the 壁炉-switching mechanism

我们深入研究了C11的地下室’s <stdatomic.h> 和 talked about alternatives for those of us who don’t have access to a C11 compiler. (Note again: if you can use the <stdatomic.h> or C++11’s std::atomic<>, it’可能值得这样做—编译器可以保证某些正确性,这些保证比手动替代方法更强大和更简单。)

I’d很想听听您在资源有限的嵌入式系统中使用此或其他并发机制的经验!

致谢

谢谢 马修·埃斯莱曼 和约翰·佩森(John Payson)的意见和建议。


©2020 Jason M. Sachs,保留所有权利。


杰森·萨克斯(Jason Sachs)上一篇文章:
   公差分析
杰森·萨克斯(Jason Sachs)下一篇文章:
   用光绘画以测量时间

要发布对评论的回复,请单击每个评论所附的“回复”按钮。要发布新评论(而不是回复评论),请查看评论顶部的“写评论”标签。

注册后,您可以参加所有相关网站上的论坛,并获得所有pdf下载的访问权限。

注册

我同意 使用条款 隐私政策 .

试试我们偶尔但很受欢迎的时事通讯。非常容易退订。
或登录