OSLab多线程

日期:2019/3/26

内容:多线程。

一、基本知识

  • 线程的定义

    线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

    一个进程至少有一个线程(记为主线程),多开线程称为工作线程。

  • 并发特性

    当前的进程可以理解为一个主线程,新建的进程称为工作线程。主线程与工作线程交织并发执行。(见2.1的代码实例)

  • 线程互斥

    多个线程之间有共享资源(shared resource)时会出现互斥现象。

    例如设有若干线程共享某个变量,而且都对变量有修改。如果它们之间不考虑相互协调工作,就会产生混乱。比如,线程A和B共用变量x,都对x执行增1操作。由于A和B没有协调,两线程对x的读取、修改和写入操作相互交叉,可能两个线程读取相同个x值,一个线程将修改后的x新值写入到x后,另一个线程也把自己对x修改后的新值写入到x。这样,x只记录后一个线程的修改作用。

  • 使用线程时,需要链接pthread库

gcc f.c -o f -lpthread

 

二、线程的系统调用

2.1 创建线程(pthread_create)

  • 原型:int pthread_create(pthread_t *tid, pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
  • 功能:

    >>创建一个线程

    >>新线程从start_routine开始执行

    >>新线程的ID保存在tid指向的位置

  • 返回值:成功0,失败非0。(实际是返回-1,但是习惯上使用!=0判断更好)
  • 参数解析

参数

功能

tid

该参数是一个指针, 新线程的ID保存在tid指向的位置

pthread_t等同于unsigned long int (8 bytes)

attr

线程属性。如果为空,则使用缺省的属性值

start_routine

该参数是一个函数指针, 新线程从start_routine开始执行

arg

提供给start_routine的参数

  • attr解析(需要用时补充)
  • arg解析

    >>对于整型变量

int ivalue = 123;

void *arg = (void *) ivalue;

>>字符串(如下代码实例)

>>结构体

struct person {

char *name;

int age;

} p;

void *arg = (void *) &p;

  • 并发代码实例

void *thread_run(void *args)

{

    int i = 5;

    const char *str = (const char *)(args);

    while(i--)

    {

        printf("%s running... ", str);

        sleep(1);

    }

    return NULL;

}

 

int main()

{

    pthread_t ptid;

    pthread_create(&ptid, NULL, &thread_run, "subthread");

 

    int i = 5;

    while(i--)

    {

        puts("main thread running...");

        sleep(1);

    }

    return 0;

}

 

2.2 等待线程(pthread_join)

  • 原型:int pthread_join(pthread_t tid, void **result);
  • 功能:等待线程结束
  • 参数:

    >>tid: 目标线程的id

    >>result: 用于存放线程的计算结果

  • 返回值:成功0,失败非0。
  • 代码实例1(线程并行计算,全局变量记录结果)

#include <stdio.h>

#include <pthread.h>

 

int array[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

int master_res = 0;

int worker_res = 0;

 

void master_run()

{

    int i=0;

    for(i=0; i<=4; i++)

        master_res += array[i];

}

void *worker_run(void *args)

{

    int i;

    for(i=5; i<=9; i++)

        worker_res += array[i];

}

 

int main()

{

    pthread_t tid;

    pthread_create(&tid, NULL, &worker_run, NULL);

    master_run();

    pthread_join(tid, NULL);

    printf("master = %d, worker = %d, total = %d... ", master_res, worker_res, master_res + worker_res);

    return 0;

 

}

  • 代码实例2(线程并行计算,局部变量记录结果)

#include <stdio.h>

#include <string.h>

#include <pthread.h>

#define NR_TOTAL 100

#define NR_CPU 4

#define NR_CHILD (NR_TOTAL / NR_CPU)

typedef struct

{

    int *array;

    int start, end;

} param;

typedef struct

{

    int sum;

} result;

 

int array[NR_TOTAL] = {0};

 

void *worker_run(void *args)

{

    param *p = (param *)(args);

    result *res = (result *)malloc(sizeof(result));

    int i;

    int sum = 0;

    for (i = p->start; i <= p->end; i++)

    {

        sum += p->array[i];

    }

    res->sum = sum;

    return (void *)res;

}

 

int main()

{

    int i;

    int sum = 0;

    pthread_t tid[NR_CPU] = {0};

    param params[NR_CPU];

 

    for (i = 0; i < NR_TOTAL; i++)

        array[i] = i + 1;

 

    for (i = 0; i < NR_CPU; i++)

    {

        param *p = &params[i];

        p->array = array;

        p->start = i * NR_CHILD;

        p->end = p->start + NR_CHILD - 1;

        pthread_create(&tid[i], NULL, &worker_run, (void *)p);

    }

 

    for (i = 0; i < NR_CPU; i++)

    {

        result *res = NULL;

        pthread_join(tid[i], (void **)(&res));

        sum += res->sum;

        free(res);

    }

 

    printf("sum = %d ", sum);

}

  • 代码实例3(多线程求和与单线程求和对比,1亿个数据求和)

运行结果和代码如下。可见单线程与多线程做一亿次加法的时间差不多。自己想了一下原因:代码在阿里云的机器上跑的,买的学生版,CPU是单核的,这种情况下NR_CPU个线程只能轮流使用CPU,本质上就是单线程。而且还多出来线程切换的开销,更不划算。

后来把代码拉到本地运行,本地CPU配置4核4线程,单线程结果稳定在0.77s,多线程结果稳定在0.57-0.58s,说明多线程计算确实是比单线程要快的。

单线程

#define SIZE (100000000)

static uint32_t array[SIZE] = {0};

static uint64_t sum = 0;

clock_t start, end;

int main()

{

    start = clock();

    uint32_t i = 0;

    for (i = 0; i < SIZE; i++)

        array[i] = i + 1;

    for (i = 0; i < SIZE; i++)

        sum += array[i];

    end = clock();

    printf("sum is %lld... ", sum);

    printf("time is %lfs... ", ((double)end - start) / CLOCKS_PER_SEC);

}

输出(sum的值无意义,因为溢出,用于检验2次求和结果是否一致)

本地机器

多线程

#include <stdio.h>

#include <stdlib.h>

#include <pthread.h>

#include <time.h>

#include <stdint.h>

#include <string.h>

#define SIZE (100000000)

#define NR_TOTAL SIZE

#define NR_CPU (32)

#define NR_CHILD (NR_TOTAL / NR_CPU)

typedef struct

{

    uint32_t *array;

    uint32_t start, end;

} param;

typedef struct

{

    uint64_t sum;

} result;

uint32_t array[SIZE] = { 0 };

uint64_t sum = 0;

clock_t start, end;

param params[NR_CPU];

result res[NR_CPU];

pthread_t tid[NR_CPU];

 

void *worker_run(void *args)

{

    uint32_t i = 0;

    uint64_t sum = 0;

    param *p = (param *)args;

    result *r = (result *)malloc(sizeof(result));

    for(i = p->start; i <= p->end; i++)

    {

        sum += p->array[i];

    }

    r->sum = sum;

    return (void *)r;

}

 

int main()

{

    uint32_t i = 0;

    start = clock();

    for (i = 0; i < SIZE; i++)

        array[i] = i + 1;

    for (i = 0; i < NR_CPU; i++)

    {

        param *p = &params[i];

        p->array = array;

        p->start = i * NR_CHILD;

        p->end = p->start + NR_CHILD - 1;

        pthread_create(&tid[i], NULL, &worker_run, (void *)p);

    }

 

    for (i = 0; i < NR_CPU; i++)

    {

        result *r;

        pthread_join(tid[i], (void **)&r);

        sum += r->sum;

        free(r);

    }

    end = clock();

    printf("sum is %lld... ", sum);

    printf("time is %lfs... ", ((double)end - start)/CLOCKS_PER_SEC);

}

 

输出

本地机器

2.3 线程互斥(pthread_mutex)

  • 原型

    >> int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr);

    >> int pthread_mutex_destroy(pthread_mutex_t *mutex);

    >> int pthread_mutex_lock(pthread_mutex_t *mutex);

    >> int pthread_mutex_unlock(pthread_mutex_t *mutex);

  • 功能

    >> pthread_mutex_init初始化互斥量

    >> pthread_mutex_destroy释放互斥量

    >> pthread_mutex_lock将互斥量加锁

    >> pthread_mutex_unlock将互斥量解锁

  • 补充说明

    当pthread_mutex_lock()返回时,该互斥锁已被锁定。线程调用该函数让互斥锁上锁,如果该互斥锁已被另一个线程锁定和拥有,则调用的线程将阻塞,直到该互斥锁变为可用为止。

  • 返回值:成功0,失败非0(上述4个函数均如此)
  • 参数

    >>pthread_mutex_t是一个结构体。(具体形式比较复杂,需要时再补充)

    >>如果attr等于NULL,则使用缺省的属性进行初始化

  • 使用说明

开启线程时

线程入口函数

  1. 声明pthread_mutex_t mutex
  2. 执行init
  3. 创建(create)若干个线程
  4. 等待(join)线程
  5. destroy mutex
  1. 对共享资源(例如一个全局变量)操作前先执行lock
  2. 操作共享资源
  3. 操作后执行unlock

 

  • 实例代码(一点存留疑问:这样代码效率跟单线程相比有提高?)

static int global = 0;

pthread_mutex_t mutex;

void *worker_run(void *args)

{

    int i = 0;

    for(i = 0; i < 1000000; i++)

    {

        pthread_mutex_lock(&mutex);

        global++;

        pthread_mutex_unlock(&mutex);

    }

    return NULL;

}

int main()

{

    pthread_t tid[3] = { 0 };

    pthread_mutex_init(&mutex, NULL);

    int i = 0;

    for(i = 0; i < 3; i++)

        pthread_create(&tid[i], NULL, &worker_run, NULL);

      

    for(i = 0; i < 3; i++)

        pthread_join(tid[i], NULL);

    pthread_mutex_destroy(&mutex);

    printf("global = %d ", global);

    return 0;

}

 

2.4 条件变量(pthread_cond)

  • 初始化与销毁
    • 原型

      >> int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *attr);

      >> int pthread_cond_destroy(pthread_cond_t *cond);

    • 功能

      >> pthread_cond_init初始化条件变量

      >> pthread_cond_destroy释放条件变量

    • 参数

      >>如果attr等于NULL,则使用缺省的属性进行初始化

      >> pthread_condattr_t是一个结构体,见文献1的blog

      >> pthread_cond_t是一个union,见文献1的blog

  • 等待
    • 原型:int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
    • 功能:阻塞当前线程的运行
    • 参数:cond表示当前线程阻塞直到满足cond条件;mutex表示当前线程阻塞时所在的临界区。
    • 返回值:成功0,失败非0。
  • 唤醒
    • 原型

      >>int pthread_cond_signal(pthread_cond_t *cond);

      >>int pthread_cond_broadcast(pthread_cond_t *cond);

    • 功能

      >>pthread_cond_signal唤醒阻塞在条件变量上的一个线程

      >>pthread_cond_broadcast唤醒阻塞在条件变量上的所有线程

    • 返回值:成功0,失败非0.
  • 共享缓冲区

初始状态

加入数据

缓冲区满

非法状态(缓冲区溢出)

  • 代码实例1——生产者与消费者(条件变量完成同步)

#define BUFFER_SIZE (4)

#define BUFFER_CAPACITY (BUFFER_SIZE - 1)

#define PRODUCE_SIZE (BUFFER_SIZE * 2)

#define CONSUME_SIZE (BUFFER_SIZE * 2)

typedef unsigned char bool;

char buffer[BUFFER_SIZE] = "";

int in = 0;

int out = 0;

pthread_mutex_t mutex;

pthread_cond_t wait_until_full;

pthread_cond_t wait_until_empty;

 

void print_buffer_state()

{

        int i = 0;

        for(i = 0; i < BUFFER_SIZE; i++)

                if(buffer[i] != '')

                        printf("%c ", buffer[i]);

        printf(" ");

}

bool buffer_is_empty()

{

        return in == out;

}

bool buffer_is_full()

{

        return (in + 1) % BUFFER_SIZE == out;

}

void put_item(char item)

{

        buffer[in] = item;

        in = (in + 1) % BUFFER_SIZE;

}

char get_item()

{

        char item = buffer[out];

        buffer[out] = '';

        out = (out + 1) % BUFFER_SIZE;

        return item;

}

 

void *consume_run(void *args)

{

        int i = 0;

        for(i = 0; i < CONSUME_SIZE; i++)

        {

                pthread_mutex_lock(&mutex);

                while (buffer_is_empty())

                        pthread_cond_wait(&wait_until_full, &mutex);

                printf(" consume item = %c ", get_item());

                //print_buffer_state();

                pthread_cond_signal(&wait_until_empty); //wake up producer

                pthread_mutex_unlock(&mutex);

        }

        return NULL;

}

 

void produce_run()

{

        int i = 0;

        for(i = 0; i < PRODUCE_SIZE; i++)

        {

                pthread_mutex_lock(&mutex);

                while (buffer_is_full())

                        pthread_cond_wait(&wait_until_empty, &mutex);

                char item = 'a' + i;

                printf("produce item = %c ", item);

                put_item(item);

                //print_buffer_state();

                pthread_cond_signal(&wait_until_full); //wake up consume

                pthread_mutex_unlock(&mutex);

        }

}

 

int main()

{

        pthread_mutex_init(&mutex, NULL);

        pthread_cond_init(&wait_until_full, NULL);

        pthread_cond_init(&wait_until_empty, NULL);

 

        pthread_t consume_tid = 0;

        pthread_create(&consume_tid, NULL, &consume_run, NULL);

        produce_run();

        pthread_join(consume_tid, NULL);

        return 0;

}

 

运行结果

结果分析:

实际上,produce不必等到buffer为空才执行,consume也不必等到buffer为满才执行,produce和consume是并发执行的,所以才会有第二次的运行结果。

 

  • 代码实例2——生产者与消费者(信号量完成同步)

#include <stdio.h>

#include <pthread.h>

#define BUFF_SIZE 4

#define ITEM_COUNT (BUFF_SIZE * 2)

typedef unsigned char bool;

typedef struct

{

    int value;

    pthread_mutex_t mutex;

    pthread_cond_t cond;

} sema_t;

char buffer[BUFF_SIZE];

int pin = 0;

int pout = 0;

sema_t mutex_sema;//threads mutex to access buffer

sema_t buffer_notfull_sema;// thread sync

sema_t buffer_notempty_sema;

bool is_empty()

{

    return pin == pout;

}

bool is_full()

{

    return ((pin + 1) % BUFF_SIZE) == pout;

}

char get_item()

{

    char item;

    item = buffer[pout];

    pout = (pout + 1) % BUFF_SIZE;

    return item;

}

void put_item(char item)

{

    buffer[pin] = item;

    pin = (pin + 1) % BUFF_SIZE;

}

void sema_init(sema_t *sema, int value)

{

    sema->value = value;

    pthread_mutex_init(&sema->mutex, NULL);

    pthread_cond_init(&sema->cond, NULL);

}

//wait

//if value<=0 then wait until meet the cond

//else value--

void sema_wait(sema_t *sema)

{

    pthread_mutex_lock(&sema->mutex);

    while (sema->value <= 0)

        pthread_cond_wait(&sema->cond, &sema->mutex);

    sema->value--;

    pthread_mutex_unlock(&sema->mutex);

}

void sema_signal(sema_t *sema)

{

    pthread_mutex_lock(&sema->mutex);

    sema->value++;

    pthread_cond_signal(&sema->cond);//wake up another thread

    pthread_mutex_unlock(&sema->mutex);

}

void *consume_run(void *arg)

{

    int i;

    char item;

    for(i = 0; i < ITEM_COUNT; i++)

    {

        sema_wait(&buffer_notempty_sema);

        sema_wait(&mutex_sema);

        printf(" consume item: %c ", get_item());

        sema_signal(&mutex_sema);

        sema_signal(&buffer_notfull_sema);

    }

    return NULL;

}

void *produce_run()

{

    int i;

    char item;

    for (i = 0; i < ITEM_COUNT; i++)

    {

        sema_wait(&buffer_notfull_sema);

        sema_wait(&mutex_sema);

        item = i + 'a';

        put_item(item);

        printf("produce item: %c ", item);

        sema_signal(&mutex_sema);

        sema_signal(&buffer_notempty_sema);

    }

    return NULL;

}

 

int main()

{

    pthread_t consume_tid;

    sema_init(&mutex_sema, 1);

    sema_init(&buffer_notfull_sema, BUFF_SIZE - 1);

    sema_init(&buffer_notempty_sema, 0);

    pthread_create(&consume_tid, NULL, consume_run, NULL);

    produce_run();

    pthread_join(consume_tid, NULL);

    return 0;

}

 

运行结果

三、名词解析

  • pthread:p的含义是POSIX
  • POSIX:Portable Operating System Interface of Unix,可移植操作系统接口。
  • mutex:意为互斥体。

参考文献

原文地址:https://www.cnblogs.com/sinkinben/p/10793190.html