POSIX异步I/O接口使用

POSIX1003.1b 实时扩展协议规定的标准异步 I/O 接口,即 aio_read 函数、 aio_write 函数、aio_fsync 函数、aio_cancel 函数、aio_error 函数、aio_return 函数、aio_suspend函数和 lio_listio 函数。这组 API 用来操作异步 I/O。

异步 I/O 是针对同步 I/O 提出的概念,它不需要线程等待 I/O 结果,而只需要请求进行传输,然后系统会自动完成 I/O 传输,结束或者出现错误时会产生相应的 I/O 信号,用户程序只需要设置好对应的信号陷入函数,即可处理一个异步 I/O 事件。

        #man aio

        The POSIX AIO interface consists of the following functions:

        aio_read(3)     Enqueue a read request.  This is the asynchronous analog of read(2).

        aio_write(3)    Enqueue a write request.  This is the asynchronous analog of write(2).

        aio_fsync(3)    Enqueue a sync request for the I/O operations on a file descriptor.  This is the asynchronous analog of fsync(2) and fdatasync(2).

        aio_error(3)    Obtain the error status of an enqueued I/O request.

        aio_return(3)   Obtain the return status of a completed I/O request.

        aio_suspend(3)  Suspend the caller until one or more of a specified set of I/O requests completes.

        aio_cancel(3)   Attempt to cancel outstanding I/O requests on a specified file descriptor.

        lio_listio(3)   Enqueue multiple I/O requests using a single function call.

        The aiocb ("asynchronous I/O control block") structure defines parameters that control an I/O operation.  An argument of this type is employed with all  of  the  functions  listed
        above.  This structure has the following form:

           #include <aiocb.h>

           struct aiocb {
               /* The order of these fields is implementation-dependent */

               int             aio_fildes;     /* File descriptor */
               off_t           aio_offset;     /* File offset */
               volatile void  *aio_buf;        /* Location of buffer */
               size_t          aio_nbytes;     /* Length of transfer */
               int             aio_reqprio;    /* Request priority */
               struct sigevent aio_sigevent;   /* Notification method */
               int             aio_lio_opcode; /* Operation to be performed;
                                                  lio_listio() only */

               /* Various implementation-internal fields not shown */
           };

            struct sigevent{
                int             sigev_notify;    // notify type
                int             sigev_signo;     // signal number
                union sigval    sigev_value;     // notify argument
                void (*sigev_notify_function)(union sigval);    // notify function
                pthread_attr_t *sigev_notify_attributes;        // notify attrs
            };

在 aiocb 结构中,aio_fildes 字段表示被打开用来读或写的文件描述符。

读或写操作从 aio_offset 指定的偏移量开始(注意,异步 I/O 操作必须显示地指定偏移量。只要不在同一个进程里把异步 I/O 函数和传统 I/O 函数混在一起用在同一个文件上,异步 I/O 接口是不会影响操作系统维护的文件偏移量的。

另外,如果使用异步 I/O 接口向一个以追加模式打开的文件中写入数据,aio_offset 字段会被忽略)。

读写数据的操作都是在 aio_buf 指定的缓冲区中进行,aio_nbytes 字段则包含了要读写的字节数。

aio_reqprio 为异步 I/O 请求提示顺序(但系统对该顺序的控制力有限,因此不一定遵循)。

LIO_NOP 没有传输请求

LIO_READ 请求一个读操作

LIO_WRITE 请求一个写操作

LIO_SYNC 请求异步 I/O 同步

aio_lio_opcode 字段只能用于基于列表的异步 I/O(见下)。aio_sigevent 使用 sigevent 结构来控制在 I/O 事件完成后,如何通知应用程序。
在 sigevent 结构中,sigev_notify 字段控制通知的类型,其取值可能是以下 3 个中的一个。

1)SIGEV_NONE:
      异步 I/O 请求完成后,不通知进程。
2)SIGEV_SIGNAL:
      异步 I/O 请求完成后,产生由 sigev_signo 字段指定的信号。如果应用程序已选择捕捉信号,且在建立信号处理程序时指定了 SA_SIGINFO 标志,那么该信号将被入队(如果实现支持排队信号)。
      信号处理程序会传送给一个 siginfo 结构,该结构的 si_value 字段被设置为 sigev_value(如果使用了 SA_SIGINFO 标志)。
3)SIGEV_THREAD:
      当异步 I/O 请求完成时,由 sigev_notify_function 指定的函数会被调用,sigev_value 字段被作为它的唯一参数传入。
      除非 sigev_notify_attributes 字段被设置为 pthread 属性结构的地址,且该结构指定了另一个线程属性,否则该函数将在分离状态下的一个单独的线程中执行。


进行异步 I/O 之前需要先初始化 AIO 控制块。aio_read 和 aio_write 函数可分别用来进行异步读和异步写操作。

调用 aio_read 函数和 aio_write 函数成功之后,异步 I/O 请求便已经被操作系统放入等待处理的队列中了。这些返回值与实际 I/O 操作的结果没有任何关系,如果需要查看函数的返回状态可调用 aio_error 函数。
如果想强制所有等待中的异步操作不等待而写入存储中,可以建立一个 AIO 控制块并调用 aio_fsync 函数。

aio_fsync 在安排了同步后便返回,在异步同步操作完成前,数据不会被持久化。AIO 控制块中的 aio_fields 字段指定了其异步写操作被同步的文件。如果 op 参数被设置为 O_DSYNC,那么操作执行起来就会像调用了 fdatasync 一样。否则,如果 op 参数被设置为 O_SYNC,则操作执行起来就会像调用了 fsync 一样。
aio_error 的返回值为下面 4 种情况中的一种。

10:表示异步操作成功完成,需要调用 aio_return 函数来获取操作返回值。
(2)-1:表示对 aio_error 的调用失败,这可以通过查看 errno 来获知失败原因。
(3)EINPROGRESS:表示异步读、写或同步操作仍在等待。
(4)其他情况:其他任何的返回值是相关的异步操作失败返回的错误码。


直到异步操作完成之前,都需要小心不要调用 aio_return 函数,否则结果是未定义的。而且还要小心对每个异步操作只调用一次 aio_return,因为一旦调用了该函数,操作系统就可以释放掉包含了 I/O 操作返回值的记录。如果 aio_return 本身失败,就会返回 -1,并设置 errno。否则其他情况下,它将直接返回 read、write 或 fsync 被成功调用时的结果。
执行 I/O 操作时,如果还有其他事务要处理而不想被 I/O 操作阻塞,就可以使用异步 I/O。但如果在完成了所有事务后还有异步操作未完成时,就可以调用 aio_suspend 函数来阻塞进程,直到操作完成。

RETURN VALUE
       If this function returns after completion of one of the I/O requests specified in aiocb_list, 0 is returned.  Otherwise, -1 is returned, and errno is set to indicate the error.

ERRORS
       EAGAIN The call timed out before any of the indicated operations had completed.

       EINTR  The call was ended by signal (possibly the completion signal of one of the operations we were waiting for); see signal(7).




而当还有不想再完成的等待中的异步 I/O 操作时,可以尝试用 aio_cancel 函数来取消它们。

aio_cancel 会返回以下情况中的一种:

       AIO_CANCELED 所有请求的操作已被取消
              All requests were successfully canceled.

       AIO_NOTCANCELED 至少有一个请求操作没被取消
              At least one of the requests specified was not canceled because it was in progress.  In this case, one may check the status of individual requests using aio_error(3).

       AIO_ALLDONE 在请求取消之前已经完成
              All requests had already been completed before the call.

       -1     An error occurred.  The cause of the error can be found by inspecting errno.

以下是一个来自 man 手册的 例程

       #include <fcntl.h>
       #include <stdlib.h>
       #include <unistd.h>
       #include <stdio.h>
       #include <errno.h>
       #include <aio.h>
       #include <signal.h>

       #define BUF_SIZE 20     /* Size of buffers for read operations */

       #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0)

       #define errMsg(msg)  do { perror(msg); } while (0)

       struct ioRequest {      /* Application-defined structure for tracking
                                  I/O requests */
           int           reqNum;
           int           status;
           struct aiocb *aiocbp;
       };

       static volatile sig_atomic_t gotSIGQUIT = 0;
                               /* On delivery of SIGQUIT, we attempt to
                                  cancel all outstanding I/O requests */

       static void             /* Handler for SIGQUIT */
       quitHandler(int sig)
       {
           gotSIGQUIT = 1;
       }

       #define IO_SIGNAL SIGUSR1   /* Signal used to notify I/O completion */

       static void                 /* Handler for I/O completion signal */
       aioSigHandler(int sig, siginfo_t *si, void *ucontext)
       {
           if (si->si_code == SI_ASYNCIO) {
               write(STDOUT_FILENO, "I/O completion signal received
", 31);

               /* The corresponding ioRequest structure would be available as
                      struct ioRequest *ioReq = si->si_value.sival_ptr;
                  and the file descriptor would then be available via
                      ioReq->aiocbp->aio_fildes */
           }
       }

       int
       main(int argc, char *argv[])
       {
           struct ioRequest *ioList;
           struct aiocb *aiocbList;
           struct sigaction sa;
           int s, j;
           int numReqs;        /* Total number of queued I/O requests */
           int openReqs;       /* Number of I/O requests still in progress */

           if (argc < 2) {
               fprintf(stderr, "Usage: %s <pathname> <pathname>...
",
                       argv[0]);
               exit(EXIT_FAILURE);
           }

           numReqs = argc - 1;

           /* Allocate our arrays */

           ioList = calloc(numReqs, sizeof(struct ioRequest));
           if (ioList == NULL)
               errExit("calloc");

           aiocbList = calloc(numReqs, sizeof(struct aiocb));
           if (aiocbList == NULL)
               errExit("calloc");

           /* Establish handlers for SIGQUIT and the I/O completion signal */

           sa.sa_flags = SA_RESTART;
           sigemptyset(&sa.sa_mask);

           sa.sa_handler = quitHandler;
           if (sigaction(SIGQUIT, &sa, NULL) == -1)
               errExit("sigaction");

           sa.sa_flags = SA_RESTART | SA_SIGINFO;
           sa.sa_sigaction = aioSigHandler;
           if (sigaction(IO_SIGNAL, &sa, NULL) == -1)
               errExit("sigaction");

           /* Open each file specified on the command line, and queue
              a read request on the resulting file descriptor */

           for (j = 0; j < numReqs; j++) {
               ioList[j].reqNum = j;
               ioList[j].status = EINPROGRESS;
               ioList[j].aiocbp = &aiocbList[j];

               ioList[j].aiocbp->aio_fildes = open(argv[j + 1], O_RDONLY);
               if (ioList[j].aiocbp->aio_fildes == -1)
                   errExit("open");
               printf("opened %s on descriptor %d
", argv[j + 1],
                       ioList[j].aiocbp->aio_fildes);

               ioList[j].aiocbp->aio_buf = malloc(BUF_SIZE);
               if (ioList[j].aiocbp->aio_buf == NULL)
                   errExit("malloc");

               ioList[j].aiocbp->aio_nbytes = BUF_SIZE;
               ioList[j].aiocbp->aio_reqprio = 0;
               ioList[j].aiocbp->aio_offset = 0;
               ioList[j].aiocbp->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
               ioList[j].aiocbp->aio_sigevent.sigev_signo = IO_SIGNAL;
               ioList[j].aiocbp->aio_sigevent.sigev_value.sival_ptr =
                                       &ioList[j];

               s = aio_read(ioList[j].aiocbp);
               if (s == -1)
                   errExit("aio_read");
           }

           openReqs = numReqs;

           /* Loop, monitoring status of I/O requests */

           while (openReqs > 0) {
               sleep(3);       /* Delay between each monitoring step */

               if (gotSIGQUIT) {

                   /* On receipt of SIGQUIT, attempt to cancel each of the
                      outstanding I/O requests, and display status returned
                      from the cancellation requests */

                   printf("got SIGQUIT; canceling I/O requests: 
");

                   for (j = 0; j < numReqs; j++) {
                       if (ioList[j].status == EINPROGRESS) {
                           printf("    Request %d on descriptor %d:", j,
                                   ioList[j].aiocbp->aio_fildes);
                           s = aio_cancel(ioList[j].aiocbp->aio_fildes,
                                   ioList[j].aiocbp);
                           if (s == AIO_CANCELED)
                               printf("I/O canceled
");
                           else if (s == AIO_NOTCANCELED)
                               printf("I/O not canceled
");
                           else if (s == AIO_ALLDONE)
                               printf("I/O all done
");
                           else
                               errMsg("aio_cancel");
                       }
                   }

                   gotSIGQUIT = 0;
               }

               /* Check the status of each I/O request that is still
                  in progress */

               printf("aio_error():
");
               for (j = 0; j < numReqs; j++) {
                   if (ioList[j].status == EINPROGRESS) {
                       printf("    for request %d (descriptor %d): ",
                               j, ioList[j].aiocbp->aio_fildes);
                       ioList[j].status = aio_error(ioList[j].aiocbp);

                       switch (ioList[j].status) {
                       case 0:
                           printf("I/O succeeded
");
                           break;
                       case EINPROGRESS:
                           printf("In progress
");
                           break;
                       case ECANCELED:
                           printf("Canceled
");
                           break;
                       default:
                           errMsg("aio_error");
                           break;
                       }

                       if (ioList[j].status != EINPROGRESS)
                           openReqs--;
                   }
               }
           }

           printf("All I/O requests completed
");

           /* Check status return of all I/O requests */

           printf("aio_return():
");
           for (j = 0; j < numReqs; j++) {
               ssize_t s;

               s = aio_return(ioList[j].aiocbp);
               printf("    for request %d (descriptor %d): %zd
",
                       j, ioList[j].aiocbp->aio_fildes, s);
           }

           exit(EXIT_SUCCESS);
       }




ref:https://blog.csdn.net/aisxyz/article/details/84915025

原文地址:https://www.cnblogs.com/schips/p/11155767.html