Section 15.7. Message Queues

team bbl


15.7. Message Queues

A message queue is a linked list of messages stored within the kernel and identified by a message queue identifier. We'll call the message queue just a queue and its identifier a queue ID.

The Single UNIX Specification includes an alternate IPC message queue implementation in the message-passing option of its real-time extensions. We do not cover the real-time extensions in this text.

A new queue is created or an existing queue opened by msgget. New messages are added to the end of a queue by msgsnd. Every message has a positive long integer type field, a non-negative length, and the actual data bytes (corresponding to the length), all of which are specified to msgsnd when the message is added to a queue. Messages are fetched from a queue by msgrcv. We don't have to fetch the messages in a first-in, first-out order. Instead, we can fetch messages based on their type field.

Each queue has the following msqid_ds structure associated with it:

    struct msqid_ds {      struct ipc_perm  msg_perm;     /* see Section 15.6.2 */      msgqnum_t        msg_qnum;     /* # of messages on queue */      msglen_t         msg_qbytes;   /* max # of bytes on queue */      pid_t            msg_lspid;    /* pid of last msgsnd() */      pid_t            msg_lrpid;    /* pid of last msgrcv() */      time_t           msg_stime;    /* last-msgsnd() time */      time_t           msg_rtime;    /* last-msgrcv() time */      time_t           msg_ctime;    /* last-change time */      .      .      .    }; 

This structure defines the current status of the queue. The members shown are the ones defined by the Single UNIX Specification. Implementations include additional fields not covered by the standard.

Figure 15.26 lists the system limits that affect message queues. We show "notsup" where the platform doesn't support the feature. We show "derived" whenever a limit is derived from other limits. For example, the maximum number of messages in a Linux system is based on the maximum number of queues and the maximum amount of data allowed on the queues. If the minimum message size is 1 byte, that would limit the number of messages systemwide to maximum # queues * maximum size of a queue. Given the limits in Figure 15.26, Linux has an upper bound of 262,144 messages with the default configuration. (Even though a message can contain zero bytes of data, Linux treats it as if it contained 1 byte, to limit the number of messages queued.)

Figure 15.26. System limits that affect message queues

Description

Typical values

FreeBSD 5.2.1

Linux 2.4.22

Mac OS X 10.3

Solaris 9

Size in bytes of largest message we can send

16,384

8,192

notsup

2,048

The maximum size in bytes of a particular queue (i.e., the sum of all the messages on the queue)

2,048

16,384

notsup

4,096

The maximum number of messages queues, systemwide

40

16

notsup

50

The maximum number of messages, systemwide

40

derived

notsup

40


Recall from Figure 15.1 that Mac OS X 10.3 doesn't support XSI message queues. Since Mac OS X is based in part on FreeBSD, and FreeBSD supports message queues, it is possible for Mac OS X to support them, too. Indeed, a good Internet search engine will provide pointers to a third-party port of XSI message queues for Mac OS X.

The first function normally called is msgget to either open an existing queue or create a new queue.

 #include <sys/msg.h> int msgget(key_t key, int flag); 

Returns: message queue ID if OK, 1 on error


In Section 15.6.1, we described the rules for converting the key into an identifier and discussed whether a new queue is created or an existing queue is referenced. When a new queue is created, the following members of the msqid_ds structure are initialized.

  • The ipc_perm structure is initialized as described in Section 15.6.2. The mode member of this structure is set to the corresponding permission bits of flag. These permissions are specified with the values from Figure 15.24.

  • msg_qnum, msg_lspid, msg_lrpid, msg_stime, and msg_rtime are all set to 0.

  • msg_ctime is set to the current time.

  • msg_qbytes is set to the system limit.

On success, msgget returns the non-negative queue ID. This value is then used with the other three message queue functions.

The msgctl function performs various operations on a queue. This function and the related functions for semaphores and shared memory (semctl and shmctl) are the ioctl-like functions for XSI IPC (i.e., the garbage-can functions).

 #include <sys/msg.h> int msgctl(int msqid, int cmd, struct msqid_ds *buf ); 

Returns: 0 if OK, 1 on error


The cmd argument specifies the command to be performed on the queue specified by msqid.

IPC_STAT

Fetch the msqid_ds structure for this queue, storing it in the structure pointed to by buf.

IPC_SET

Copy the following fields from the structure pointed to by buf to the msqid_ds structure associated with this queue: msg_perm.uid, msg_perm.gid, msg_perm.mode, and msg_qbytes. This command can be executed only by a process whose effective user ID equals msg_perm.cuid or msg_perm.uid or by a process with superuser privileges. Only the superuser can increase the value of msg_qbytes.

IPC_RMID

Remove the message queue from the system and any data still on the queue. This removal is immediate. Any other process still using the message queue will get an error of EIDRM on its next attempted operation on the queue. This command can be executed only by a process whose effective user ID equals msg_perm.cuid or msg_perm.uid or by a process with superuser privileges.


We'll see that these three commands (IPC_STAT, IPC_SET, and IPC_RMID) are also provided for semaphores and shared memory.

Data is placed onto a message queue by calling msgsnd.

[View full width]

 #include <sys/msg.h> int msgsnd(int msqid, const void *ptr, size_t  nbytes, int flag); 

Returns: 0 if OK, 1 on error


As we mentioned earlier, each message is composed of a positive long integer type field, a non-negative length (nbytes), and the actual data bytes (corresponding to the length). Messages are always placed at the end of the queue.

The ptr argument points to a long integer that contains the positive integer message type, and it is immediately followed by the message data. (There is no message data if nbytes is 0.) If the largest message we send is 512 bytes, we can define the following structure:

    struct mymesg {      long  mtype;      /* positive message type */      char  mtext[512]; /* message data, of length nbytes */    }; 

The ptr argument is then a pointer to a mymesg structure. The message type can be used by the receiver to fetch messages in an order other than first in, first out.

Some platforms support both 32-bit and 64-bit environments. This affects the size of long integers and pointers. For example, on 64-bit SPARC systems, Solaris allows both 32-bit and 64-bit applications to coexist. If a 32-bit application were to exchange this structure over a pipe or a socket with a 64-bit application, problems would arise, because the size of a long integer is 4 bytes in a 32-bit application, but 8 bytes in a 64-bit application. This means that a 32-bit application will expect that the mtext field will start 4 bytes after the start of the structure, whereas a 64-bit application will expect the mtext field to start 8 bytes after the start of the structure. In this situation, part of the 64-bit application's mtype field will appear as part of the mtext field to the 32-bit application, and the first 4 bytes in the 32-bit application's mtext field will be interpreted as a part of the mtype field by the 64-bit application.

This problem doesn't happen with XSI message queues, however. Solaris implements the 32-bit version of the IPC system calls with different entry points than the 64-bit version of the IPC system calls. The system calls know how to deal with a 32-bit application communicating with a 64-bit application, and treat the type field specially to avoid it interfering with the data portion of the message. The only potential problem is a loss of information when a 64-bit application sends a message with a value in the 8-byte type field that is larger than will fit in a 32-bit application's 4-byte type field. In this case, the 32-bit application will see a truncated type value.

A flag value of IPC_NOWAIT can be specified. This is similar to the nonblocking I/O flag for file I/O (Section 14.2). If the message queue is full (either the total number of messages on the queue equals the system limit, or the total number of bytes on the queue equals the system limit), specifying IPC_NOWAIT causes msgsnd to return immediately with an error of EAGAIN. If IPC_NOWAIT is not specified, we are blocked until there is room for the message, the queue is removed from the system, or a signal is caught and the signal handler returns. In the second case, an error of EIDRM is returned ("identifier removed"); in the last case, the error returned is EINTR.

Note how ungracefully the removal of a message queue is handled. Since a reference count is not maintained with each message queue (as there is for open files), the removal of a queue simply generates errors on the next queue operation by processes still using the queue. Semaphores handle this removal in the same fashion. In contrast, when a file is removed, the file's contents are not deleted until the last open descriptor for the file is closed.

When msgsnd returns successfully, the msqid_ds structure associated with the message queue is updated to indicate the process ID that made the call (msg_lspid), the time that the call was made (msg_stime), and that one more message is on the queue (msg_qnum).

Messages are retrieved from a queue by msgrcv.

[View full width]

 #include <sys/msg.h> ssize_t msgrcv(int msqid, void *ptr, size_t nbytes , long type, int flag); 

Returns: size of data portion of message if OK, 1 on error


As with msgsnd, the ptr argument points to a long integer (where the message type of the returned message is stored) followed by a data buffer for the actual message data. nbytes specifies the size of the data buffer. If the returned message is larger than nbytes and the MSG_NOERROR bit in flag is set, the message is truncated. (In this case, no notification is given to us that the message was truncated, and the remainder of the message is discarded.) If the message is too big and this flag value is not specified, an error of E2BIG is returned instead (and the message stays on the queue).

The type argument lets us specify which message we want.

type == 0

The first message on the queue is returned.

type > 0

The first message on the queue whose message type equals type is returned.

type < 0

The first message on the queue whose message type is the lowest value less than or equal to the absolute value of type is returned.


A nonzero type is used to read the messages in an order other than first in, first out. For example, the type could be a priority value if the application assigns priorities to the messages. Another use of this field is to contain the process ID of the client if a single message queue is being used by multiple clients and a single server (as long as a process ID fits in a long integer).

We can specify a flag value of IPC_NOWAIT to make the operation nonblocking, causing msgrcv to return -1 with errno set to ENOMSG if a message of the specified type is not available. If IPC_NOWAIT is not specified, the operation blocks until a message of the specified type is available, the queue is removed from the system (-1 is returned with errno set to EIDRM), or a signal is caught and the signal handler returns (causing msgrcv to return 1 with errno set to EINTR).

When msgrcv succeeds, the kernel updates the msqid_ds structure associated with the message queue to indicate the caller's process ID (msg_lrpid), the time of the call (msg_rtime), and that one less message is on the queue (msg_qnum).

ExampleTiming Comparison of Message Queues versus Stream Pipes

If we need a bidirectional flow of data between a client and a server, we can use either message queues or full-duplex pipes. (Recall from Figure 15.1 that full-duplex pipes are available through the UNIX domain sockets mechanism (Section 17.3), although some platforms provide a full-duplex pipe mechanism through the pipe function.)

Figure 15.27 shows a timing comparison of three of these techniques on Solaris: message queues, STREAMS-based pipes, and UNIX domain sockets. The tests consisted of a program that created the IPC channel, called fork, and then sent about 200 megabytes of data from the parent to the child. The data was sent using 100,000 calls to msgsnd, with a message length of 2,000 bytes for the message queue, and 100,000 calls to write, with a length of 2,000 bytes for the STREAMS-based pipe and UNIX domain socket. The times are all in seconds.

These numbers show us that message queues, originally implemented to provide higher-than-normal-speed IPC, are no longer that much faster than other forms of IPC (in fact, STREAMS-based pipes are faster than message queues). (When message queues were implemented, the only other form of IPC available was half-duplex pipes.) When we consider the problems in using message queues (Section 15.6.4), we come to the conclusion that we shouldn't use them for new applications.

Figure 15.27. Timing comparison of IPC alternatives on Solaris

Operation

User

System

Clock

message queue

0.57

3.63

4.22

STREAMS pipe

0.50

3.21

3.71

UNIX domain socket

0.43

4.45

5.59


    team bbl



    Advanced Programming in the UNIX Environment
    Advanced Programming in the UNIX Environment, Second Edition (Addison-Wesley Professional Computing Series)
    ISBN: 0321525949
    EAN: 2147483647
    Year: 2005
    Pages: 370

    flylib.com © 2008-2017.
    If you may any questions please contact us: flylib@qtcs.net