11.20 Sending Pipe Messages (DBMS_PIPE)
Oracle's DBMS_PIPE package is an API to a set of procedures and functions that support communication between two or more processes connected to the same Oracle instance. Oracle pipes are half-diplex. Data flows in one direction. Figure 11-14 illustrates a single pipe for a one-way directional flow of information between two Oracle processes connected to the same instance.
Figure 11-14. Basic Pipe.
In Figure 11-14, process P1 sends data to P2. You need a second pipe for P2 to send data back to P1. Consider P1 to be running on a dedicated piece of hardware and connected to an Oracle instance. The other application, P2, could reside on another server. Both applications are connected to the same database instance. The communication is asynchronous. Writers do not wait for readers and readers do not have to wait for writers.
Messages going into the pipe can originate from multiple sources. Remote applications can initiate procedure calls to DBMS_PIPE through ODBC, JDBC, or Net8. Messages can also originate from database triggers. There can be any number of message
Messages sent to a database pipe are not transaction based. The message producer, after sending the message, might do a commit or it might do a rollback; either way, that message remains in the pipe. This mechanism is different from database alerts, implemented with the Oracle DBMS_ALERT packagealerts are transaction based. If a process sends an alert and then rolls back, the alert is voided.
You will be using two subprograms in the package: PACK_MESSAGE and UNPACK_MESSAGEthink of a message part rather than the whole message. A producer can call PACK_MESSAGE 10 times, each time placing a text string into a local buffer. The packing is followed by a single SEND_MESSAGE. The receiver issues one RECEIVE_MESSAGE, which can be followed by 10 UNPACK_MESSAGE calls. Consider the unpacking to be unpacking 10 message parts. The key is that message
You can combine datatypes to form a single message. The PACK_MESSAGE procedure is overloaded, so is UNPACK_MESSAGE. You can pack a message part of type NUMBER, then a type of VARCHAR2 type, and a DATE. The receiver of a message can "peek" into the pipe message to determine the datatype of the
11.20.1 Send-Receive Example
The following paragraphs describe and
Figure 11-15. Multiple Processes and Pipes.
In a fluid situation, where processes come and go, a software architecture can dynamically generate pipe
In Figure 11-15, each process has a home pipe for receiving messages. Process P1 can send messages to the pipes of the other processes. The same holds for process P2. When process P1 is not busy, it issues a read on its home pipe. The mechanism for reading a message is to issue a read-wait with a timer
The following paragraphs illustrate the code for P1 and P2. In this example, a procedure called P1 builds two message parts and issues a send to a pipe called HOME_OF_P2. A procedure, named P2, reads the message and sends a reply back to P1 using the pipe named HOME_OF_P1. This code
Perform the following steps to
The CREATE database script CATPROC.SQL does not grant EXECUTE on DBMS_PIPE to public as it does with DBMS_OUTPUT and many other packages. You need the execute grant to use the package.
The following is the PL/SQL code for procedures P1 and P2.
CREATE OR REPLACE PROCEDURE P1 is status integer; response varchar2(2000); begin dbms_pipe.reset_buffer; dbms_pipe.pack_message('This is message 1'); dbms_pipe.pack_message('This is message 2'); status := dbms_pipe.send_message('HOME_OF_P2'); status := dbms_pipe.receive_message('HOME_OF_P1', 60); dbms_pipe.unpack_message(response); dbms_output.put_line('P1 received:'response); end P1; CREATE OR REPLACE procedure P2 is status integer; message_1 varchar2(2000); message_2 varchar2(2000); begin status := dbms_pipe.receive_message('HOME_OF_P2', 60); dbms_pipe.unpack_message(message_1); dbms_pipe.unpack_message(message_2); dbms_pipe.pack_message('Got your messages.'); status := dbms_pipe.send_message('HOME_OF_P1'); dbms_output.put_line('P2 received:'message_1); dbms_output.put_line('P2 received:'message_2); end P2;
The session output for the P2, which is executed first, is the following.
SQL> set serveroutput on SQL> execute p2 P2 received:This is message 1 P2 received:This is message 2
The session output for the P1 is:
SQL> set serveroutput on SQL> execute p1 P1 received:Got your messages.
11.20.2 Interface Description
The following paragraphs describe the DBMS_PIPE package API.
PACK_MESSAGE PROCEDURE pack_message(item in VARCHAR2); PROCEDURE pack_message(item in NUMBER); PROCEDURE pack_messge(item in DATE);
The DBMS_PIPE package provides overloaded procedures for packing different datatypes. You can pack RAW and LONG datatypes. The model for sending data to a pipe is to PACK one or more times, then SEND once.
The PACK_MESSAGE procedure copies data to a process-specific Oracle memory buffer. Each Oracle connection has a buffer available for 4096 bytes of data. This buffer can be packed in stages, from different programs within a single Oracle connected session. Packing a message is not the same as sending a message to a pipe. Several messages can be packed and be followed by a single send, which will place all packed messages in the destination buffer. Once a buffer has messages from a PACK_MESSAGE call, there are three events that can occur:
If you PACK without interruption, you will overflow the 4096 limitation. This overflow condition generates an ORA-06558 error. Similarly, when we UNPACK a buffer we can unpack an empty buffer and get an underflow condition.
When the contents of a buffer are sent to a pipe, that buffer is cleared. For example, you can pack 4096 bytes into your session buffer and send these bytes to a pipe. You can call PACK_MESSAGE again, packing another 4096 bytes. This second buffer can be sent. You have to be careful; you have physical limitations when sending data to a pipe. When you send a full buffer to a pipe and the receiver has not read its home pipe, your next send will have to wait for the receiver to read its home pipe.
SEND_MESSAGE FUNCTION send_message(pipename in VARCHAR2, timeout IN INTEGER DEFAULT maxwait, maxpipesize IN INTEGER DEFAULT 8192) RETURN INTEGER; maxwait constant integer := 86400000; /* 1000 days */ RETURN INTEGER values: 0 Success. 1 Timeout because the pipe stays full leaving no room, or the sending process cannot place a lock on the pipe. 3 - Interrupted.
The SEND_MESSAGE procedure copies the contents of the message buffer into a designated pipe. Upon completion, there is no guarantee that the message has been read. A successful return code indicates only that the message was
RECEIVE_MESSAGE FUNCTION receive_message(pipe_name IN VARCHAR2, timeout IN INTEGER DEFAULT maxwait) RETURN INTEGER; maxwait constant integer := 86400000; /* 1000 days */ RETURN INTEGER values: 0 - Success 1 Timeout 2 Record in pipe is too large for the buffer. 3 - Interrupted.
The RECEIVE_MESSAGE function copies messages from the pipe into the local buffer. A successful read is indicated with a zero return code. If no messages are present in the pipe, you still get a zero, indicating a successful read. Once you issue RECEIVE_MESSAGE, you then unpack the message from the local buffer into local
UNPACK_MESSAGE PROCEDURE unpack_message(item OUT VARCHAR22); PROCEDURE unpack_message(item OUT NUMBER); PROCEDURE unpack_messge(item OUT DATE);
The DBMS_PIPE package provides overloaded procedures for unpacking different datatypes. You can pack RAW and LONG datatypes. The model for receiving data is to issue one RECEIVE followed by a series of UNPACK commands.
The UNPACK_MESSAGE procedure copies the next message from your buffer into the variable. This is an OUT mode variable. Following the call to UNPACK, that variable contains the contents of the latest message. How do you know what, if anything, is in the buffer,
RESET_BUFFER PROCEDURE reset_buffer;
The RESET_BUFFER procedure initializes buffer position variables declared within the package body of DBMS_PIPE. When a message is sent with SEND_MESSAGE, these same variables are
PURGE PROCEDURE purge(pipename IN VARCHAR2);
The PURGE procedure
NEXT_ITEM_TYPE FUNCTION next_item_type RETURN INTEGER; RETURN INTEGER values: 0 No more items in the pipe. 9 The next item in the buffer is a VARCHAR2 2 The next item is a NUMBER. 3 The next item is a DATE.
The NEXT_ITEM_TYPE function provides the ability to "peek" into the message buffer. It is useful when messages of various types are being sent. Even if you know what type of message is being sent, this function can be incorporated into a WHILE LOOP so you can break easily when the message buffer is empty.
If you do not use this function to detect an empty buffer, then you risk calling UNPACK_MESSAGE on an empty bufferthis will generate an ORA-06556 or an ORA-06559 error.
UNIQUE_SESSION_NAME FUNCTION unique_session_name RETURN VARCHAR2;
The UNIQUE_SESSION_NAME function returns a name like ORA$PIPE$ followed by a sequence of
Creation and removal of a pipe can be accomplished with the following two functions.
FUNCTION create_pipe(pipename IN VARCHAR2, maxpipesize IN INTEGER DEFAULT 8192, private IN BOOLEAN DEFAULT TRUE) RETURN INTEGER; FUNCTION remove_pipe (pipename IN VARCHAR2) RETURN INTEGER;
Our examples use the functions SEND_MESSAGE and RECEIVE_MESSAGE to implicitly create the pipes. These functions create the pipe named in the call if it does not already exist. You can implicitly create a function and yet still remove it with REMOVE_PIPE.
11.20.3 Exception Handling
The following lists the possible exceptions that can occur. The last item in this list, UNPACK_MESSAGE, is the most common, and we will demonstrate PL/SQL code to handle this condition.
In PL/SQL code, you can use PRAGMA EXCEPTION_INIT to ensure that, should a particular ORA error occur, you capture that as an exception. This is how we capture an Oracle error.
We are most
ORA-06556: the pipe is empty, cannot fulfill the unpack_message request
The following procedure, P3, is more robust because it will not fail. The parsing of message parts is embedded within a loopthis
CREATE OR REPLACE procedure P3 is status integer; message_part varchar2(2000); message varchar2(2000); empty_buffer exception; pragma exception_init(empty_buffer, -6556); begin for i in 1..3 loop status := dbms_pipe.receive_message('HOME_OF_P3', 5); begin while (dbms_pipe.next_item_type = 9) loop dbms_pipe.unpack_message(message_part); message := message'-'message_part; end loop; exception when empty_buffer then null; end; end loop; dbms_output.put_line(message); end P3;
Procedure P4 is demonstrated with the following SQL*Plus session, which sends messages to the pipe of P3. When P3 completes three cycles, the output is the concatenation of whatever messages were read. For the SQL*Plus session we have (start P3 before this session):
SQL> set feedback off SQL> variable status number SQL> execute dbms_pipe.reset_buffer; SQL> execute dbms_pipe.pack_message('This is message 1'); SQL> execute dbms_pipe.pack_message('This is message 2'); SQL> execute :status := dbms_pipe.send_message('HOME_OF_P3');
The output from P3 is the following.
SQL> SET SERVEROUTPUT ON SQL> execute p3 -This is message 1-This is message 2 SQL>