Oracle's DBMS_PIPE package is an API, a set of procedures and functions, that supports communication between two or more processes connected to the same Oracle instance. Oracle pipes are half-duplex: data flows in one direction. Figure 11-14 illustrates a single pipe carrying a one-way flow of information between two Oracle processes connected to the same instance.

Figure 11-14. Basic Pipe.
In Figure 11-14, process P1 sends data to P2. You need a second pipe for P2 to send data back to P1. Consider P1 to be running on a dedicated piece of hardware and connected to an Oracle instance. The other application, P2, could reside on another server. Both applications are connected to the same database instance.

The communication is asynchronous. Writers do not wait for readers, and readers do not have to wait for writers. Messages going into the pipe can originate from multiple sources. Remote applications can initiate procedure calls to DBMS_PIPE through ODBC, JDBC, or Net8. Messages can also originate from database triggers. Any number of message producers can insert messages into the pipe, and any number of message consumers can read messages from it. The pipe is a FIFO structure.

Messages sent to a database pipe are not transaction based. The message producer, after sending the message, might commit or roll back; either way, the message remains in the pipe. This mechanism differs from database alerts, implemented with the Oracle DBMS_ALERT package: alerts are transaction based. If a process sends an alert and then rolls back, the alert is voided.

You will be using two subprograms in the package, PACK_MESSAGE and UNPACK_MESSAGE. Think of each call as handling a message part rather than a whole message. A producer can call PACK_MESSAGE 10 times, each time placing a text string into a local buffer. The packing is followed by a single SEND_MESSAGE. The receiver issues one RECEIVE_MESSAGE, which can be followed by 10 UNPACK_MESSAGE calls, one for each message part. The key is that message parts can vary in datatype. One part can be a DATE; another can be a VARCHAR2. You can combine datatypes to form a single message. Both PACK_MESSAGE and UNPACK_MESSAGE are overloaded, so you can pack a part of type NUMBER, then a VARCHAR2, and then a DATE.
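The pack, send, receive, unpack sequence described above can be sketched in a single anonymous block. This is a minimal illustration, not code from the chapter: the pipe name DEMO_PIPE and the variable names are hypothetical, and the sender and receiver share one session only to keep the sketch short.

```sql
-- Sketch only: DEMO_PIPE and all variable names are hypothetical.
-- Sender and receiver run in the same session purely for brevity.
DECLARE
    status   INTEGER;
    -- Values to send, held in typed variables so the overload is unambiguous.
    s_id     NUMBER        := 42;
    s_text   VARCHAR2(100) := 'order shipped';
    s_when   DATE          := SYSDATE;
    -- Variables to receive the unpacked parts.
    r_id     NUMBER;
    r_text   VARCHAR2(100);
    r_when   DATE;
BEGIN
    -- Pack three message parts of different datatypes into the local buffer.
    DBMS_PIPE.RESET_BUFFER;
    DBMS_PIPE.PACK_MESSAGE(s_id);    -- NUMBER part
    DBMS_PIPE.PACK_MESSAGE(s_text);  -- VARCHAR2 part
    DBMS_PIPE.PACK_MESSAGE(s_when);  -- DATE part

    -- One send places all three parts in the pipe as a single message.
    status := DBMS_PIPE.SEND_MESSAGE('DEMO_PIPE', 10);

    -- One receive copies that message into the local buffer.
    status := DBMS_PIPE.RECEIVE_MESSAGE('DEMO_PIPE', 10);

    -- Unpack in the same order, and into the same types, as the parts were packed.
    DBMS_PIPE.UNPACK_MESSAGE(r_id);
    DBMS_PIPE.UNPACK_MESSAGE(r_text);
    DBMS_PIPE.UNPACK_MESSAGE(r_when);

    DBMS_OUTPUT.PUT_LINE(r_id || ' / ' || r_text || ' / ' ||
                         TO_CHAR(r_when, 'YYYY-MM-DD'));
END;
/
```

A production version would test each status value before continuing; that check is omitted here to keep the four-step pattern visible.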
The receiver of a message can "peek" into the pipe message to determine the datatype of the next message part.

11.20.1 Send-Receive Example

The following paragraphs describe and illustrate, with PL/SQL procedures, a design in which processes communicate bidirectionally. This general model of communication with pipes can serve as a basis for solving specific real-world problems.

A group of processes can communicate with each other by creating a "home" pipe for each process, similar to a mailbox for a house. Each process pulls messages out of its own home pipe. Each process knows the names of all the other pipes and can send messages to those home pipes. Figure 11-15 illustrates this communication model.

Figure 11-15. Multiple Processes and Pipes.
In a fluid situation, where processes come and go, a software architecture can dynamically generate pipe names with the DBMS_PIPE function UNIQUE_SESSION_NAME. This function returns a pipe name that is guaranteed to be unique within the instance.

In Figure 11-15, each process has a home pipe for receiving messages. Process P1 can send messages to the pipes of the other processes. The same holds for process P2. When process P1 is not busy, it issues a read on its home pipe. The mechanism for reading a message is to issue a read-wait with a timer expressed in seconds. A process can poll a pipe by setting the timer to zero; any other value makes the process wait up to that many seconds for a message. A process that receives pipe messages is generally dedicated to servicing pipe messages only. When there is no message in a pipe, the process sits idle, waiting for the next pipe message.

The following paragraphs illustrate the code for P1 and P2. In this example, a procedure called P1 builds two message parts and issues a send to a pipe called HOME_OF_P2. A procedure named P2 reads the message and sends a reply back to P1 using the pipe named HOME_OF_P1. This code demonstrates the communication illustrated in Figure 11-15.

Perform the following steps to demonstrate this communication. First compile both procedures and execute the receiver, P2. When you execute P2, the procedure posts a read-wait with a 60-second timeout. Then execute P1 from a second session before that timeout expires.

The database creation script CATPROC.SQL does not grant EXECUTE on DBMS_PIPE to PUBLIC as it does with DBMS_OUTPUT and many other packages. You need the EXECUTE grant to use the package.
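A minimal sketch of granting and then checking that privilege follows. The schema name APP_USER is hypothetical; substitute the account that will own P1 and P2. The check simply calls UNIQUE_SESSION_NAME, which fails to compile if the grant is missing.

```sql
-- Run as a privileged user such as SYS. APP_USER is a hypothetical schema name.
GRANT EXECUTE ON sys.dbms_pipe TO app_user;

-- Then, connected as APP_USER, confirm that the package is callable.
SET SERVEROUTPUT ON
BEGIN
    -- Prints a generated pipe name for this session.
    DBMS_OUTPUT.PUT_LINE(DBMS_PIPE.UNIQUE_SESSION_NAME);
END;
/
```

The grant is made directly to the user rather than through a role because stored procedures such as P1 and P2 compile with definer's rights, where privileges acquired through roles are not in effect.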
The following is the PL/SQL code for procedures P1 and P2.

    CREATE OR REPLACE PROCEDURE P1 is
        status   integer;
        response varchar2(2000);
    begin
        -- Pack two message parts and send them to P2's home pipe.
        dbms_pipe.reset_buffer;
        dbms_pipe.pack_message('This is message 1');
        dbms_pipe.pack_message('This is message 2');
        status := dbms_pipe.send_message('HOME_OF_P2');

        -- Wait up to 60 seconds for the reply on P1's home pipe.
        status := dbms_pipe.receive_message('HOME_OF_P1', 60);
        dbms_pipe.unpack_message(response);
        dbms_output.put_line('P1 received:' || response);
    end P1;
    /

    CREATE OR REPLACE procedure P2 is
        status    integer;
        message_1 varchar2(2000);
        message_2 varchar2(2000);
    begin
        -- Wait up to 60 seconds for a message on P2's home pipe.
        status := dbms_pipe.receive_message('HOME_OF_P2', 60);
        dbms_pipe.unpack_message(message_1);
        dbms_pipe.unpack_message(message_2);

        -- Reply to P1's home pipe.
        dbms_pipe.pack_message('Got your messages.');
        status := dbms_pipe.send_message('HOME_OF_P1');
        dbms_output.put_line('P2 received:' || message_1);
        dbms_output.put_line('P2 received:' || message_2);
    end P2;
    /

The session output for P2, which is executed first, is the following.

    SQL> set serveroutput on
    SQL> execute p2
    P2 received:This is message 1
    P2 received:This is message 2

The session output for P1 is:

    SQL> set serveroutput on
    SQL> execute p1
    P1 received:Got your messages.

11.20.2 Interface Description

The following paragraphs describe the DBMS_PIPE package API.

PACK_MESSAGE

    PROCEDURE pack_message(item IN VARCHAR2);
    PROCEDURE pack_message(item IN NUMBER);
    PROCEDURE pack_message(item IN DATE);
The DBMS_PIPE package provides overloaded procedures for packing different datatypes. You can also pack RAW and ROWID values with the PACK_MESSAGE_RAW and PACK_MESSAGE_ROWID procedures. The model for sending data to a pipe is to PACK one or more times, then SEND once.

The PACK_MESSAGE procedure copies data to a process-specific Oracle memory buffer. Each Oracle connection has a buffer that holds 4096 bytes of data. This buffer can be packed in stages, from different programs within a single connected Oracle session. Packing a message is not the same as sending a message to a pipe. Several message parts can be packed and followed by a single send, which places all packed parts in the destination pipe. Once a buffer has messages from a PACK_MESSAGE call, there are three events that can occur:
If you keep packing without sending or resetting the buffer, you will overflow the 4096-byte limit. This overflow condition generates an ORA-06558 error. Similarly, when you UNPACK, you can attempt to unpack an empty buffer and get an underflow condition.

When the contents of a buffer are sent to a pipe, that buffer is cleared. For example, you can pack 4096 bytes into your session buffer and send these bytes to a pipe. You can then call PACK_MESSAGE again, packing another 4096 bytes, and send this second buffer. You have to be careful, because the pipe itself has a size limit. When you send a full buffer to a pipe and the receiver has not read its home pipe, your next send may have to wait until the receiver reads from that pipe and frees space.

SEND_MESSAGE

    FUNCTION send_message(pipename    IN VARCHAR2,
                          timeout     IN INTEGER DEFAULT maxwait,
                          maxpipesize IN INTEGER DEFAULT 8192)
        RETURN INTEGER;

    maxwait constant integer := 86400000; /* 1000 days */

RETURN INTEGER values:

    0  Success.
    1  Timeout because the pipe stays full, leaving no room, or the sending process cannot place a lock on the pipe.
    3  Interrupted.
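Because a send can block when the pipe is full, it is usually worth passing an explicit timeout and testing the return code. The following is a minimal sketch under stated assumptions: the pipe name DEMO_PIPE, the variable names, and the 5-second timeout are illustrative choices, not values from the chapter.

```sql
-- Sketch only: DEMO_PIPE and the 5-second timeout are hypothetical choices.
DECLARE
    status INTEGER;
    v_text VARCHAR2(100) := 'status update';
BEGIN
    DBMS_PIPE.RESET_BUFFER;
    DBMS_PIPE.PACK_MESSAGE(v_text);

    -- Wait at most 5 seconds for room in the pipe instead of the default maxwait.
    status := DBMS_PIPE.SEND_MESSAGE('DEMO_PIPE', 5);

    -- Branch on the return codes listed above.
    IF status = 0 THEN
        DBMS_OUTPUT.PUT_LINE('Message copied to the pipe.');
    ELSIF status = 1 THEN
        DBMS_OUTPUT.PUT_LINE('Timed out: pipe full or could not be locked.');
    ELSIF status = 3 THEN
        DBMS_OUTPUT.PUT_LINE('Send was interrupted.');
    ELSE
        DBMS_OUTPUT.PUT_LINE('Unexpected status: ' || status);
    END IF;
END;
/
```

A short timeout keeps a producer from stalling for the default wait of 1000 days when no consumer is draining the pipe.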
The SEND_MESSAGE function copies the contents of the message buffer into a designated pipe. Upon completion, there is no guarantee that the message has been read. A successful return code indicates only that the message was copied. After the call, the local message buffer is empty.

RECEIVE_MESSAGE

    FUNCTION receive_message(pipename IN VARCHAR2,
                             timeout  IN INTEGER DEFAULT maxwait)
        RETURN INTEGER;

    maxwait constant integer := 86400000; /* 1000 days */

RETURN INTEGER values:

    0  Success.
    1  Timeout.
    2  Record in pipe is too large for the buffer.
    3  Interrupted.
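The return codes above suggest testing the status before unpacking anything. The following sketch polls a pipe with a zero timeout and branches on the result; the pipe name DEMO_PIPE and the assumption that the first message part is a VARCHAR2 are illustrative only.

```sql
-- Sketch only: DEMO_PIPE is a hypothetical pipe name, and the first
-- message part is assumed to be a VARCHAR2.
DECLARE
    status INTEGER;
    v_text VARCHAR2(2000);
BEGIN
    -- A timeout of 0 polls the pipe and returns immediately.
    status := DBMS_PIPE.RECEIVE_MESSAGE('DEMO_PIPE', 0);

    IF status = 0 THEN
        -- A message was copied into the local buffer; unpack its first part.
        DBMS_PIPE.UNPACK_MESSAGE(v_text);
        DBMS_OUTPUT.PUT_LINE('Received: ' || v_text);
    ELSIF status = 1 THEN
        DBMS_OUTPUT.PUT_LINE('No message available (timeout).');
    ELSIF status = 2 THEN
        DBMS_OUTPUT.PUT_LINE('Record in pipe is too large for the buffer.');
    ELSIF status = 3 THEN
        DBMS_OUTPUT.PUT_LINE('Receive was interrupted.');
    ELSE
        DBMS_OUTPUT.PUT_LINE('Unexpected status: ' || status);
    END IF;
END;
/
```

Calling UNPACK_MESSAGE only in the success branch avoids unpacking a buffer that holds no new message.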
The RECEIVE_MESSAGE function copies a message from the pipe into the local buffer. A successful read is indicated with a zero return code. If no message arrives before the timeout expires, the function returns 1 (timeout), as listed above, and there is no new message to unpack. After a successful RECEIVE_MESSAGE, you unpack the message from the local buffer into local variables.

UNPACK_MESSAGE

    PROCEDURE unpack_message(item OUT VARCHAR2);
    PROCEDURE unpack_message(item OUT NUMBER);
    PROCEDURE unpack_message(item OUT DATE);
The DBMS_PIPE package provides overloaded procedures for unpacking different datatypes. You can also unpack RAW and ROWID values with the UNPACK_MESSAGE_RAW and UNPACK_MESSAGE_ROWID procedures. The model for receiving data is to issue one RECEIVE followed by a series of UNPACK calls.

The UNPACK_MESSAGE procedure copies the next message part from your buffer into the variable you pass, which is an OUT mode parameter. Following the call to UNPACK, that variable contains the contents of that message part. How do you know what, if anything, is left in the buffer, especially when a sender can pack a varying number of parts? The function NEXT_ITEM_TYPE is the key. If your code needs to loop over message parts in the buffer, incorporate NEXT_ITEM_TYPE in your loop. Procedure P3, in the section on exception handling, illustrates this.

RESET_BUFFER

    PROCEDURE reset_buffer;

The RESET_BUFFER procedure initializes buffer position variables declared within the package body of DBMS_PIPE. When a message is sent with SEND_MESSAGE, these same variables are initialized as a way of clearing and resetting the internal buffer. You would call this procedure if a buffer was packed and you wanted to clear it and repack it with different messages before sending the buffer to a pipe.

PURGE

    PROCEDURE purge(pipename IN VARCHAR2);
The PURGE procedure removes any messages in the pipe and frees all memory associated with the pipe. Processes that send and receive messages can abort. When this occurs and the processes are restarted, they may see messages left in the pipe from the previous run. If this scenario is possible, the logic of these processes should be able to detect, through the presence of messages in the pipe, a warm start as opposed to a cold start.

NEXT_ITEM_TYPE

    FUNCTION next_item_type RETURN INTEGER;

RETURN INTEGER values:

    0   No more items in the buffer.
    6   The next item is a NUMBER.
    9   The next item is a VARCHAR2.
    12  The next item is a DATE.

The NEXT_ITEM_TYPE function provides the ability to "peek" into the message buffer. It is useful when messages of various types are being sent. Even if you know what type of message is being sent, this function can be incorporated into a WHILE LOOP so you can break easily when the message buffer is empty. If you do not use this function, you risk calling UNPACK_MESSAGE on an empty buffer, which generates an ORA-06556 error, or unpacking into a variable of the wrong datatype, which generates an ORA-06559 error.

UNIQUE_SESSION_NAME

    FUNCTION unique_session_name RETURN VARCHAR2;

The UNIQUE_SESSION_NAME function returns a name like ORA$PIPE$ followed by a sequence of letters and digits. This pipe name is provided by Oracle and is guaranteed to be unique among all sessions connected to that instance. A process must know the name of a pipe before calling SEND_MESSAGE; the same holds true for RECEIVE_MESSAGE. When UNIQUE_SESSION_NAME is used to generate pipe names, there has to be some central registration, such as a database table, where applications record their pipe names. Every process can then look up the home pipe name of every other process.

CREATE_PIPE, REMOVE_PIPE

Creation and removal of a pipe can be accomplished with the following two functions.
    FUNCTION create_pipe(pipename    IN VARCHAR2,
                         maxpipesize IN INTEGER DEFAULT 8192,
                         private     IN BOOLEAN DEFAULT TRUE)
        RETURN INTEGER;

    FUNCTION remove_pipe(pipename IN VARCHAR2) RETURN INTEGER;

Our examples use the functions SEND_MESSAGE and RECEIVE_MESSAGE to implicitly create the pipes. These functions create the pipe named in the call if it does not already exist. You can implicitly create a pipe and still remove it with REMOVE_PIPE.

11.20.3 Exception Handling

The following lists the possible exceptions that can occur. The last item in this list, UNPACK_MESSAGE, is the most common, and we will demonstrate PL/SQL code to handle this condition.
In PL/SQL code, you can use PRAGMA EXCEPTION_INIT to associate a named exception with a particular ORA error, so that the error can be caught in an exception handler. We are most concerned with the error raised when we unpack an empty message. If no message has been sent to our pipe, RECEIVE_MESSAGE times out and there is nothing in the buffer to unpack. The following error is generated when we follow that receive with a call to UNPACK_MESSAGE:

    ORA-06556: the pipe is empty, cannot fulfill the unpack_message request

The following procedure, P3, is more robust because it traps this error instead of failing. The parsing of message parts is embedded within a loop, which permits us to get all message parts. That code is further embedded within a block with an exception handler. All of this logic is embedded within an outer loop, but one that loops only three times. Replace the outer FOR loop with an unbounded loop, and P3 illustrates how a PL/SQL procedure can serve as an endless-loop message handler.

    CREATE OR REPLACE procedure P3 is
        status       integer;
        message_part varchar2(2000);
        message      varchar2(2000);
        empty_buffer exception;
        pragma exception_init(empty_buffer, -6556);
    begin
        for i in 1..3 loop
            -- Wait up to 5 seconds for a message on each cycle.
            status := dbms_pipe.receive_message('HOME_OF_P3', 5);
            begin
                -- 9 is the NEXT_ITEM_TYPE code for VARCHAR2.
                while (dbms_pipe.next_item_type = 9) loop
                    dbms_pipe.unpack_message(message_part);
                    message := message || '-' || message_part;
                end loop;
            exception
                when empty_buffer then null;
            end;
        end loop;
        dbms_output.put_line(message);
    end P3;
    /

Procedure P3 is demonstrated with the following SQL*Plus session, which sends messages to the pipe of P3. When P3 completes three cycles, the output is the concatenation of whatever messages were read. For the SQL*Plus session we have (start P3 in another session before running this one):

    SQL> set feedback off
    SQL> variable status number
    SQL> execute dbms_pipe.reset_buffer;
    SQL> execute dbms_pipe.pack_message('This is message 1');
    SQL> execute dbms_pipe.pack_message('This is message 2');
    SQL> execute :status := dbms_pipe.send_message('HOME_OF_P3');

The output from P3 is the following.
    SQL> SET SERVEROUTPUT ON
    SQL> execute p3
    -This is message 1-This is message 2
    SQL>
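P3 unpacks only VARCHAR2 parts. When a message mixes datatypes, the same loop can dispatch on NEXT_ITEM_TYPE. The following is a sketch, not code from the chapter: the pipe name DEMO_PIPE and the variable names are hypothetical, and the type codes used (6 for NUMBER, 9 for VARCHAR2, 12 for DATE) are those given in Oracle's DBMS_PIPE reference, so verify them against your release.

```sql
-- Sketch only: DEMO_PIPE and the variable names are hypothetical.
-- Type codes assumed from Oracle's DBMS_PIPE reference:
--   0 = no more items, 6 = NUMBER, 9 = VARCHAR2, 12 = DATE.
DECLARE
    status    INTEGER;
    item_type INTEGER;
    v_text    VARCHAR2(2000);
    v_num     NUMBER;
    v_date    DATE;
BEGIN
    status := DBMS_PIPE.RECEIVE_MESSAGE('DEMO_PIPE', 5);

    -- Only walk the buffer when a message was actually received.
    IF status = 0 THEN
        LOOP
            item_type := DBMS_PIPE.NEXT_ITEM_TYPE;
            EXIT WHEN item_type = 0;  -- buffer exhausted

            IF item_type = 9 THEN
                DBMS_PIPE.UNPACK_MESSAGE(v_text);
                DBMS_OUTPUT.PUT_LINE('VARCHAR2: ' || v_text);
            ELSIF item_type = 6 THEN
                DBMS_PIPE.UNPACK_MESSAGE(v_num);
                DBMS_OUTPUT.PUT_LINE('NUMBER: ' || v_num);
            ELSIF item_type = 12 THEN
                DBMS_PIPE.UNPACK_MESSAGE(v_date);
                DBMS_OUTPUT.PUT_LINE('DATE: ' ||
                                     TO_CHAR(v_date, 'YYYY-MM-DD HH24:MI:SS'));
            ELSE
                -- A type this sketch does not handle (for example RAW); stop here.
                DBMS_OUTPUT.PUT_LINE('Unhandled item type: ' || item_type);
                EXIT;
            END IF;
        END LOOP;
    END IF;
END;
/
```

Checking the type before each unpack means the receiver does not need to know the sender's packing order in advance, at the cost of one extra call per message part.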