8.8 Programming with Asynchronous I/O

Normally, when performing a read or write, a process blocks until the I/O completes. Some performance-critical applications would rather initiate the request and continue executing, allowing the I/O operation to proceed asynchronously with program execution. The older method of asynchronous I/O uses either SIGPOLL or SIGIO to notify a process when I/O is available. The mechanism for using these signals is set up with ioctl. This section discusses the newer version, which is part of the POSIX:AIO Asynchronous I/O Extension that was introduced with the POSIX:RTS Realtime Extension.

The POSIX:AIO Extension bases its definition of asynchronous I/O on four main functions. The aio_read function allows a process to queue a request for reading on an open file descriptor. The aio_write function queues requests for writing. The aio_return function returns the status of an asynchronous I/O operation after it completes, and the aio_error function returns the error status. A fifth function, aio_cancel, allows cancellation of asynchronous I/O operations that have already been queued.

The aio_read and aio_write functions take a single parameter, aiocbp, which is a pointer to an asynchronous I/O control block. The aio_read function reads up to aiocbp->aio_nbytes bytes from the file associated with aiocbp->aio_fildes into the buffer specified by aiocbp->aio_buf. The function returns when the request is queued. The aio_write function behaves similarly.

SYNOPSIS

    #include <aio.h>

    int aio_read(struct aiocb *aiocbp);
    int aio_write(struct aiocb *aiocbp);

                                                            POSIX:AIO

If the request was successfully queued, aio_read and aio_write return 0. If unsuccessful, these functions return -1 and set errno. The following table lists the mandatory errors for these functions that are specific to asynchronous I/O.

    errno        cause
    EAGAIN       system did not have the resources to queue request (B)
    EBADF        aiocbp->aio_fildes invalid (BA)
    EFBIG        aiocbp->aio_offset exceeds maximum (aio_write) (BA)
    ECANCELED    request canceled because of explicit aio_cancel (A)
    EINVAL       invalid member of aiocbp (BA)
    EOVERFLOW    aiocbp->aio_offset exceeds maximum (aio_read) (BA)

Errors that occur before the return of aio_read or aio_write have a B tag. These are values that errno can have if the call returns -1. The errors that may occur after the return have an A tag. These errors are returned by a subsequent call to aio_error. The aio_read and aio_write functions also have the mandatory errors of their respective read and write counterparts.
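The queuing step and its B-tagged errors can be sketched as follows. This is a minimal illustration, not code from the programs in this section; the helper name queue_read is hypothetical, and the sketch assumes the caller wants to read from offset 0 without signal notification.

```c
#include <aio.h>
#include <errno.h>
#include <signal.h>
#include <string.h>
#include <sys/types.h>

/* Hypothetical helper: queue an asynchronous read of up to n bytes from
   offset 0 of fd into buf. Returns 0 if queued, -1 with errno set if not. */
int queue_read(struct aiocb *cb, int fd, void *buf, size_t n) {
    memset(cb, 0, sizeof(*cb));           /* zero the members we do not set */
    cb->aio_fildes = fd;
    cb->aio_buf = buf;
    cb->aio_nbytes = n;
    cb->aio_offset = 0;
    cb->aio_sigevent.sigev_notify = SIGEV_NONE;
    if (aio_read(cb) == -1)
        return -1;        /* a "B" error: errno says why queuing failed */
    return 0;             /* queued; "A" errors appear later via aio_error */
}
```

A return of 0 only means that the request was accepted. The outcome of the read itself has to be retrieved later with aio_error and aio_return.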

The struct aiocb structure has at least the following members.

    int             aio_fildes;     /* file descriptor */
    volatile void   *aio_buf;       /* buffer location */
    size_t          aio_nbytes;     /* length of transfer */
    off_t           aio_offset;     /* file offset */
    int             aio_reqprio;    /* request priority offset */
    struct sigevent aio_sigevent;   /* signal number and value */
    int             aio_lio_opcode; /* listio operation */

The first three members of this structure are similar to the parameters in an ordinary read or write function. The aio_offset member specifies the starting position in the file for the I/O. If the implementation supports user scheduling (_POSIX_PRIORITIZED_IO and _POSIX_PRIORITY_SCHEDULING are defined), aio_reqprio lowers the priority of the request. The aio_sigevent member specifies how the calling process is notified of the completion. If aio_sigevent.sigev_notify has the value SIGEV_NONE, the operating system does not generate a signal when the I/O completes. If aio_sigevent.sigev_notify is SIGEV_SIGNAL, the operating system generates the signal specified in aio_sigevent.sigev_signo. The aio_lio_opcode member is used by the lio_listio function (not discussed here) to submit multiple I/O requests.

The aio_error and aio_return functions return the status of the I/O operation designated by aiocbp . Monitor the progress of the asynchronous I/O operation with aio_error . When the operation completes, call aio_return to retrieve the number of bytes read or written.

SYNOPSIS

    #include <aio.h>

    ssize_t aio_return(struct aiocb *aiocbp);
    int aio_error(const struct aiocb *aiocbp);

                                                            POSIX:AIO

The aio_error function returns 0 when the I/O operation has completed successfully or EINPROGRESS if the I/O operation is still executing. If the operation fails, aio_error returns the error code associated with the failure. This error status corresponds to the value of errno that would have been set by the corresponding read or write function. The aio_return function returns the status of a completed underlying I/O operation. If the operation was successful, the return value is the number of bytes read or written. Once aio_return has been called, neither aio_return nor aio_error should be called for the same struct aiocb until another asynchronous operation is started with this control block. The results of aio_return are undefined if the asynchronous I/O has not yet completed.
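The rule "check aio_error first, then call aio_return once" might be wrapped as shown here. The helper name check_aio and its return convention are assumptions made for this sketch only.

```c
#include <aio.h>
#include <errno.h>
#include <sys/types.h>

/* Hypothetical helper: check one request without blocking.
   Returns 0 if the request is still in progress, 1 if it finished
   successfully (byte count stored in *nbytes), or -1 with errno set
   if the operation failed. */
int check_aio(struct aiocb *cb, ssize_t *nbytes) {
    int status = aio_error(cb);
    ssize_t result;

    if (status == EINPROGRESS)
        return 0;                  /* too early to call aio_return */
    result = aio_return(cb);       /* call exactly once per request */
    if (status != 0) {
        errno = status;            /* same value read or write would set */
        return -1;
    }
    *nbytes = result;
    return 1;
}
```

After check_aio returns a nonzero value, the control block should not be passed to aio_error or aio_return again until a new request is started with it.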

POSIX asynchronous I/O can be used either with or without signals, depending on the setting of the aio_sigevent.sigev_notify field of the struct aiocb. Programs 8.13 and 8.14 illustrate how to do asynchronous I/O with signals. The general idea is to set up a signal handler that does all the work after the initial I/O operation is started.

Program 8.13 is a program for copying one file to another. The reading from the first file is done with asynchronous I/O, and the writing to the second file is done with ordinary I/O. This approach is appropriate if the input is from a pipe or a network connection that might block for long periods of time and if the output is to an ordinary file. Program 8.13 takes two filenames as command-line arguments and opens the first for reading and the second for writing. The program then calls the initsignal function to set up a signal handler and initread to start the first read. The signal is set up as a realtime signal as described in Section 9.4. The main program's loop calls dowork and checks to see if the asynchronous copy has completed with a call to getdone. When the copying is done, the program displays the number of bytes copied or an error message.

Program 8.14 contains the signal handler for the asynchronous I/O as well as initialization routines. The initread function sets up a struct aiocb structure for reading asynchronously and saves the output file descriptor in a global variable. It initializes three additional global variables and starts the first read with a call to readstart.

Program 8.14 keeps track of the first error that occurs in globalerror and the total number of bytes transferred in totalbytes. The doneflag variable has type sig_atomic_t so that it can be accessed atomically. This is necessary since it is modified asynchronously by the signal handler and can be read from the main program with a call to getdone. The variables globalerror and totalbytes are only available after the I/O is complete, so they are never accessed concurrently by the signal handler and the main program.

The signal handler in Program 8.14 uses the struct aiocb that is stored in the global variable aiocb. The signal handler starts by saving errno so that it can be restored when the handler returns. If the handler detects an error, it calls seterror to store the error code in the variable globalerror, provided that this was the first error detected. The signal handler sets the doneflag if an error occurs or end-of-file is detected. Otherwise, the signal handler does a write to the output file descriptor and starts the next read.

Program 8.13 asyncsignalmain.c

A main program that uses asynchronous I/O with signals to copy a file while doing other work.

    #include <errno.h>
    #include <fcntl.h>
    #include <signal.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/stat.h>
    #include "asyncmonitorsignal.h"
    #define BLKSIZE 1024
    #define MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)

    void dowork(void);

    int main(int argc, char *argv[]) {
       char buf[BLKSIZE];
       int done = 0;
       int error;
       int fd1;
       int fd2;
                                            /* open the file descriptors for I/O */
       if (argc != 3) {
          fprintf(stderr, "Usage: %s filename1 filename2\n", argv[0]);
          return 1;
       }
       if ((fd1 = open(argv[1], O_RDONLY)) == -1) {
          fprintf(stderr, "Failed to open %s:%s\n", argv[1], strerror(errno));
          return 1;
       }
       if ((fd2 = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, MODE)) == -1) {
          fprintf(stderr, "Failed to open %s: %s\n", argv[2], strerror(errno));
          return 1;
       }
       if (initsignal(SIGRTMAX) == -1) {
          perror("Failed to initialize signal");
          return 1;
       }
       if (initread(fd1, fd2, SIGRTMAX, buf, BLKSIZE) == -1) {
          perror("Failed to initiate the first read");
          return 1;
       }
       for ( ; ; ) {
          dowork();
          if (!done) {
             if ((done = getdone()) != 0) {      /* report the result only once */
                if ((error = geterror()) != 0)
                   fprintf(stderr, "Failed to copy file:%s\n", strerror(error));
                else
                   fprintf(stderr, "Copy successful, %d bytes\n", getbytes());
             }
          }
       }
    }

Program 8.14 asyncmonitorsignal.c

Utility functions for handling asynchronous I/O with signals.

    #include <aio.h>
    #include <errno.h>
    #include <signal.h>
    #include "restart.h"

    static struct aiocb aiocb;
    static sig_atomic_t doneflag;
    static int fdout;
    static int globalerror;
    static int totalbytes;

    static int readstart();
    static void seterror(int error);

    /* ARGSUSED */
    static void aiohandler(int signo, siginfo_t *info, void *context) {
        int  myerrno;
        int  mystatus;
        int  serrno;

        serrno = errno;
        myerrno = aio_error(&aiocb);
        if (myerrno == EINPROGRESS) {
            errno = serrno;
            return;
        }
        if (myerrno) {
            seterror(myerrno);
            errno = serrno;
            return;
        }
        mystatus = aio_return(&aiocb);
        totalbytes += mystatus;
        aiocb.aio_offset += mystatus;   /* next read continues where this one
                                           ended (matters for regular files) */
        if (mystatus == 0)
            doneflag = 1;
        else if (r_write(fdout, (char *)aiocb.aio_buf, mystatus) == -1)
            seterror(errno);
        else if (readstart() == -1)
            seterror(errno);
        errno = serrno;
    }

    static int readstart() {                     /* start an asynchronous read */
        int error;

        if ((error = aio_read(&aiocb)) != 0)
            seterror(errno);
        return error;
    }

    static void seterror(int error) {            /* update globalerror if zero */
        if (!globalerror)
            globalerror = error;
        doneflag = 1;
    }

    /* --------------------------Public Functions ---------------------------- */
    int getbytes() {
        if (doneflag)
            return totalbytes;
        errno = EINVAL;
        return -1;
    }

    int getdone() {                                          /* check for done */
        return doneflag;
    }

    int geterror() {               /* return the globalerror value if doneflag */
        if (doneflag)
            return globalerror;
        errno = EINVAL;
        return errno;
    }

    int initread(int fdread, int fdwrite, int signo, char *buf, int bufsize) {
        aiocb.aio_fildes = fdread;                          /* set up structure */
        aiocb.aio_offset = 0;
        aiocb.aio_buf = (void *)buf;
        aiocb.aio_nbytes = bufsize;
        aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
        aiocb.aio_sigevent.sigev_signo = signo;
        aiocb.aio_sigevent.sigev_value.sival_ptr = &aiocb;
        fdout = fdwrite;
        doneflag = 0;
        globalerror = 0;
        totalbytes = 0;
        return readstart();                                 /* start first read */
    }

    int initsignal(int signo) {        /* set up the handler for the async I/O */
        struct sigaction newact;

        newact.sa_sigaction = aiohandler;
        newact.sa_flags = SA_SIGINFO;
        if ((sigemptyset(&newact.sa_mask) == -1) ||
              (sigaction(signo, &newact, NULL) == -1))
            return -1;
        return 0;
    }

    int suspenduntilmaybeready() {            /* return 1 if done, 0 otherwise */
        const struct aiocb *aiocblist;

        aiocblist = &aiocb;
        aio_suspend(&aiocblist, 1, NULL);
        return doneflag;
    }

The r_write function from the restart library in Appendix B guarantees that all the bytes requested are written if possible. Program 8.14 also contains the suspenduntilmaybeready function, which is not used in Program 8.13 but will be described later.

The signal handler does not output any error messages. Output from an asynchronous signal handler can interfere with I/O operations in the main program, and the standard library routines such as fprintf and perror may not be safe to use in signal handlers. Instead, the signal handler just keeps track of the errno value of the first error that occurred. The main program can then print an error message, using strerror.

Example 8.29

The following command line calls Program 8.13 to copy from pipe1 to pipe2.

 asyncsignalmain pipe1 pipe2 

Asynchronous I/O can be used without signals if the application has to do other work that can be broken into small pieces. After each piece of work, the program calls aio_error to see if the I/O operation has completed and handles the result if it has. This procedure is called polling.
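A bare-bones version of the polling pattern for a single request could look like the following sketch. The names poll_with_work, count_piece and pieces_done are hypothetical; count_piece is only a stand-in for one small piece of real work.

```c
#include <aio.h>
#include <errno.h>
#include <sys/types.h>

static int pieces_done = 0;       /* hypothetical work counter */

void count_piece(void) {          /* stand-in for one small piece of work */
    pieces_done++;
}

/* Hypothetical helper: do one piece of work at a time and poll the
   request after each piece. Returns the byte count on success or
   -1 with errno set if the I/O failed. */
ssize_t poll_with_work(struct aiocb *cb, void (*workpiece)(void)) {
    int status;
    ssize_t n;

    while ((status = aio_error(cb)) == EINPROGRESS)
        workpiece();              /* keep each piece short */
    n = aio_return(cb);           /* request finished: collect the result */
    if (status != 0) {
        errno = status;
        return -1;
    }
    return n;
}
```

The shorter each piece of work is, the sooner a completed request is noticed, at the cost of more calls to aio_error.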

Program 8.15 shows a main program that takes a number of filenames as parameters. The program reads each file, using asynchronous I/O, and calls processbuffer to process each input. While this is going on, the program calls dowork in a loop.

Program 8.15 uses utility functions from Program 8.16. The main program starts by opening each file and calling initaio to set up the appropriate information for each descriptor as an entry in the static array defined in Program 8.16. Each element of the array contains a struct aiocb structure for holding I/O and control information. Next, the first read for each file is started with a call to readstart. The program does not use signal handlers. The main program executes a loop in which it calls readcheck to check the status of each operation after each piece of dowork. If a read has completed, the main program calls processbuffer to handle the bytes read and starts a new asynchronous read operation. The main program keeps track of which file reads have completed (either successfully or due to an error) in an array called done.

Program 8.15 asyncpollmain.c

A main program that uses polling with asynchronous I/O to process input from multiple file descriptors while doing other work.

    #include <errno.h>
    #include <fcntl.h>
    #include <stdio.h>
    #include <string.h>
    #include "asyncmonitorpoll.h"

    void dowork(void);
    void processbuffer(int which, char *buf, int bufsize);

    int main(int argc, char *argv[]) {
       char *buf;
       int done[NUMOPS];
       int fd[NUMOPS];
       int i;
       int numbytes, numfiles;

       if (argc < 2) {
          fprintf(stderr, "Usage: %s filename1 filename2 ...\n", argv[0]);
          return 1;
       } else if (argc > NUMOPS + 1) {
          fprintf(stderr, "%s: only supports %d simultaneous operations\n",
                  argv[0],  NUMOPS);
          return 1;
       }
       numfiles = argc - 1;

       for (i = 0; i < numfiles; i++)  {            /* set up the I/O operations */
          done[i] = 0;
          if ((fd[i] = open(argv[i+1], O_RDONLY)) == -1) {
             fprintf(stderr, "Failed to open %s:%s\n", argv[i+1], strerror(errno));
             return 1;
          }
          if (initaio(fd[i], i) == -1) {
             fprintf(stderr, "Failed to setup I/O op %d:%s\n", i, strerror(errno));
             return 1;
          }
          if (readstart(i) == -1) {
             fprintf(stderr, "Failed to start read %d:%s\n", i, strerror(errno));
             return 1;
          }
       }
       for ( ; ; ) {                                         /* loop and poll */
          dowork();
          for (i = 0; i < numfiles; i++) {
             if (done[i])
                continue;
             numbytes = readcheck(i, &buf);
             if ((numbytes == -1) && (errno == EINPROGRESS))
                continue;
             if (numbytes <= 0) {
                if (numbytes == 0)
                   fprintf(stderr, "End of file on %d\n", i);
                else
                   fprintf(stderr, "Failed to read %d:%s\n", i, strerror(errno));
                done[i] = 1;
                continue;
             }
             processbuffer(i, buf, numbytes);
             reinit(i);
             if (readstart(i) == -1) {
                fprintf(stderr, "Failed to start read %d:%s\n", i, strerror(errno));
                done[i] = 1;
             }
          }
       }
    }

Program 8.16 asyncmonitorpoll.c

Utility functions for handling asynchronous I/O with polling.

    #include <aio.h>
    #include <errno.h>
    #include <stdio.h>
    #include <unistd.h>
    #include "asyncmonitorpoll.h"
    #define BLKSIZE 1024                            /* size of blocks to be read */

    typedef struct {
        char buf[BLKSIZE];
        ssize_t bytes;
        struct aiocb control;
        int doneflag;
        int startedflag;
    } aio_t;

    static aio_t iops[NUMOPS];                         /* information for the op */

    /* -------------------------- Public Functions ----------------------------- */
    int initaio(int fd, int handle) {                /* set up control structure */
        if (handle >= NUMOPS) {
            errno = EINVAL;
            return -1;
        }
        iops[handle].control.aio_fildes = fd;              /* I/O operation on fd */
        iops[handle].control.aio_offset = 0;
        iops[handle].control.aio_buf = (void *)iops[handle].buf;
        iops[handle].control.aio_nbytes = BLKSIZE;
        iops[handle].control.aio_sigevent.sigev_notify = SIGEV_NONE;
        iops[handle].doneflag = 0;
        iops[handle].startedflag = 0;
        iops[handle].bytes = 0;
        return 0;
    }

    /* return -1 if not done or error
                 errno = EINPROGRESS if not done
       otherwise, return number of bytes read with *buf pointing to buffer
    */
    int readcheck(int handle, char **bufp) {   /* see if read for handle is done */
        int error;
        ssize_t numbytes;
        struct aiocb *thisp;

        thisp = &(iops[handle].control);            /* get a pointer to the aiocb */
        if (iops[handle].doneflag) {       /* done already, don't call aio_return */
            numbytes = iops[handle].bytes;
            *bufp = (char *)iops[handle].control.aio_buf; /* set pointer to buffer */
            return numbytes;
        }
        error = aio_error(thisp);
        if (error) {
            errno = error;
            return -1;
        }
        numbytes = aio_return(thisp);
        iops[handle].bytes = numbytes;
        *bufp = (char *)iops[handle].control.aio_buf;    /* set pointer to buffer */
        iops[handle].doneflag = 1;
        return numbytes;
    }

    int readstart(int handle) {    /* start read for I/O corresponding to handle */
        struct aiocb *thisp;

        thisp = &(iops[handle].control);            /* get a pointer to the aiocb */
        if (iops[handle].startedflag) {                        /* already started */
            errno = EINVAL;
            return -1;
        }
        if (aio_read(thisp) == -1)          /* errno was already set by aio_read */
            return -1;
        iops[handle].startedflag = 1;
        return 0;
    }

    void reinit(int handle) {   /* must be called before doing another readstart */
        if (iops[handle].bytes > 0)       /* continue after the bytes just read */
            iops[handle].control.aio_offset += iops[handle].bytes;
        iops[handle].doneflag = 0;
        iops[handle].startedflag = 0;
        iops[handle].bytes = 0;
    }

Example 8.30

The following command line calls Program 8.15 for inputs pipe1, pipe2 and pipe3.

 asyncpollmain pipe1 pipe2 pipe3 

What if a program starts asynchronous I/O operations as in Program 8.13 and runs out of other work to do? Here are several options for avoiding busy waiting.

  1. Switch to using standard blocking I/O with select.

  2. Use signals as in Program 8.13, or use pause or sigsuspend in a loop. Do not use sigwait, since this function requires the signals to be blocked.

  3. Switch to using signals as in Program 8.15 by blocking the signal and calling sigwait in a loop.

  4. Use aio_suspend.

The aio_suspend function takes three parameters: an array of pointers to struct aiocb structures, the number of these structures and a timeout specification. If the timeout specification is not NULL, aio_suspend may return after the specified time. Otherwise, it returns when at least one of the I/O operations has completed and aio_error no longer returns EINPROGRESS. Any of the entries in the array may be NULL, in which case they are ignored.

SYNOPSIS

    #include <aio.h>

    int aio_suspend(const struct aiocb * const list[], int nent,
                    const struct timespec *timeout);

                                                            POSIX:AIO

If successful, aio_suspend returns 0. If unsuccessful, aio_suspend returns -1 and sets errno. The following table lists the mandatory errors for aio_suspend.

    errno     cause
    EAGAIN    timeout occurred before asynchronous I/O completed
    EINTR     a signal interrupted aio_suspend

Program 8.14 has a suspenduntilmaybeready function that uses aio_suspend to suspend the calling process until the asynchronous I/O operation is ready. It can be called from the main program of Program 8.13 in place of dowork when there is no other work to be done. In this case, there is only one asynchronous I/O operation and the function returns 1 if it has completed, and 0 otherwise.
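A variant that uses the timeout parameter and handles both mandatory errors might look like this. The helper name wait_with_timeout is hypothetical, and the sketch makes the simplifying assumption that restarting the full timeout after an interruption is acceptable.

```c
#include <aio.h>
#include <errno.h>
#include <time.h>

/* Hypothetical helper: wait up to the given number of seconds for one
   request. Returns 1 if the request is no longer in progress, 0 if the
   timeout expired first, or -1 with errno set on any other error. */
int wait_with_timeout(const struct aiocb *cb, time_t seconds) {
    const struct aiocb *list[1];
    struct timespec timeout;

    list[0] = cb;
    timeout.tv_sec = seconds;
    timeout.tv_nsec = 0;
    while (aio_suspend(list, 1, &timeout) == -1) {
        if (errno == EAGAIN)
            return 0;                 /* timed out before completion */
        if (errno != EINTR)
            return -1;
        /* EINTR: a signal arrived; wait again with the full timeout */
    }
    return aio_error(cb) != EINPROGRESS;
}
```

A return value of 1 only says that the request has finished. The caller still needs aio_error and aio_return to find out whether it succeeded and how many bytes were transferred.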

The aio_cancel function attempts to cancel one or more asynchronous I/O requests on the file descriptor fildes. The aiocbp parameter points to the control block for the request to be canceled. If aiocbp is NULL, the aio_cancel function attempts to cancel all pending requests on fildes.

SYNOPSIS

    #include <aio.h>

    int aio_cancel(int fildes, struct aiocb *aiocbp);

                                                            POSIX:AIO

The aio_cancel function returns AIO_CANCELED if the requested operations were successfully canceled or AIO_NOTCANCELED if at least one of the requested operations could not be canceled because it was in progress. It returns AIO_ALLDONE if all the operations have already completed. Otherwise, the aio_cancel function returns -1 and sets errno. The aio_cancel function sets errno to EBADF if the fildes parameter does not correspond to a valid file descriptor.
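The four possible outcomes can be told apart with a switch on the return value, as in this sketch. The helper name cancel_outcome and the strings it returns are made up for illustration.

```c
#include <aio.h>
#include <errno.h>

/* Hypothetical helper: try to cancel one request (or every request on
   fd if cb is NULL) and describe what happened. */
const char *cancel_outcome(int fd, struct aiocb *cb) {
    switch (aio_cancel(fd, cb)) {
    case AIO_CANCELED:
        return "canceled";        /* aio_error now reports ECANCELED */
    case AIO_NOTCANCELED:
        return "not canceled";    /* still running; keep checking aio_error */
    case AIO_ALLDONE:
        return "already done";    /* nothing left to cancel */
    default:
        return "error";           /* -1: errno is set, for example EBADF */
    }
}
```

In the AIO_NOTCANCELED case the request continues normally, so the caller still has to wait for it and collect its status with aio_return.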

Exercise 8.31

How would you modify Programs 8.15 and 8.16 so that a SIGUSR1 signal cancels all the asynchronous I/O operations without affecting the rest of the program?

Answer:

Set up a signal handler for SIGUSR1 in asyncmonitorpoll that cancels all pending operations using aio_cancel. Also set a flag signifying that all I/O has been canceled. The readcheck function checks this flag. If the flag is set, readcheck returns -1 with errno set to ECANCELED.
