Appendix A. Sample Programs

The programs in this appendix illustrate the use of some of the features discussed in the book. The following programs are included:

Asynchronous I/O Example

The program in Example A-1 demonstrates the use of some asynchronous I/O functions. The basic purpose of the program is to read a list of input files and write their concatenated contents as its output, work that does not normally require asynchronous I/O. However, this test program reads the input files using aio_read(), and writes the output file using aio_write() and aio_fsync(). In addition, it can be compiled in either of two ways:

  • to copy the input files one at a time, using subroutine calls

  • to copy the input files concurrently, using a separate process for each input file

There is no functional advantage to using multiple processes. Doing so merely makes the example more interesting. It also demonstrates that, even though multiple processes ask for output at different points in the same file at the same time, the output is written to the requested offsets.
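The offsets cannot collide because main() assigns each input file a disjoint range of the output file before any I/O starts. The first file's output begins at offset 0, and each later file begins at the previous file's starting offset plus the previous file's size. A minimal sketch of that running sum follows. The helper name output_bases() is hypothetical; in Example A-1 the same calculation is done inline in the argument loop of main().

```c
#include <stddef.h>     /* size_t */
#include <sys/types.h>  /* off_t */

/*
 * Hypothetical helper: given the sizes of n input files, fill base[]
 * with the offset in the output file at which each file's data begins.
 * File i then owns the range [base[i], base[i] + size[i]), so the
 * ranges never overlap and writers need not coordinate.
 */
void output_bases(const off_t size[], off_t base[], size_t n)
{
    off_t next = 0;     /* first byte not yet assigned to any file */
    size_t i;

    for (i = 0; i < n; ++i)
    {
        base[i] = next;
        next += size[i];
    }
}
```

A zero-length file gets the same base as the file after it, which is harmless because it writes nothing.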

The reading and writing are done in one of four functions, all of which perform the following sequence of actions:

  1. Initialize the aiocb for the type of notification desired. The type of notification is the principal difference between the functions: two use no notification, one uses a signal, and one uses a callback function.

  2. Until the input file is exhausted,

    • Call aio_read() for up to one BLOCKSIZE amount from the next offset in the input file

    • Wait for the read to complete

    • Call aio_write() to write the data read to the next offset in the output file

    • Wait for the write to complete

  3. Use aio_fsync() to ensure that output is complete and wait for it to complete.
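The sequence above can be sketched with the portable POSIX asynchronous I/O interface instead of the IRIX one. This is a minimal sketch that assumes a POSIX system with <aio.h>. It uses struct aiocb rather than the IRIX aiocb_t typedef, skips aio_sgi_init(), and waits with aio_suspend() as inProc1() does. The names wait_one() and aio_cat_file() are hypothetical. Note that the aiocb is pointed back at the output file before aio_fsync(), because the loop can end immediately after a read.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>        /* aio_read(), aio_write(), aio_fsync(), ... */
#include <errno.h>      /* errno, EINPROGRESS, EINTR, EIO */
#include <fcntl.h>      /* open(), O_RDONLY, O_SYNC */
#include <signal.h>     /* SIGEV_NONE */
#include <string.h>     /* memset() */
#include <sys/types.h>  /* off_t, ssize_t */
#include <unistd.h>     /* close() */

#define BLOCKSIZE 2048  /* input unit, as in Example A-1 */

/* Wait for the one operation described by *cb and return its status
   (0 for success, else an errno value), like inWait1() + aio_error(). */
static int wait_one(struct aiocb *cb)
{
    const struct aiocb *list[1];
    list[0] = cb;
    while (aio_error(cb) == EINPROGRESS)
    {
        /* a signal can end the wait early (EINTR); just wait again */
        if (aio_suspend(list, 1, NULL) == -1 && errno != EINTR)
            return errno;
    }
    return aio_error(cb);
}

/* Copy the contents of in_path into out_fd starting at out_base, one
   block at a time: read, wait, write, wait; then aio_fsync() and wait.
   Returns 0 on success or an errno value. */
int aio_cat_file(const char *in_path, int out_fd, off_t out_base)
{
    char buf[BLOCKSIZE];        /* every operation completes before return */
    struct aiocb cb;
    off_t in_off = 0;
    ssize_t bytes = 0;
    int ret = 0;
    int in_fd = open(in_path, O_RDONLY);

    if (in_fd == -1)
        return errno;
    memset(&cb, 0, sizeof cb);
    cb.aio_buf = buf;
    cb.aio_sigevent.sigev_notify = SIGEV_NONE;  /* we wait explicitly */
    do
    {
        cb.aio_fildes = in_fd;              /* read up to one block */
        cb.aio_offset = in_off;
        cb.aio_nbytes = BLOCKSIZE;
        if (aio_read(&cb) == -1) { ret = errno; break; }
        if ((ret = wait_one(&cb)) != 0) break;
        bytes = aio_return(&cb);
        if (bytes <= 0) break;              /* end of file */
        in_off += bytes;

        cb.aio_fildes = out_fd;             /* write what was read */
        cb.aio_offset = out_base;
        cb.aio_nbytes = (size_t)bytes;
        if (aio_write(&cb) == -1) { ret = errno; break; }
        if ((ret = wait_one(&cb)) != 0) break;
        if (aio_return(&cb) != bytes) { ret = EIO; break; }  /* short write */
        out_base += bytes;
    } while (bytes == BLOCKSIZE);

    if (ret == 0)
    {
        cb.aio_fildes = out_fd;   /* the loop may have ended on a read */
        if (aio_fsync(O_SYNC, &cb) == -1)
            ret = errno;
        else if ((ret = wait_one(&cb)) == 0)
            (void)aio_return(&cb);
    }
    close(in_fd);
    return ret;
}
```

On some older systems the POSIX aio functions live in a separate library (for example, -lrt with older GNU C libraries).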

The four functions, inProc0() through inProc3(), differ only in the method they use to wait for completion.

  • inProc0() alternates calling aio_error() with sginap() until the status is other than EINPROGRESS.

  • inProc1() calls aio_suspend() to wait for the current operation.

  • inProc2() sets the aiocb to request a signal on completion. Then it waits on a semaphore that is posted from the signal handler function.

  • inProc3() waits on a semaphore which is posted from a callback function.
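inProc3() relies on the IRIX SIGEV_CALLBACK notification type. The nearest POSIX equivalent, assumed here, is SIGEV_THREAD, in which the library calls sigev_notify_function in a new thread when the operation completes. The following minimal sketch uses a POSIX unnamed semaphore (sem_t) in place of the IRIX usema_t and waits for a single read the way inProc3() waits for each operation. The names struct op_ctx, on_complete(), wait_cb(), and read_head() are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <errno.h>
#include <fcntl.h>      /* open() */
#include <semaphore.h>  /* sem_init(), sem_wait(), sem_post() */
#include <signal.h>     /* SIGEV_THREAD, union sigval */
#include <string.h>     /* memset() */
#include <unistd.h>     /* close(), ssize_t */

/* The aiocb and the semaphore its completion callback posts. */
struct op_ctx
{
    struct aiocb cb;
    sem_t        done;
};

/* Runs in a library-created thread when the operation completes: the
   V operation. It may run before or after the waiter's P operation. */
static void on_complete(union sigval sv)
{
    struct op_ctx *ctx = sv.sival_ptr;
    sem_post(&ctx->done);
}

/* P operation: block until the callback has posted, then return the
   completion status of the operation (0 on success). */
static int wait_cb(struct op_ctx *ctx)
{
    while (sem_wait(&ctx->done) == -1)
        if (errno != EINTR)
            return errno;
    return aio_error(&ctx->cb);
}

/* Read up to n bytes from the start of path, waiting by callback and
   semaphore. Returns the byte count, or -1 with errno set. */
ssize_t read_head(const char *path, void *buf, size_t n)
{
    struct op_ctx ctx;
    ssize_t result = -1;
    int err = 0;
    int fd = open(path, O_RDONLY);

    if (fd == -1)
        return -1;
    memset(&ctx, 0, sizeof ctx);
    if (sem_init(&ctx.done, 0, 0) == -1)
    {
        err = errno;
        close(fd);
        errno = err;
        return -1;
    }
    ctx.cb.aio_fildes = fd;
    ctx.cb.aio_buf    = buf;
    ctx.cb.aio_nbytes = n;
    ctx.cb.aio_offset = 0;
    ctx.cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
    ctx.cb.aio_sigevent.sigev_notify_function = on_complete;
    ctx.cb.aio_sigevent.sigev_value.sival_ptr = &ctx;  /* handed to callback */

    if (aio_read(&ctx.cb) == -1)
        err = errno;
    else if ((err = wait_cb(&ctx)) == 0)
        result = aio_return(&ctx.cb);
    sem_destroy(&ctx.done);
    close(fd);
    if (result == -1)
        errno = err;
    return result;
}
```

Because a semaphore remembers a V that arrives before the matching P, the waiter never misses a completion, which is the same property inWait3() depends on.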

You select which of the four functions to use with the -a argument to the program. If you compile the program with the macro DO_SPROCS defined as 0, the chosen function is called as a subroutine once for each input file. If you compile with DO_SPROCS defined as 1, the chosen function is launched by sprocsp() once for each input file, each instance running in its own process.
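With DO_SPROCS set to 1, main() starts one process per input file, releases them all at once with a barrier (the "starting gun"), and meets them again at a second barrier when all output is complete. sproc() and the usinit() barriers are IRIX-specific. As a rough analog only, the following sketch assumes POSIX threads instead of sproc'd processes. It uses a mutex and condition variable as the starting gun, so that a failed pthread_create() can still release the threads already started, and it uses pthread_join() as the final rendezvous. The worker body is a placeholder, and struct worker, worker_main(), and run_workers() are hypothetical names.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>

/* Hypothetical per-worker record, standing in for child_t. */
struct worker
{
    pthread_t tid;
    int       index;
    long      result;       /* written only by the worker */
};

/* The "starting gun": workers block until go (or quit) is set. */
static pthread_mutex_t gun_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  gun_cond = PTHREAD_COND_INITIALIZER;
static int go = 0, quit = 0;

/* Stand-in for inProc0()..inProc3(). */
static void *worker_main(void *arg)
{
    struct worker *w = arg;
    int abandon;

    pthread_mutex_lock(&gun_lock);
    while (!go && !quit)
        pthread_cond_wait(&gun_cond, &gun_lock);
    abandon = quit;
    pthread_mutex_unlock(&gun_lock);
    if (!abandon)
        w->result = (long)w->index * 10;   /* placeholder for copying a file */
    return NULL;
}

/* Set *flag under the lock and wake every waiting worker. */
static void fire(int *flag)
{
    pthread_mutex_lock(&gun_lock);
    *flag = 1;
    pthread_cond_broadcast(&gun_cond);
    pthread_mutex_unlock(&gun_lock);
}

/* Start n workers, release them together, then wait for all of them
   (the final rendezvous). Returns 0, or a pthread error number. */
int run_workers(struct worker w[], int n)
{
    int i, started, err = 0;

    go = quit = 0;                    /* no workers exist yet */
    for (started = 0; started < n; ++started)
    {
        w[started].index = started;
        w[started].result = -1;
        err = pthread_create(&w[started].tid, NULL, worker_main, &w[started]);
        if (err != 0)
            break;
    }
    fire(err ? &quit : &go);          /* start everyone, or tell them to quit */
    for (i = 0; i < started; ++i)
        pthread_join(w[i].tid, NULL); /* rendezvous: all work is complete */
    return err;
}
```

Unlike sproc() with PR_SALL, threads share the address space automatically, so the worker records need not be placed in a shared arena.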

Example A-1. Asynchronous I/O Example Program


/* ============================================================================ 
||  aiocat.c : This highly artificial example demonstrates asynchronous I/O. 
||
|| The command syntax is:
||  aiocat [ -o outfile ] [-a {0|1|2|3} ] infilename...
||
|| The output file is given by -o, with $TMPDIR/aiocat.out by default.
|| The aio method of waiting for completion is given by -a as follows:
||  -a 0 poll for completion with aio_error() (default)
||  -a 1 wait for completion with aio_suspend()
||  -a 2 wait on a semaphore posted from a signal handler
||  -a 3 wait on a semaphore posted from a callback routine
||
|| Up to MAX_INFILES input files may be specified. Each input file is
|| read in BLOCKSIZE units. The output file contains the data from
|| the input files in the order they were specified. Thus the
|| output should be the same as "cat infilename... >outfile".
||
|| When DO_SPROCS is compiled true, all I/O is done asynchronously
|| and concurrently using one sproc'd process per file.  Thus in a
|| multiprocessor concurrent input can be done.
============================================================================ */

#define _SGI_MP_SOURCE  /* see the "Caveats" section of sproc(2) */
#include <time.h>       /* for clock() */
#include <errno.h>      /* for errno */
#include <stdio.h>      /* for printf(), perror() */
#include <stdlib.h>     /* for getenv(), malloc(3c) */
#include <string.h>     /* for strcpy(), strcat(), strcmp(), strlen() */
#include <ulocks.h>     /* usinit() & friends */
#include <bstring.h>    /* for bzero() */
#include <sys/resource.h> /* for get/setrlimit() */
#include <sys/types.h>  /* required by lseek(), prctl(), sproc() */
#include <sys/prctl.h>  /* for prctl(), sproc() */
#include <fcntl.h>      /* for open(), O_RDONLY, O_CREAT, ... */
#include <unistd.h>     /* for lseek(), close(), unlink() */
#include <signal.h>     /* for signals - gets sys/signal and sys/siginfo */
#include <aio.h>        /* async I/O */

#define BLOCKSIZE 2048  /* input units -- play with this number */
#define MAX_INFILES 10  /* max sprocs: anything from 4 to 20 or so */
#define DO_SPROCS 1     /* set 0 to do all I/O in a single process */

#define QUITIFNULL(PTR,MSG) if (NULL==PTR) {perror(MSG);return(errno);}
#define QUITIFMONE(INT,MSG) if (-1==INT) {perror(MSG);return(errno);}
/*****************************************************************************
|| The following structure contains the info needed by one child proc.
|| The main program builds an array of MAX_INFILES of these.
|| The reason for storing the actual filename here (not a pointer) is
|| to force the struct to >128 bytes.  Then, when the procs run in 
|| different CPUs on a CHALLENGE, the info structs will be in different
|| cache lines, and a store by one proc will not invalidate a cache line
|| for its neighbor proc.
*/
typedef struct child
{
        /* read-only to child */
    char fname[100];        /* input filename from argv[n] */
    int         fd;         /* FD for this file */
    void*       buffer;     /* buffer for this file */
    int         procid;     /* process ID of child process */
    off_t       fsize;      /* size of this input file */
        /* read-write to child */
    usema_t*    sema;       /* semaphore used by methods 2 & 3 */
    off_t       outbase;    /* starting offset in output file */
    off_t       inbase;     /* current offset in input file */
    clock_t     etime;      /* sum of utime/stime to read file */
    aiocb_t     acb;        /* aiocb used for reading and writing */
} child_t;

/******************************************************************************
|| Globals, accessible to all processes
*/
char*       ofName = NULL;  /* output file name string */
int         outFD;          /* output file descriptor */
usptr_t*    arena;          /* arena where everything is built */
barrier_t*  convene;        /* barrier used to sync up */
int         nprocs = 1;     /* 1 + number of child procs */
child_t*    array;          /* array of child_t structs in arena */
int         errors = 0;     /* always incremented on an error */

/******************************************************************************
|| forward declaration of the child process functions
*/
void inProc0(void *arg, size_t stk);    /* polls with aio_error() */
void inProc1(void *arg, size_t stk);    /* uses aio_suspend() */
void inProc2(void *arg, size_t stk);    /* uses a signal and semaphore */
void inProc3(void *arg, size_t stk);    /* uses a callback and semaphore */

/******************************************************************************
|| The main()
*/
int main(int argc, char **argv)
{
    char*       tmpdir;         /* ->name string of temp dir */
    int         nfiles;         /* how many input files on cmd line */
    int         argno;          /* loop counter */
    child_t*    pc;             /* ->child_t of current file */
    void (*method)(void *,size_t) = inProc0; /* ->chosen input method */
    char        arenaPath[128]; /* build area for arena pathname */
    char        outPath[128];   /* build area for output pathname */    
    /*
    || Ensure the name of a temporary directory.
    */
    tmpdir = getenv("TMPDIR");
    if (!tmpdir) tmpdir = "/var/tmp";
    /*
    || Build a name for the arena file.
    */
    strcpy(arenaPath,tmpdir);
    strcat(arenaPath,"/aiocat.wrk");
    /*
    || Create the arena. First, call usconfig() to establish the
    || minimum size (twice the buffer size per file, to allow for misc usage)
    || and the (maximum) number of processes that may later use
    || this arena.  For this program that is MAX_INFILES+10, allowing
    || for our sprocs plus those done by aio_sgi_init().
    || These values apply to any arenas made subsequently, until changed.
    */
    {
        ptrdiff_t ret;
        ret = usconfig(CONF_INITSIZE,2*BLOCKSIZE*MAX_INFILES);
        QUITIFMONE(ret,"usconfig size")
        ret = usconfig(CONF_INITUSERS,MAX_INFILES+10);
        QUITIFMONE(ret,"usconfig users")
        arena = usinit(arenaPath);
        QUITIFNULL(arena,"usinit")
    }
    /*
    || Allocate the barrier.
    */
    convene = new_barrier(arena);
    QUITIFNULL(convene,"new_barrier")
    /*
    || Allocate the array of child info structs and zero it.
    */
    array = (child_t*)usmalloc(MAX_INFILES*sizeof(child_t),arena);
    QUITIFNULL(array,"usmalloc")
    bzero((void *)array,MAX_INFILES*sizeof(child_t));
    /*
    || Loop over the arguments, setting up child structs and
    || counting input files.  Quit if a file won't open or seek,
    || or if we can't get a buffer or semaphore.
    */
    for (nfiles=0, argno=1; argno < argc; ++argno )
    {
        if (0 == strcmp(argv[argno],"-o"))
        { /* is the -o argument */
            ++argno;
            if (argno < argc)
                ofName = argv[argno];
            else
            {
                fprintf(stderr,"-o must have a filename after\n");
                return -1;
            }
        }
        else if (0 == strcmp(argv[argno],"-a"))
        { /* is the -a argument */
            char c;
            if (++argno >= argc)
            {
                fprintf(stderr,"-a must have a method number after\n");
                return -1;
            }
            c = argv[argno][0];
            switch(c)
            {
            case '0' : method = inProc0; break;
            case '1' : method = inProc1; break;
            case '2' : method = inProc2; break;
            case '3' : method = inProc3; break;
            default:
                {
                    fprintf(stderr,"unknown method -a %c\n",c);
                    return -1;
                }
            }
        }
        else if ('-' == argv[argno][0])
        { /* is unknown -option */
            fprintf(stderr,"aiocat [-o outfile] [-a 0|1|2|3] infiles...\n");
            return -1;
        }
        else    
        { /* neither -o nor -a, assume input file */
            if (nfiles < MAX_INFILES)
            {
                /*
                || save the filename
                */
                pc = &array[nfiles];
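                if (strlen(argv[argno]) >= sizeof(pc->fname))
                {
                    fprintf(stderr,"file name too long: %s\n",argv[argno]);
                    return -1;
                }
                strcpy(pc->fname,argv[argno]);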
                /*
                || allocate a buffer and a semaphore.  Not all
                || child procs use the semaphore but so what?
                */
                pc->buffer = usmalloc(BLOCKSIZE,arena);
                QUITIFNULL(pc->buffer,"usmalloc(buffer)")
                pc->sema = usnewsema(arena,0);
                QUITIFNULL(pc->sema,"usnewsema")
                /*
                || open the file
                */
                pc->fd = open(pc->fname,O_RDONLY);
                QUITIFMONE(pc->fd,"open")
                /*
                || get the size of the file. This leaves the file
                || positioned at-end, but there is no need to reposition 
                || because all aio_read calls have an implied lseek.
                || NOTE: there is no check for zero-length file; that
                || is a valid (and interesting) test case.
                */
                pc->fsize = lseek(pc->fd,0,SEEK_END);
                QUITIFMONE(pc->fsize,"lseek")
                /*
                || set the starting base address of this input file
                || in the output file.  The first file starts at 0.
                || Each one after starts at prior base + prior size.
                */
                if (nfiles) /* not first */
                    pc->outbase =
                        array[nfiles-1].fsize + array[nfiles-1].outbase;
                ++nfiles;
            }
            else
            {
                printf("Too many files, %s ignored\n",argv[argno]);
            }
        }
    } /* end for(argc) */
    /*
    || If there was no -o argument, construct an output file name.
    */
    if (!ofName)
    {
        strcpy(outPath,tmpdir);
        strcat(outPath,"/aiocat.out");
        ofName = outPath;
    }
    /*
    || Open, creating or truncating, the output file.
    || Do not use O_APPEND, which would constrain aio to doing
    || operations in sequence.
    */
    outFD = open(ofName, O_WRONLY|O_CREAT|O_TRUNC, 0666);
    QUITIFMONE(outFD,"open(output)")
    /*
    || If there were no input files, just quit, leaving empty output
    */
    if (!nfiles)
    {
        return 0;
    }
    /*
    || Note the number of processes-to-be, for use in initializing
    || aio and for use by each child in a barrier() call.
    */
    nprocs = 1+nfiles;
    /*
    || Initialize async I/O using aio_sgi_init(), in order to specify
    || a number of locks at least equal to the number of child procs
    || and in order to specify extra sproc users.
    */
    { 
        aioinit_t ainit = {0}; /* all fields initially zero */
        /*
        || Go with the default 5 for the number of aio-created procs,
        || as we have no way of knowing the number of unique devices.
        */
#define AIO_PROCS 5
        ainit.aio_threads = AIO_PROCS;
        /*
        || Set the number of locks aio needs to the number of procs
        || we will start, minimum 3.
        */
        ainit.aio_locks = (nprocs > 2)?nprocs:3;
        /*
        || Warn aio of the number of user procs that will be
        || using its arena.
        */
        ainit.aio_numusers = nprocs;
        aio_sgi_init(&ainit);
    }
    /*
    || Process each input file, either in a child process or in
    || a subroutine call, as specified by the DO_SPROCS variable.
    */
    for (argno = 0; argno < nfiles; ++argno)
    {
        pc = &array[argno];
#if DO_SPROCS
#define CHILD_STACK 64*1024
        /*
        || For each input file, start a child process as an instance
        || of the selected method (-a argument).
        || If an error occurs, quit. That will send a SIGHUP to any
        || already-started child, which will kill it, too.
        */ 
        pc->procid = sprocsp(method     /* function to start */
                            ,PR_SALL    /* share all, keep FDs sync'd */
                            ,(void *)pc /* argument to child func */
                            ,NULL       /* absolute stack seg */
                            ,CHILD_STACK);  /* max stack seg growth */
        QUITIFMONE(pc->procid,"sproc")
#else
        /*
        || For each input file, call the selected (-a) method as a
        || subroutine to copy its file.
        */
        fprintf(stderr,"file %s...",pc->fname);
        method((void*)pc,0);
        if (errors) break;
        fprintf(stderr,"done\n");
#endif
    }
#if DO_SPROCS
    /*
    || Wait for all the kiddies to get themselves initialized.
    || When all have started and reached barrier(), all continue.
    || If any errors occurred in initialization, quit.
    */
    barrier(convene,nprocs);
    if (errors) return errors; /* a child failed to initialize */
    /*
    || Child processes are executing now. Reunite the family round the
    || old hearth one last time, when their processing is complete.
    || Each child ensures that all its output is complete before it
    || invokes barrier().
    */
    barrier(convene,nprocs);
#endif
    /*
    || Close the output file and print some statistics.
    */
    close(outFD);
    {
        clock_t timesum;
        long    bytesum;
        double  bperus;
        printf("    procid   time     fsize     filename\n");
        for(argno = 0, timesum = bytesum = 0 ; argno < nfiles ; ++argno)
        {
            pc = &array[argno];
            timesum += pc->etime;
            bytesum += pc->fsize;
            printf("%2d: %-8d %-8ld %-8lld  %s\n"
                    ,argno,pc->procid,(long)pc->etime
                    ,(long long)pc->fsize,pc->fname);
        }
        /* avoid dividing by zero when no CPU time was recorded */
        bperus = (timesum > 0) ? ((double)bytesum)/((double)timesum) : 0.0;
        printf("total time %ld usec, total bytes %ld, %g bytes/usec\n"
                     ,(long)timesum      , bytesum , bperus);
    }
    /*
    || Unlink the arena file, so it won't exist when this program runs
    || again. If it did exist, it would be used as the initial state of
    || the arena, which might or might not have any effect.
    */
    unlink(arenaPath);
    return (errors ? 1 : 0);
}
/******************************************************************************
|| inProc0() alternates polling with aio_error() with sginap(). Under
|| the Frame Scheduler, it would use frs_yield() instead of sginap().
|| The general pattern of this function is repeated in the other three;
|| only the wait method varies from function to function.
*/
int inWait0(child_t *pch)
{
    int ret;
    aiocb_t* pab = &pch->acb;
    while (EINPROGRESS == (ret = aio_error(pab))) 
    {
        sginap(0); 
    }
    return ret;
}
void inProc0(void *arg, size_t stk)
{
    child_t *pch = arg;         /* starting arg is ->child_t for my file */
    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
    int ret;                    /* as long as this is 0, all is ok */
    int bytes;                  /* #bytes read on each input */
    /*
    || Initialize -- no signals or callbacks needed.
    */
    pab->aio_sigevent.sigev_notify = SIGEV_NONE;
    pab->aio_buf = pch->buffer; /* always the same */
#if DO_SPROCS
    /*
    || Wait for the starting gun...
    */
    barrier(convene,nprocs);
#endif
    pch->etime = clock();
    do /* read and write, read and write... */
    {
        /*
        || Set up the aiocb for a read, queue it, and wait for it.
        */
        pab->aio_fildes = pch->fd;
        pab->aio_offset = pch->inbase;
        pab->aio_nbytes = BLOCKSIZE;
        if (ret = aio_read(pab)) 
            break;  /* unable to schedule a read */
        ret = inWait0(pch);
        if (ret)
            break;  /* nonzero read completion status */
        /*
        || get the result of the read() call, the count of bytes read.
        || Since aio_error returned 0, the count is nonnegative.
        || It could be 0, or less than BLOCKSIZE, indicating EOF.
        */ 
        bytes = aio_return(pab); /* actual read result */
        if (!bytes)
            break;  /* no need to write a last block of 0 */
        pch->inbase += bytes;   /* where to read next time */
        /*
        || Set up the aiocb for a write, queue it, and wait for it.
        */
        pab->aio_fildes = outFD;
        pab->aio_nbytes = bytes;
        pab->aio_offset = pch->outbase;
        if (ret = aio_write(pab)) 
            break;
        ret = inWait0(pch);
        if (ret)
            break;
        pch->outbase += bytes;  /* where to write next time */
    } while ((!ret) && (bytes == BLOCKSIZE));
    /*
    || The loop is complete.  If no errors so far, use aio_fsync()
    || to ensure that output is complete.  This requires waiting
    || yet again.
    */
    if (!ret)
    { 
        pab->aio_fildes = outFD; /* loop may have ended after a read */
        if (!(ret = aio_fsync(O_SYNC,pab)))
            ret = inWait0(pch);
    }
    /*
    || Flag any errors for the parent proc. If none, count elapsed time.
    */
    if (ret) ++errors;
    else pch->etime = (clock() - pch->etime);
#if DO_SPROCS
    /*
    || Rendezvous with the rest of the family, then quit.
    */
    barrier(convene,nprocs);
#endif
    return;
} /* end inProc0 */
/******************************************************************************
|| inProc1 uses aio_suspend() to await the completion of each operation.
|| Otherwise it is the same as inProc0, above.
*/

int inWait1(child_t *pch)
{
    int ret;
    aiocb_t* susplist[1]; /* list of 1 aiocb for aio_suspend() */
    susplist[0] = &pch->acb;
    /*
    || Note: aio.h declares the 1st argument of aio_suspend() as "const."
    || The C compiler requires the actual-parameter to match in type,
    || so the list we pass must either be declared "const aiocb_t*" or
    || must be cast to that -- else cc gives a warning.  The cast
    || in the following statement is only to avoid this warning.
    */ 
    ret = aio_suspend( (const aiocb_t **) susplist,1,NULL);
    return ret;
}
void inProc1(void *arg, size_t stk)
{
    child_t *pch = arg;         /* starting arg is ->child_t for my file */
    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
    int ret;                    /* as long as this is 0, all is ok */
    int bytes;                  /* #bytes read on each input */
    /*
    || Initialize -- no signals or callbacks needed.
    */
    pab->aio_sigevent.sigev_notify = SIGEV_NONE;
    pab->aio_buf = pch->buffer; /* always the same */
#if DO_SPROCS
    /*
    || Wait for the starting gun...
    */
    barrier(convene,nprocs);
#endif
    pch->etime = clock();
    do /* read and write, read and write... */
    {
        /*
        || Set up the aiocb for a read, queue it, and wait for it.
        */
        pab->aio_fildes = pch->fd;
        pab->aio_offset = pch->inbase;
        pab->aio_nbytes = BLOCKSIZE;
        if (ret = aio_read(pab))
            break;
        ret = inWait1(pch);
        /*
        || If the aio_suspend() return is nonzero, it means that the wait
        || did not end for i/o completion but because of a signal. Since we
        || expect no signals here, we take that as an error.
        */
        if (!ret) /* op is complete */
            ret = aio_error(pab);  /* read() status, should be 0 */
        if (ret)
            break;  /* signal, or nonzero read completion */
        /*
        || get the result of the read() call, the count of bytes read.
        || Since aio_error returned 0, the count is nonnegative.
        || It could be 0, or less than BLOCKSIZE, indicating EOF.
        */
        bytes = aio_return(pab); /* actual read result */
        if (!bytes)
            break;  /* no need to write a last block of 0 */
        pch->inbase += bytes;   /* where to read next time */
        /*
        || Set up the aiocb for a write, queue it, and wait for it.
        */
        pab->aio_fildes = outFD;
        pab->aio_nbytes = bytes;
        pab->aio_offset = pch->outbase;
        if (ret = aio_write(pab))
            break;
        ret = inWait1(pch);
        if (!ret) /* op is complete */
            ret = aio_error(pab);  /* should be 0 */
        if (ret)
            break;
        pch->outbase += bytes;  /* where to write next time */
    } while ((!ret) && (bytes == BLOCKSIZE));
    /*
    || The loop is complete.  If no errors so far, use aio_fsync()
    || to ensure that output is complete.  This requires waiting
    || yet again.
    */
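    if (!ret)
    {
        pab->aio_fildes = outFD; /* loop may have ended after a read */
        if (!(ret = aio_fsync(O_SYNC,pab)))
            ret = inWait1(pch);
        if (!ret) /* op is complete */
            ret = aio_error(pab);  /* fsync status, should be 0 */
    }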
    /*
    || Flag any errors for the parent proc. If none, count elapsed time.
    */
    if (ret) ++errors;
    else pch->etime = (clock() - pch->etime);
#if DO_SPROCS
    /*
    || Rendezvous with the rest of the family, then quit.
    */
    barrier(convene,nprocs);
#endif
} /* end inProc1 */
/******************************************************************************
|| inProc2 requests a signal upon completion of an I/O. After starting
|| an operation, it P's a semaphore which is V'd from the signal handler.
*/
#define AIO_SIGNUM (SIGRTMIN+1) /* arbitrary choice of signal number */
void sigHandler2(int signo, siginfo_t *sif, void *context)
{
    /*
    || In this minimal signal handler we pick up the address of the
    || child_t info structure -- which was put in aio_sigevent.sigev_value
    || field during initialization -- and use it to find the semaphore.
    */
    child_t *pch = sif->si_value.sival_ptr ;
    usvsema(pch->sema); 
    return; /* stop here with dbx to print the above address */
}
int inWait2(child_t *pch)
{
    /*
    || Wait for any signal handler to post the semaphore.  The signal
    || handler could have been entered before this function is called,
    || or it could be entered afterward.
    */
    uspsema(pch->sema); 
    /*
    || Since this process executes only one aio operation at a time,
    || we can return the status of that operation.  In a more complicated
    || design, if a signal could arrive from more than one pending
    || operation, this function could not return status.
    */
    return aio_error(&pch->acb);
}
void inProc2(void *arg, size_t stk)
{
    child_t *pch = arg;         /* starting arg is ->child_t for my file */
    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
    int ret;                    /* as long as this is 0, all is ok */
    int bytes;                  /* #bytes read on each input */
    /*
    || Initialize -- request a signal in aio_sigevent. The address of
    || the child_t struct is passed as the siginfo value, for use
    || in the signal handler.
    */
    pab->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
    pab->aio_sigevent.sigev_signo = AIO_SIGNUM;
    pab->aio_sigevent.sigev_value.sival_ptr = (void *)pch;
    pab->aio_buf = pch->buffer; /* always the same */
    /*
    || Initialize -- set up a signal handler for AIO_SIGNUM.
    */
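    { 
        struct sigaction sa;
        bzero((void *)&sa,sizeof(sa));
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = SA_SIGINFO;       /* handler receives a siginfo */
        sa.sa_sigaction = sigHandler2;
        ret = sigaction(AIO_SIGNUM,&sa,NULL);
        if (ret) ++errors; /* parent will shut down ASAP */
    }   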
#if DO_SPROCS
    /*
    || Wait for the starting gun...
    */
    barrier(convene,nprocs);
#else
    if (ret) return;
#endif
    pch->etime = clock();
    do /* read and write, read and write... */
    {
        /*
        || Set up the aiocb for a read, queue it, and wait for it.
        */
        pab->aio_fildes = pch->fd;
        pab->aio_offset = pch->inbase;
        pab->aio_nbytes = BLOCKSIZE;
        if (!(ret = aio_read(pab)))
            ret = inWait2(pch);
        if (ret)
            break;  /* could not start read, or it ended badly */
        /*
        || get the result of the read() call, the count of bytes read.
        || Since aio_error returned 0, the count is nonnegative.
        || It could be 0, or less than BLOCKSIZE, indicating EOF.
        */
        bytes = aio_return(pab); /* actual read result */
        if (!bytes)
            break;  /* no need to write a last block of 0 */
        pch->inbase += bytes;   /* where to read next time */
        /*
        || Set up the aiocb for a write, queue it, and wait for it.
        */
        pab->aio_fildes = outFD;
        pab->aio_nbytes = bytes;
        pab->aio_offset = pch->outbase;
        if (!(ret = aio_write(pab)))
            ret = inWait2(pch);
        if (ret)
            break;
        pch->outbase += bytes;  /* where to write next time */
    } while ((!ret) && (bytes == BLOCKSIZE));
    /*
    || The loop is complete.  If no errors so far, use aio_fsync()
    || to ensure that output is complete.  This requires waiting
    || yet again.
    */
    if (!ret)
    {
        pab->aio_fildes = outFD; /* loop may have ended after a read */
        if (!(ret = aio_fsync(O_SYNC,pab)))
            ret = inWait2(pch);
    }
    /*
    || Flag any errors for the parent proc. If none, count elapsed time.
    */
    if (ret) ++errors;
    else pch->etime = (clock() - pch->etime);
#if DO_SPROCS
    /*
    || Rendezvous with the rest of the family, then quit.
    */
    barrier(convene,nprocs);
#endif
} /* end inProc2 */

/******************************************************************************
|| inProc3 uses a callback and a semaphore. It waits with a P operation.
|| The callback function executes a V operation.  This may come before or
|| after the P operation.
*/
void callBack3(union sigval usv)
{
    /*
    || The callback function receives the pointer to the child_t struct,
    || as prepared in aio_sigevent.sigev_value.sival_ptr.  Use this to 
    || post the semaphore in the child_t struct.
    */
    child_t *pch = usv.sival_ptr;
    usvsema(pch->sema);
    return;
}
int inWait3(child_t *pch)
{
    /*
    || Suspend, if necessary, with a P operation on the semaphore. The
    || callback function might be entered before we reach this point, or after.
    */
    uspsema(pch->sema);
    /*
    || Return the status of the aio operation associated with the sema.
    */
    return aio_error(&pch->acb);    
}
void inProc3(void *arg, size_t stk)
{
    child_t *pch = arg;         /* starting arg is ->child_t for my file */
    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
    int ret;                    /* as long as this is 0, all is ok */
    int bytes;                  /* #bytes read on each input */
    /*
    || Initialize -- request a callback in aio_sigevent. The address of
    || the child_t struct is stored in sigev_value, which the system
    || passes to the callback function.
    */
    pab->aio_sigevent.sigev_notify = SIGEV_CALLBACK;
    pab->aio_sigevent.sigev_func = callBack3;
    pab->aio_sigevent.sigev_value.sival_ptr = (void *)pch;
    pab->aio_buf = pch->buffer; /* always the same */
#if DO_SPROCS
    /*
    || Wait for the starting gun...
    */
    barrier(convene,nprocs);
#endif
    pch->etime = clock();
    do /* read and write, read and write... */
    {
        /*
        || Set up the aiocb for a read, queue it, and wait for it.
        */
        pab->aio_fildes = pch->fd;
        pab->aio_offset = pch->inbase;
        pab->aio_nbytes = BLOCKSIZE;
        if (!(ret = aio_read(pab)))
            ret = inWait3(pch);
        if (ret)
            break;  /* read error */
        /*
        || get the result of the read() call, the count of bytes read.
        || Since aio_error returned 0, the count is nonnegative.
        || It could be 0, or less than BLOCKSIZE, indicating EOF.
        */
        bytes = aio_return(pab); /* actual read result */
        if (!bytes)
            break;  /* no need to write a last block of 0 */
        pch->inbase += bytes;   /* where to read next time */
        /*
        || Set up the aiocb for a write, queue it, and wait for it.
        */
        pab->aio_fildes = outFD;
        pab->aio_nbytes = bytes;
        pab->aio_offset = pch->outbase;
        if (!(ret = aio_write(pab)))
            ret = inWait3(pch);
        if (ret)
            break;
        pch->outbase += bytes;  /* where to write next time */
    } while ((!ret) && (bytes == BLOCKSIZE));
    /*
    || The loop is complete.  If no errors so far, use aio_fsync()
    || to ensure that output is complete.  This requires waiting
    || yet again.
    */
    if (!ret)
    {
        pab->aio_fildes = outFD; /* loop may have ended after a read */
        if (!(ret = aio_fsync(O_SYNC,pab)))
            ret = inWait3(pch);
    }
    /*
    || Flag any errors for the parent proc. If none, count elapsed time.
    */
    if (ret) ++errors;
    else pch->etime = (clock() - pch->etime);
#if DO_SPROCS
    /*
    || Rendezvous with the rest of the family, then quit.
    */
    barrier(convene,nprocs);
#endif
} /* end inProc3 */  

Guaranteed-Rate Request

The following subroutine simplifies the task of requesting a guaranteed rate of I/O transfer. The file descriptor passed to the function requestRate() must describe a file located in the real-time subvolume of a volume managed by XLV and XFS.

/*
 * Simple function to request a guaranteed rate reservation.
 * Input:
 *      fd      file descriptor to be guaranteed
 *      dur     duration of guarantee in seconds
 *      bps     bytes per second required
 *      flag    one of SOFT_ or HARD_GUARANTEE [+VOD_LAYOUT]
 *              (extra entry points included for those who do not
 *              want to include sys/grio.h)
 *
 * Assumed:
 *      reservation start time of "1 second from now"
 *      guarantee unit time of 1 second
 *
 * Returns:
 *       0    success,  guarantee granted
 *      -1    error returned and displayed with perror()
 *      +n    n is the best bytes/second that XFS can offer
 *
 * Usage:
 *      #define BEST_RATE zillions
 *      #define MINIMAL_RATE somewhat_less
 *      if ( (ret = requestRate(fd, dur, BEST_RATE, SOFT_GUARANTEE)) )
 *      { // not a success
 *        if (ret >= MINIMAL_RATE) // acceptable lower rate offered
 *          ret = requestRate(fd, dur, ret, SOFT_GUARANTEE);
 *      }
 *      if (ret) // failed for some reason
 *      {
 *        if (0<ret) // not an error as such
 *           fprintf(stderr, "Cannot get rate\n");
 *        exit(1);
 *      }
 *      // guaranteed rate obtained, continue    
 */
#include <sys/types.h>  /* for time_t */
#include <time.h>       /* for time() */
#include <errno.h>      /* for error codes */
#include <stdio.h>      /* [fs]printf() */
#include <sys/grio.h>   /* for grio_* */

/*
 * This subroutine displays a diagnostic message to stderr when
 * grio_request() returns an error. perror() cannot be used in
 * this case because the generic messages are not descriptive.
 * 
 */
void printGRIOerror( grio_resv_t *g )
{
    char work[80];
    char *msg = work;
    
    switch (g->gr_error)
    {
    case EINVAL:
    {
        msg = "unable to contact grio daemon";
        break;
    }
    case EBADF:
    {
        msg = "cannot stat file, or file already guaranteed";
        break;
    }
    case ESRCH:
    {
        msg = "invalid procid";
        break;
    }
    case ENOENT:
    {
        msg = "file has no (real-time?) extents";
        break;
    }
    case EIO:
    {
        msg = "incorrect start time";
        break;
    }
    case EACCES:
    {
        msg = (g->gr_flags & VOD_LAYOUT)
              ? "unable to provide VOD guarantee"
              : (
                (g->gr_flags & HARD_GUARANTEE)
                ? "unable to provide HARD guarantee"
                : "unable to provide SOFT guarantee"
            );
        break;
    }
    case ENOSPC:
    {
        snprintf(work, sizeof work, "out of bandwidth on device %s",
                 g->gr_errordev);
        break;
    }
    default: /* in case they think of something else */
    {
        snprintf(work, sizeof work, "error %d", g->gr_error);
    }
    }
    fprintf(stderr, "grio_request: %s.\n", msg);
}

/*
 * This function actually places the request.
 */
int requestRate( int fd, int dur, int bps, int flag)
{
    int ret;
    grio_resv_t grio;
    
    grio.gr_duration= dur;
    grio.gr_start   = 1+time(NULL);
    grio.gr_optime  = 1; /* unit time is 1 second */
    grio.gr_opsize  = bps;
    grio.gr_flags   = flag;
    ret = grio_request(fd, &grio); 
    if (ret) /* not a success */
    {
        if ( (ENOSPC == grio.gr_error) /* insufficient bandwidth.. */
        &&   (grio.gr_opsize) ) /* ..but nonzero rate remaining */
            ret = grio.gr_opsize; /* return available rate */
        else /* some other problem or 0 bandwidth available */
            printGRIOerror(&grio);
    }
    return ret;
}
/*
 * When you don't want to #include sys/grio.h to define one constant...
 */
int requestHardRate( int fd, int dur, int bps )
{ return requestRate(fd, dur, bps, HARD_GUARANTEE); }

int requestSoftRate( int fd, int dur, int bps )
{ return requestRate(fd, dur, bps, SOFT_GUARANTEE); }

#ifdef UNIT_TEST
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>     /* for atoi() */
/* requestRate pathname [rate [duration [flags ] ] ] */
int main(int argc, char **argv)
{
    int fd;
    int bps = 1000000; /* 1,000,000 bytes per second */
    int dur = 60; /* a new york minute */
    int flg = SOFT_GUARANTEE;
    int rc;
    if (argc < 2)
    {
        fprintf(stderr,
                "usage: %s pathname [rate [duration [flags]]]\n", argv[0]);
        return 1;
    }
    if ((fd = open(argv[1], O_RDONLY)) < 0)
    {
        perror(argv[1]);
        return 1;
    }
    if (argc > 2) bps = atoi(argv[2]);
    if (argc > 3) dur = atoi(argv[3]);
    if (argc > 4) flg = atoi(argv[4]);
    printf("Requesting guarantee on fd=%d of %d bps for %d sec\n",
                                       fd,   bps,       dur);
    rc = requestRate(fd, dur, bps, flg);
    printf("requestRate() returns %d\n", rc);
    return (rc == 0) ? 0 : 1;
}
#endif /*UNIT_TEST*/
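The 0 / -1 / +n return convention of requestRate() lends itself to the two-step negotiation shown in its Usage comment. The following sketch packages that logic so it can be exercised without XFS. The names negotiateRate(), rate_fn, and stub_request() are hypothetical, not part of the GRIO interface; the stub simply tracks an invented amount of available bandwidth.

```c
/* Signature shared by requestRate()-style wrappers such as requestSoftRate():
   0 = granted, -1 = error, +n = best rate (bytes/second) offered. */
typedef int (*rate_fn)(int fd, int dur, int bps);

/* Ask for `best`; if a lower rate is offered and it is at least `minimal`,
   ask again for the offered rate. Returns the granted rate, 0 if nothing
   acceptable was offered, or -1 on error. */
int negotiateRate(rate_fn request, int fd, int dur, int best, int minimal)
{
    int offered;
    int ret = request(fd, dur, best);

    if (ret == 0)
        return best;            /* full rate granted */
    if (ret < 0)
        return -1;              /* error, already reported by the callee */
    if (ret < minimal)
        return 0;               /* offer too low to be useful */
    offered = ret;
    ret = request(fd, dur, offered);
    if (ret == 0)
        return offered;         /* lower rate granted */
    return (ret < 0) ? -1 : 0;  /* bandwidth changed between the two calls */
}

/* Hypothetical stand-in for requestSoftRate() so the logic runs without XFS. */
static int stub_available = 600000;

static int stub_request(int fd, int dur, int bps)
{
    (void)fd;
    (void)dur;
    if (bps <= stub_available)
    {
        stub_available -= bps;  /* reserve the bandwidth */
        return 0;
    }
    return (stub_available > 0) ? stub_available : -1;
}
```

On a system with GRIO, the stub would be replaced by one of the wrappers above, for example negotiateRate(requestSoftRate, fd, dur, BEST_RATE, MINIMAL_RATE). The second request can still fail if another process claims the bandwidth between the two calls, which is why the sketch checks its result.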

Frame Scheduler Examples

A number of example programs are distributed with the REACT/Pro Frame Scheduler. This section describes them. Only one is reproduced here; the others are found on disk.

The example programs distributed with the Frame Scheduler are found in the directory /usr/react/src/examples. They are summarized in Table A-1 and are discussed in more detail in the topics that follow.

Table A-1. Summary of Frame Scheduler Example Programs

Directory                               Features of Example

simple, r4k_intr                        Two processes scheduled on a single CPU at a frame rate slow enough to permit use of printf() for debugging. The examples differ in the time base used, and the r4k_intr code uses a barrier for synchronization.

mprogs                                  Like simple, but the scheduled processes are independent programs.

multi, ext_intr, user_intr, vsync_intr  Three synchronous Frame Schedulers running lightweight processes on three processors. These examples are much alike, differing mainly in the source of the time base interrupt.

complete, stop_resume                   Like multi in starting three Frame Schedulers. Information about the activity processes is stored in arrays for convenient maintenance. The stop_resume code demonstrates frs_stop() and frs_resume() calls.

driver, dintr                           driver contains a pseudo-device driver that demonstrates the Frame Scheduler device driver interface. dintr contains a program based on simple that uses the example driver as a time base.

sixtyhz, memlock                        One process scheduled at a 60 Hz frame rate. The activity process in the memlock example locks its address space into memory before it joins the scheduler.

upreuse                                 Complex example that demonstrates the creation of a pool of reusable processes and how they can be dispatched as activity processes on a Frame Scheduler.


Basic Example

The example in /usr/react/src/examples/simple shows how to create a simple application using the Frame Scheduler API. The code in /usr/react/src/examples/r4k_intr is similar.

Real-Time Application Specification

The application consists of two processes that have to periodically execute a specific sequence of code. The period for the first process, process A, is 600 milliseconds. The period for the other process, process B, is 2400 ms.


Note: Such long periods are unrealistic for real-time applications. However, they allow the use of printf() calls within the “real-time” loops in this sample program.


Frame Scheduler Design

The two periods and their ratio determine the selection of the minor frame period—600 ms—and the number of minor frames per major frame—4, for a total of 2400 ms.
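One common rule of thumb, which produces the values above, takes the greatest common divisor of the activity periods as the minor frame and their least common multiple as the major frame. The text does not say the example was designed this way; the sketch below, with hypothetical function names, only shows the arithmetic.

```c
/* Greatest common divisor (Euclid's algorithm). */
static long gcd(long a, long b)
{
    while (b != 0)
    {
        long t = a % b;
        a = b;
        b = t;
    }
    return a;
}

/* Least common multiple, dividing first to limit overflow. */
static long lcm(long a, long b)
{
    return a / gcd(a, b) * b;
}

/* Rule-of-thumb frame layout from activity periods in ms: minor frame =
   GCD of the periods, major frame = their LCM. Returns the number of
   minor frames per major frame. */
static long frame_layout(const long *periods, int n, long *minor, long *major)
{
    *minor = periods[0];
    *major = periods[0];
    for (int i = 1; i < n; i++)
    {
        *minor = gcd(*minor, periods[i]);
        *major = lcm(*major, periods[i]);
    }
    return *major / *minor;
}
```

For periods of 600 and 2400 ms this gives a 600 ms minor frame, a 2400 ms major frame, and 4 minor frames. With the periods of the multi example described later (50, 100, 200, 250, and 500 ms), the same calculation gives a 50 ms minor frame and a 1000 ms major frame, or 20 minor frames.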

The discipline for process A is strict real-time (FRS_DISC_RT). Underrun and overrun errors should cause signals.

Process B should run only once in 2400 ms, so it operates as Continuable over as many as 4 minor frames. For the first 3 frames, its discipline is Overrunnable and Continuable. For the last frame it is strict real-time. The Overrunnable discipline allows process B to run without yielding past the end of each minor frame. The Continuable discipline ensures that once process B does yield, it is not resumed until the fourth minor frame has passed. The combination allows process B to extend its execution to the allowable period of 2400 ms, and the strict real-time discipline at the end makes certain that it yields by the end of the major frame.

There is a single Frame Scheduler, so both processes share a single processor. Process A runs within a minor frame until it yields or until the minor frame period expires. In the latter case, the Frame Scheduler generates an overrun error, signaling that process A is misbehaving.

When process A yields, the Frame Scheduler immediately activates process B. Process B runs until it yields or until the end of the minor frame, at which point it is preempted. This is not an error, since process B is Overrunnable.

At the start of the next minor frame, the Frame Scheduler allows process A to execute again. After A yields, process B is allowed to resume running if it has not yet yielded. The same happens in the third and fourth minor frames: A is started, followed by B if B has not yet yielded. At the interrupt that signals the end of the fourth minor frame (and the end of the major frame), process B must have yielded, or an overrun error is signaled.

Example of Scheduling Separate Programs

The code in directory /usr/react/src/examples/mprogs does the same work as example simple (see “Basic Example”). However, the activity processes A and B are physically loaded as separate commands. The main program establishes the single Frame Scheduler. The activity processes are started as separate programs. They communicate with the main program using SVR4-compatible interprocess communication messages (see the intro(2) and msgget(2) reference pages).

There are three separate executables in the mprogs example. The master program, in master.c, is a command that has the following syntax:

master [-p cpu-number] [-s slave-count]

The cpu-number specifies which processor to use for the one Frame Scheduler this program creates. The default is processor 1. The slave-count tells the master how many subordinate programs will be enqueued to the Frame Scheduler. The default is two programs.

The problems that need to be solved in this example are as follows:

  • The master program must enqueue the activity processes. However, since they are started as separate programs, the master has no direct way of knowing their process IDs, which are needed for frs_enqueue().

  • The activity processes need to specify the minor frames on which they should be enqueued, and with what discipline.

  • The master needs to enqueue the activities in the proper order on their minor frames, so they will be dispatched in the proper sequence. Therefore the master has to distinguish the subordinates in some way; it cannot treat them as interchangeable.

  • The activity processes must join the Frame Scheduler, so they need the handle of the Frame Scheduler to use as an argument to frs_join(). However, this information is in the master's address space.

  • If an error occurs when enqueueing, the master needs to tell the activity processes so they can terminate in an orderly way.

There are many ways in which these objectives could be met (for example, the three programs could use a shared-memory arena). In this example, the master and subordinates communicate using a simple protocol of messages exchanged with msgsnd() and msgrcv() (see the msgget(2) and msgop(2) reference pages). The sequence of operations is as follows:

  1. The master program creates a Frame Scheduler.

  2. The master sends a message inviting the most important subordinate to reply. (All the message queue handling is in module ipc.c, which is linked by all three programs.)

  3. The subordinate compiled from the file processA.c replies to this message, sending its process ID and requesting the FRS handle.

  4. The subordinate process A sends a series of messages, one for each minor frame queue on which it should be enqueued. The master enqueues it as requested.

  5. The subordinate process A sends a “ready” message.

  6. The master sends a message inviting the next most important process to reply.

  7. The program compiled from processB.c will reply to this request, and steps 3-6 are repeated for as many slaves as the slave-count parameter to the master program. (Only two slaves are provided. However, you can easily create more using processB.c as a pattern.)

  8. The master issues frs_start(), and waits for the termination signal.

  9. The subordinates independently issue frs_join() and the real-time dispatching begins.

Examples of Multiple Synchronized Schedulers

The example in /usr/react/src/examples/multi demonstrates the creation of three synchronized Frame Schedulers. The three use the cycle counter to establish a minor frame interval of 50 ms. All three Frame Schedulers use 20 minor frames per major frame, for a major frame rate of 1 Hz.

The following processes are scheduled in this example:

  • Processes A and D require a frequency of 20 Hz

  • Process B requires a frequency of 10 Hz and can consume up to 100 ms of execution time each time

  • Process C requires a frequency of 5 Hz and can consume up to 200 ms of execution time each time

  • Process E requires a frequency of 4 Hz and can consume up to 250 ms of execution time each time

  • Process F requires a frequency of 2 Hz and can consume up to 500 ms of execution time each time

  • Processes K1, K2 and K3 are background processes that should run as often as possible, when time is available.

The processes are assigned to processors as follows:

  • Scheduler 1 runs processes A (20 Hz) and K1 (background).

  • Scheduler 2 runs processes B (10 Hz), C (5 Hz), and K2 (background).

  • Scheduler 3 runs processes D (20 Hz), E (4 Hz), F (2 Hz), and K3 (background).
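With 50 ms minor frames, a periodic process at f Hz starts a new period every (1000 / f) / 50 minor frames: every frame for A and D, every 2nd for B, every 4th for C, every 5th for E, and every 10th for F. A sketch of that arithmetic follows; the helper names are hypothetical, and the example source may organize its frs_enqueue() calls differently.

```c
/* Minor frames between successive releases of a process that must run at
   freq_hz, given a minor frame of minor_ms milliseconds. Returns 0 if the
   period is not a whole number of minor frames. */
static int release_stride(int freq_hz, int minor_ms)
{
    if (freq_hz <= 0 || 1000 % freq_hz != 0)
        return 0;
    int period_ms = 1000 / freq_hz;
    if (period_ms % minor_ms != 0)
        return 0;
    return period_ms / minor_ms;
}

/* Nonzero if a process with this stride starts a new period on `frame`. */
static int starts_on_frame(int frame, int stride)
{
    return stride > 0 && frame % stride == 0;
}
```

Over the 20 minor frames of the 1 Hz major frame, process E (stride 5) therefore starts on frames 0, 5, 10, and 15.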

In order to simplify the coding of the example, all real-time processes use the same function body, process_skeleton(), which is parameterized with the process name, the address of the Frame Scheduler it is to join, and the address of the “real-time” action it is to execute. In the sample code, all real-time actions are empty function bodies (feel free to load them down with code).
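The parameterization described above can be sketched as one function that takes a descriptor of the process. To keep the sketch runnable without REACT/Pro, the join and yield steps are passed in as function pointers (join_fn and yield_fn are hypothetical; the distributed code calls the Frame Scheduler directly), and the loop runs a fixed number of frames. The fake_join(), fake_yield(), and count_action() stand-ins exist only so the sketch can be tried.

```c
#include <stddef.h>

/* Hypothetical hooks standing in for the Frame Scheduler join and yield calls. */
typedef int (*join_fn)(void *frs);
typedef int (*yield_fn)(void);

struct skeleton_args
{
    const char *name;          /* identifies the process in diagnostics */
    void       *frs;           /* Frame Scheduler to join */
    void      (*action)(void); /* "real-time" work done once per frame */
};

/* Join the scheduler, then run the action and yield once per frame.
   Runs `frames` iterations so the sketch terminates; a real activity
   process would keep looping while the scheduler runs. */
static int process_skeleton(const struct skeleton_args *a, join_fn join,
                            yield_fn yield, int frames)
{
    if (join(a->frs) < 0)
        return -1;
    for (int i = 0; i < frames; i++)
    {
        a->action();
        if (yield() < 0)
            return -1;
    }
    return 0;
}

/* Trivial stand-ins so the sketch runs without the Frame Scheduler. */
static int actions, yields;
static int fake_join(void *frs) { return (frs != NULL) ? 0 : -1; }
static int fake_yield(void) { ++yields; return 0; }
static void count_action(void) { ++actions; }
```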

The examples in /usr/react/src/examples/ext_intr, user_intr, and vsync_intr are all similar to multi, differing mainly in the time base used. The examples in complete and stop_resume are similar in operation, but more evolved and complex in the way they manage subprocesses.


Tip: It is helpful to use the xdiff program when comparing these similar programs—see the xdiff(1) reference page.


Example of Device Driver

The code in /usr/react/src/examples/driver contains a skeletal test-bed for a kernel-level device driver that interacts with the Frame Scheduler. Most of the driver functions consist of minimal or empty stubs. However, the ioctl() entry point to the driver (see the ioctl(2) reference page) simulates a hardware interrupt and calls the Frame Scheduler entry point, frs_handle_driverintr() (see “Generating Interrupts”). This allows you to test the driver. Calling its ioctl() entry is equivalent to using frs_usrintr() (see “The Frame Scheduler API”).

The code in /usr/react/src/examples/dintr contains a variant of the simple example that uses a device driver as the time base. The program dintr/sendintr.c opens the driver, calls ioctl() to send one time-base interrupt, and closes the driver. (It could easily be extended to send a specified number of interrupts, or to send an interrupt each time the return key is pressed.)

Examples of a 60 Hz Frame Rate

The example in directory /usr/react/src/examples/sixtyhz demonstrates the ability to schedule a process at a frame rate of 60 Hz, a common rate in visual simulators. A single Frame Scheduler is created. It uses the cycle counter with an interval of 16,666 microseconds (16.666 ms, approximately 60 Hz). There is one minor frame per major frame.
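Because 1,000,000 is not evenly divisible by 60, an interval of 16,666 microseconds gives a rate slightly above 60 Hz (about 60.0024 Hz). The small sketch below, with hypothetical function names, computes the truncated interval and how far a run of frames drifts from exact 60 Hz timing.

```c
/* Frame interval in whole microseconds for a rate in Hz (truncated). */
static long long interval_us(long long hz)
{
    return 1000000LL / hz;
}

/* Microseconds by which `frames` truncated intervals finish ahead of the
   exact time for the same number of frames. */
static long long drift_us(long long hz, long long frames)
{
    long long exact_us = frames * 1000000LL / hz;
    return exact_us - frames * interval_us(hz);
}
```

For 60 Hz, drift_us(60, 3600) is 2400: one minute's worth of frames completes about 2.4 ms early.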

One real-time process is enqueued to the Frame Scheduler. By changing the compiler constant LOGLOOPS you can change the amount of work it attempts to do in each frame.

This example also contains the code to query and to change the signal numbers used by the Frame Scheduler.

The example in /usr/react/src/examples/memlock is similar to the sixtyhz example, but the activity process uses plock() to lock its address space. Also, it executes one major frame's worth of frs_yield() calls immediately after return from frs_join(). The purpose of this is to “warm up” the processor cache with copies of the process code and data. (An actual application process could access its major data structures prior to this yield in order to speed up the caching process.)
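plock() belongs to System V derived systems such as IRIX. On a POSIX system, a comparable effect can be obtained with mlockall(); this is a swapped-in call, not what the memlock example uses, and it usually requires privilege or a sufficient RLIMIT_MEMLOCK. The warm_up() helper illustrates the suggestion of touching major data structures before entering the real-time loop. Both names are hypothetical, and the stride is left to the caller (one cache line, commonly 64 bytes, is an assumption).

```c
#include <stddef.h>
#include <sys/mman.h>

/* POSIX counterpart to locking the address space with plock():
   lock all current and future pages. Returns 0 or -1. */
static int lock_address_space(void)
{
    return mlockall(MCL_CURRENT | MCL_FUTURE);
}

/* Read one byte every `stride` bytes so each cache line of `data` is
   loaded. Returns a checksum so the reads are not optimized away. */
static unsigned long warm_up(const volatile unsigned char *data, size_t len,
                             size_t stride)
{
    unsigned long sum = 0;

    if (stride == 0)
        return 0;
    for (size_t i = 0; i < len; i += stride)
        sum += data[i];
    return sum;
}
```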

Example of Managing Lightweight Processes

The code in /usr/react/src/examples/upreuse implements a simulated real-time application based on a pool of reusable processes. A reusable process is created with sproc() and described by a pdesc_t structure. Code in pqueue.c builds and maintains a pool of processes. Code in pdesc.c provides functions to get and release a process, and to dispatch one to execute a specific function.

The code in test_hello.c creates a pool of processes and dispatches each one in turn to display a message. The code in test_singlefrs.c creates a pool of processes and causes them to join a Frame Scheduler.
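The get and release operations of a process pool reduce to free-list bookkeeping. The sketch below shows only that bookkeeping, with a hypothetical pdesc_sketch_t in place of the example's pdesc_t. It creates no processes, and because sproc() processes share an address space, a real pool would also need a lock around pool_get() and pool_release().

```c
#include <stddef.h>

/* Hypothetical descriptor; the example's pdesc_t is not reproduced here. */
typedef struct pdesc_sketch
{
    int                  id;    /* slot number */
    int                  busy;  /* 1 while dispatched */
    struct pdesc_sketch *next;  /* free-list link */
} pdesc_sketch_t;

#define POOL_SIZE 4

static pdesc_sketch_t pool[POOL_SIZE];
static pdesc_sketch_t *free_list;

/* Build the pool with every descriptor on the free list, lowest id first. */
static void pool_init(void)
{
    free_list = NULL;
    for (int i = POOL_SIZE - 1; i >= 0; i--)
    {
        pool[i].id = i;
        pool[i].busy = 0;
        pool[i].next = free_list;
        free_list = &pool[i];
    }
}

/* Take an idle process from the pool, or NULL if all are busy. */
static pdesc_sketch_t *pool_get(void)
{
    pdesc_sketch_t *p = free_list;

    if (p != NULL)
    {
        free_list = p->next;
        p->busy = 1;
        p->next = NULL;
    }
    return p;
}

/* Return a finished process to the pool for reuse. */
static void pool_release(pdesc_sketch_t *p)
{
    p->busy = 0;
    p->next = free_list;
    free_list = p;
}
```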