进程间通信

v2-3b32471e938dae81bfd4d93bf9add3d7_1200x500

在本系统中发现了两个BUG:两个BUG · Issue #3 · bajdcc/MiniOS,限于自己水平比较渣,没法解决这两个BUG,那么OS系列就告一段落了,纵观整个过程,还是对IPC机制的理解帮助比较大,这种思想可以运用于实践中,如管道、套接字、流的实现。

---------------------------------------------------------------------------

有童鞋问如何从零开始学习代码,我写了点总结:

首先要了解操作系统的知识,从零开始指的是代码从零开始,所以相关知识的基础也是不可或缺的。要了解系统的启动过程,Zyw_OS启动流程跟实现~未完待续 - 知乎专栏 。从启动过程来分析代码,首先是BIOS。

(1)然后是boot文件夹下的三个asm文件,它们完成读软盘、设置GDT、加载内核数据至0x100000,并跳转到0x100000处执行内核代码

(2)随后根据main.c中初始化的顺序学习:(I)vga显示输出、(II)gdt分段管理、(III)idt中断向量设置、(IV)isr中断处理程序、(V)pmm获取可用内存并按4KB记录并保存至栈中,完成物理页框的申请与释放、(VI)vmm分页管理、(VII)sys初始化系统调用函数、(VIII)proc多进程管理,进程切换。

(3)从零开始添加功能,如先写好boot.asm,能够开机并用中断打印一个A,没问题之后,再设置gdt,vga以及中断管理,这时运行一个int 3看看有没有打印结果出来;这样每次写的代码都能够独立运行。如果是把所有代码下载下来运行的话,效果很不错,但是会不知道从读起。现在要做的就是精简代码,只保留核心功能,摒弃一切杂念。

(4)那么写代码过程中有几个情况要处理:(I)不明所以,这时可以看注释或是上网查资料(II)这里代码是不是有问题?先放着不管(III)运行出错了,那就debug吧。其中debug的时间比较久,我做到IPC这一部分起码编程时间100小时以上,不包括日常吃饭时想的时间,所以不要慌,操作系统代码需要细嚼慢咽,急也急不得。debug可能用时比较久,这时比较纠结、想放弃、头脑混乱,因为好多bug真不知道哪冒出来的,用qemu源码级调试好处也有限。其实这一关过不了,对操作系统的理解水平也就触到天花板了,也就是只是理解了书上的思想,而没有将代码和思想结合起来。写代码、山寨别人的代码、东拼西凑,不管用什么方式,只要把bug除了就皆大欢喜。

(5)这样做有好处:代码、思路,所有的细节全部load到了脑子里,要啥有啥,也就是真正理解了内核,可以举一反三,并自己更改代码,添加功能

(6)要有毅力、有恒心,能吃苦,成功不是一蹴而就,别看我写的代码运行效果挺好,我起码debug了100h以上,每天打底调试6小时,最后才能bug弄好。真正自己花时间花工夫写的代码,才会长久的留自己的脑海里。

--------------------------------------------------------------------

写在前面

Release:bajdcc/MiniOS

总结下当前的进度:

  1. 引导、GDT、中断、虚页:花了两三天
  2. 多进程:花了一周
  3. 进程间通信:花了两三天

90%的时间花在了debug上,除完所有bug,已经对整个实现机制了如指掌。所以,debug的过程也是一个学习的过程,虽然效率不高,但是效果好(就是单词抄写百遍的效果啦)。

90%操作系统的内容 = 枯燥乏味,而10%(余下的内容) = 精妙绝伦

目前感受设计精妙的地方:

  1. 进程切换机制(时钟分派),进程的父子关系,进程的状态。代表:fork,wait,sleep,exit,kill,如wait和exit的搭配为例,进程的销毁是惰性的(销毁操作集中于父进程中的wait)。
  2. 进程间通信机制。通信分异步和同步,那么这里实现的是同步IPC,比较简单,用不着写队列。这是微内核的基础,它将某些同类别的系统调用转化为唯一一个系统调用0x80。如何区分不同功能的调用呢?就是在调用int 0x80前将参数入栈,参数有通信方式(SEND/RECEIVE),通信对象(正数表示pid,-1表示任意对象),消息结构

这些精妙的地方只能通过代码去体会。

用户代码

static void halt() {
    while (1);
}

static int delay() {
    volatile int i;

    for (i = 0; i < 0x1000000; i++);
    return i;
}

void user_main() {

    int i;

    i = call(SYS_FORK);

    delay();
    if (i != 0) {
        sys_tasks0();
        delay();
        call(SYS_WAIT);
    } else {
        while (1) {
            delay();
            i = sys_ticks();
            delay();
            printk("!! proc#%d received tick '%d'
", proc2pid(proc), i);
            delay();
            delay();
            delay();
            delay();
        }
    }

    printk("halt...
");

    halt();
}

void sys_tasks0() {
    extern uint32_t tick;

    MESSAGE msg;
    while (1) {
        send_recv(RECEIVE, TASK_ANY, &msg);
        int src = msg.source;
        switch (msg.type) {
        case SYS_TICKS:
            msg.RETVAL = tick;
            printk("!! proc #%d sent tick '%d'
", proc2pid(proc), tick);
            send_recv(SEND, src, &msg);
            break;
        default:
            assert(!"unknown msg type");
            break;
        }
    }
}

static int sys_ipc_call(int type) {
    MESSAGE msg;
    reset_msg(&msg);
    msg.type = type;
    send_recv(BOTH, TASK_SYS, &msg);
    return msg.RETVAL;
}

int sys_ticks() {
    return sys_ipc_call(SYS_TICKS);
}

进程的结构

说到IPC,可能会想到pipe、clipboard、windows message、socket、shared memory、file等方式,然而没法实现那么多(括弧笑,所以跟着书上走吧~

目前只是抄了书上的代码(希望尽快看到结果),还没时间去分析IPC机制的代码。

首先,我们创建的进程proc的结构有:

struct proc {
    /* KERNEL */
    struct interrupt_frame *fi;     // 中断现场
    volatile uint8_t pid;           // 进程ID
    uint32_t size;                  // 用户空间大小
    uint8_t state;                  // 进程状态
    char name[PN_MAX_LEN];          // 进程名称
    pde_t *pgdir;                   // 虚页目录(一级页表)
    char *stack;                    // 进程内核堆栈
    struct proc *parent;            // 父进程
    int8_t ticks;                   // 时间片
    int8_t priority;                // 优先级
    /* IPC */
    int p_flags;                    // 标识
    MESSAGE *p_msg;                 // 消息
    int p_recvfrom;                 // 接收消息的进程ID
    int p_sendto;                   // 发送消息的进程ID
    int has_int_msg;                // nonzero if an INTERRUPT occurred when the task is not ready to deal with it.
    struct proc *q_sending;         // queue of procs sending messages to this proc
    struct proc *next_sending;      // next proc in the sending queue (q_sending)
};

结构很复杂吧?不过,如果是一步步实现功能,往里添加的话,其实也不算多。

PCB结构:

  1. 进程相关信息:名称,ID,状态,父进程
  2. 调度信息:中断现场,时间片,优先级
  3. 内存信息:代码空间大小,页表,内核堆栈
  4. IPC:消息收发状态flags,消息msg,收发进程ID,消息队列(链表)

进程的切换:

  1. 主进程死循环,通过时钟中断,进行调度
  2. 中断时保存现场(即proc->fi),如果是最外层中断,那么起调度作用(此时k_reenter=0),内层中断k_reenter>0,中断结束后iret返回,从proc->fi中恢复现场,此时修改相应特权级

微内核架构

原版的linux中有一堆的系统调用,那么微内核架构与此不同,它将系统调用按功能划分开来,如分成内存管理、文件管理等,建立专门的系统级进程来负责系统调用。

那么,也就是 “ring1级系统服务进程” 与 系统 打交道(通过系统调用),而我们的“ring3级用户进程” 只要与 “ring1级系统服务进程” 通信就可以了。结论:ring3用户级 <=> ring1服务级 <=> ring0系统级,ring1就像中介一样,而ring0与ring3可以素不相识。这样,微内核架构(相当于微服务)抽象出一个服务层sys_task,降低了耦合度。

  • ring1与ring0打交道:通过系统调用即可
  • ring1与ring3打交道:维护一个消息等待队列

进程间通信

主要分两个函数msg_send和msg_receive。

收/发消息有几种情况:

  1. 系统服务监听消息:没消息时休眠,来消息时唤醒
  2. 系统服务发送消息:仅当系统服务收到用户的SEND消息后,被唤醒,随后用户再发送RECV消息,系统服务收到后设置msg
  3. 用户进程发送消息:系统服务不可用,用户进程堵塞,直到系统服务处理完其他任务,从等待队列中取出用户进程,并唤醒用户进程
  4. 用户进程接收消息:当用户进程发送SEND消息收到回应后,会再发送一个RECV消息,等待系统服务响应并设置msg,最后用户进程拿到设置后的msg

这里的操作挺像TCP的握手操作的,归纳起来的同步通信模型

  1. 系统服务:监听消息(就像web服务器一样),单线程: while(1){recv(), send()}
  2. 用户进程:像web客户端一样,单线程:{send() recv()}

消息队列

统一调用接口:

int send_recv(int function, int src_dest, MESSAGE* msg)
{
    int ret = 0, caller;

    caller = proc2pid(proc);

    if (function == RECEIVE)
        memset(msg, 0, sizeof(MESSAGE));

    switch (function) {
    case BOTH: // 先发送再接收
        ret = _sendrec(SEND, src_dest, msg, caller);
        if (ret == 0)
            ret = _sendrec(RECEIVE, src_dest, msg, caller);
        break;
    case SEND:
    case RECEIVE:
        ret = _sendrec(function, src_dest, msg, caller);
        break;
    default:
        assert((function == BOTH) ||
               (function == SEND) || (function == RECEIVE));
        break;
    }

    return ret;
}

对于系统服务service:

  1. send_recv(RECV, TASK_ANY, msg) 监听消息
  2. 处理msg
  3. send_recv(RECV, msg.source, msg) 发送消息给客户端

对于客户端程序client:

  1. 初始化msg
  2. send_recv(SEND, SYSTASK_ID, msg) 发送消息给系统服务
  3. send_recv(RECV, SYSTASK_ID, msg) 堵塞并接收消息
  4. 处理msg

一、发送消息

int msg_send(struct proc* current, int dest, MESSAGE* m)
{
    struct proc* sender = current;
    struct proc* p_dest = npid(dest); /* proc dest */

    /* check for deadlock here */
    if (deadlock(proc2pid(sender), dest)) {
        printk("DEADLOCK! %d --> %d
", sender->pid, p_dest->pid);
        assert(!"DEADLOCK");
    }

    if ((p_dest->p_flags & RECEIVING) && /* dest is waiting for the msg */
        (p_dest->p_recvfrom == proc2pid(sender) ||
         p_dest->p_recvfrom == TASK_ANY)) {

        memcpy(va2la(dest, p_dest->p_msg),
              va2la(proc2pid(sender), m),
              sizeof(MESSAGE));

        p_dest->p_msg = 0;
        p_dest->p_flags &= ~RECEIVING; /* dest has received the msg */
        p_dest->p_recvfrom = TASK_NONE;
        unblock(p_dest);
    }
    else { /* dest is not waiting for the msg */
        sender->p_flags |= SENDING;
        sender->p_sendto = dest;
        sender->p_msg = m;

        /* append to the sending queue */
        struct proc * p;
        if (p_dest->q_sending) {
            p = p_dest->q_sending;
            while (p->next_sending)
                p = p->next_sending;
            p->next_sending = sender;
        }
        else {
            p_dest->q_sending = sender;
        }
        sender->next_sending = 0;

        block(sender);
    }

    return 0;
}

解释:

  1. 判断是否死锁,即A->send->B,同时B->send->A
  2. 若对方正在监听消息,则将msg拷贝到对方的p_msg中,并消除对方的监听与堵塞状态
  3. 若对方不在监听消息(可能在处理其他事务),则将发送方PCB指针插入到对方的q_sending队列中,并将发送方堵塞以等待接收方的回应

二、接收消息

int msg_receive(struct proc* current, int src, MESSAGE* m)
{
    struct proc* p_who_wanna_recv = current;
    struct proc* p_from = 0; /* from which the message will be fetched */
    struct proc* prev = 0;
    int copyok = 0;

    if ((p_who_wanna_recv->has_int_msg) &&
        ((src == TASK_ANY) || (src == INTERRUPT))) {
        /* There is an interrupt needs p_who_wanna_recv's handling and
         * p_who_wanna_recv is ready to handle it.
         */

        MESSAGE msg;
        reset_msg(&msg);
        msg.source = INTERRUPT;
        msg.type = HARD_INT;
        assert(m);
        memcpy(va2la(proc2pid(p_who_wanna_recv), m), &msg,
              sizeof(MESSAGE));

        p_who_wanna_recv->has_int_msg = 0;

        return 0;
    }


    /* Arrives here if no interrupt for p_who_wanna_recv. */
    if (src == TASK_ANY) {
        /* p_who_wanna_recv is ready to receive messages from
         * TASK_ANY proc, we'll check the sending queue and pick the
         * first proc in it.
         */
        if (p_who_wanna_recv->q_sending) {
            p_from = p_who_wanna_recv->q_sending;
            copyok = 1;
        }
    }
    else {
        /* p_who_wanna_recv wants to receive a message from
         * a certain proc: src.
         */
        p_from = npid(src);

        if ((p_from->p_flags & SENDING) &&
            (p_from->p_sendto == proc2pid(p_who_wanna_recv))) {
            /* Perfect, src is sending a message to
             * p_who_wanna_recv.
             */
            copyok = 1;

            struct proc* p = p_who_wanna_recv->q_sending;
            while (p) {
                assert(p_from->p_flags & SENDING);
                if (proc2pid(p) == proc2pid(npid(src))) { /* if p is the one */
                    p_from = p;
                    break;
                }
                prev = p;
                p = p->next_sending;
            }

        }
    }

    if (copyok) {
        /* It's determined from which proc the message will
         * be copied. Note that this proc must have been
         * waiting for this moment in the queue, so we should
         * remove it from the queue.
         */
        if (p_from == p_who_wanna_recv->q_sending) { /* the 1st one */
            assert(prev == 0);
            p_who_wanna_recv->q_sending = p_from->next_sending;
            p_from->next_sending = 0;
        }
        else {
            prev->next_sending = p_from->next_sending;
            p_from->next_sending = 0;
        }

        /* copy the message */
        memcpy(va2la(proc2pid(p_who_wanna_recv), m),
              va2la(proc2pid(p_from), p_from->p_msg),
              sizeof(MESSAGE));

        p_from->p_msg = 0;
        p_from->p_sendto = TASK_NONE;
        p_from->p_flags &= ~SENDING;
        unblock(p_from);
    }
    else {  /* nobody's sending TASK_ANY msg */
        /* Set p_flags so that p_who_wanna_recv will not
         * be scheduled until it is unblocked.
         */
        p_who_wanna_recv->p_flags |= RECEIVING;

        p_who_wanna_recv->p_msg = m;

        if (src == TASK_ANY)
            p_who_wanna_recv->p_recvfrom = TASK_ANY;
        else
            p_who_wanna_recv->p_recvfrom = proc2pid(p_from);

        block(p_who_wanna_recv);

    }

    return 0;
}

解释:

  1. 若接收方发生中断,则处理中断,函数立即返回
  2. 若接收方可以接收一切消息TASK_ANY,那么此时判断q_sending发送队列中是否有消息,是的话,则从队列中取消息,清除发送方的SENDING状态;如果此时q_sending中没有消息,则接收方堵塞,置RECEIVING状态
  3. 若接收方只接收某一种消息,则当消息不匹配时,接收方堵塞;若消息匹配,进行第2步中的取消息操作

死锁的简单判断:

由于q_sending队列表示等待队列,只要遍历它,看是否可以遍历到当前进程本身即可。

堵塞的简单实现:

堵塞意味着要暂停当前进程并切换到其他进程,然而本系统的实现有限,只能强行触发时钟中断进行进程切换,由此可能导致BUG。

阶段性总结

如果说debug是负反馈,那么proc和ipc的实现就是大大的正反馈,先前用java实现了解释器并构建操作系统(bajdcc/jMiniLang),提供lambda、coroutine、multi-process等机制,但效率极低,求个一百内素数都要半天,还是没法完成做一个操作系统的愿望。本来用C/C++/ java/C# 也造了好多好多轮子,那么这次实现操作系统只用到了ASM和C,但是!!!难度非同小可!因为:资料贫乏、机制复杂、陷阱众多、难以调试、理解困难等等……但我没有放弃!!但看来IPC运行良好没有panic的时候,我的内心是非常喜悦的!这大概就是编程的美吧!

https://zhuanlan.zhihu.com/p/26054925备份。

原文地址:https://www.cnblogs.com/bajdcc/p/8972965.html