11.20 Sending Pipe Messages (DBMS_PIPE)


Oracle's DBMS_PIPE package is an API to a set of procedures and functions that support communication between two or more processes connected to the same Oracle instance. Oracle pipes are half-diplex. Data flows in one direction. Figure 11-14 illustrates a single pipe for a one-way directional flow of information between two Oracle processes connected to the same instance.

Figure 11-14. Basic Pipe.

graphics/11fig14.gif

In Figure 11-14, process P1 sends data to P2. You need a second pipe for P2 to send data back to P1. Consider P1 to be running on a dedicated piece of hardware and connected to an Oracle instance. The other application, P2, could reside on another server. Both applications are connected to the same database instance. The communication is asynchronous. Writers do not wait for readers and readers do not have to wait for writers.

Messages going into the pipe can originate from multiple sources. Remote applications can initiate procedure calls to DBMS_PIPE through ODBC, JDBC, or Net8. Messages can also originate from database triggers. There can be any number of message producers that insert messages into the pipe. Likewise, there can be any number of message consumers that read messages from the pipe. The pipe is a FIFO structure.

Messages sent to a database pipe are not transaction based. The message producer, after sending the message, might do a commit or it might do a rollback; either way, that message remains in the pipe. This mechanism is different from database alerts, implemented with the Oracle DBMS_ALERT packagealerts are transaction based. If a process sends an alert and then rolls back, the alert is voided.

You will be using two subprograms in the package: PACK_MESSAGE and UNPACK_MESSAGEthink of a message part rather than the whole message. A producer can call PACK_MESSAGE 10 times, each time placing a text string into a local buffer. The packing is followed by a single SEND_MESSAGE. The receiver issues one RECEIVE_MESSAGE, which can be followed by 10 UNPACK_MESSAGE calls. Consider the unpacking to be unpacking 10 message parts. The key is that message parts can vary in datatype. One part can be a DATE type; another a VARCHAR2 type.

You can combine datatypes to form a single message. The PACK_MESSAGE procedure is overloaded, so is UNPACK_MESSAGE. You can pack a message part of type NUMBER, then a type of VARCHAR2 type, and a DATE. The receiver of a message can "peek" into the pipe message to determine the datatype of the next message part.

11.20.1 Send-Receive Example

The following paragraphs describe and illustrate , with PL/SQL procedures, a design in which processes can communicate in a bidirectional mode. This generalization of communication with pipes can be a basis for solving real-world specific problems with pipes.

A group of processes can communicate with each other by creating a "home" pipe for each processsimilar to a mailbox for a house. Each process pulls messages out of their home pipe. Each process knows the name of all other pipes and can send messages to those other home pipes. Figure 11-15 illustrates this communication model.

Figure 11-15. Multiple Processes and Pipes.

graphics/11fig15.gif

In a fluid situation, where processes come and go, a software architecture can dynamically generate pipe names with the DBMS_PIPE function, UNIQUE_SESSION_NAME. This function returns a pipe name that is guaranteed to be unique within the instance.

In Figure 11-15, each process has a home pipe for receiving messages. Process P1 can send messages to the pipes of the other processes. The same holds for process P2. When process P1 is not busy, it issues a read on its home pipe. The mechanism for reading a message is to issue a read-wait with a timer expressed in seconds. A process can poll a pipe by setting the timer to zero. Otherwise, a timer in seconds will post a read-wait. A process that receives pipe messages is generally dedicated to servicing pipe messages only. When there is no message in a pipe, the process sits in an idle state waiting for the next pipe message.

The following paragraphs illustrate the code for P1 and P2. In this example, a procedure called P1 builds two message parts and issues a send to a pipe called HOME_OF_P2. A procedure, named P2, reads the message and sends a reply back to P1 using the pipe named HOME_OF_P1. This code demonstrates the communication illustrated in Figure 11-15.

Perform the following steps to demonstrate this communication. First compile and execute the receiver, P2. When you execute P2, the procedure posts a read-wait with a 60-sec. timeout.

The CREATE database script CATPROC.SQL does not grant EXECUTE on DBMS_PIPE to public as it does with DBMS_OUTPUT and many other packages. You need the execute grant to use the package.

  1. Connect as SYS and GRANT EXECUTE ON DBMS_PIPE TO SCOTT.

  2. Connect as SCOTT.

  3. Compile P1 and P2.

  4. Connect as SCOTT using SQL*Plus.

  5. SET SERVEROUTPUT ON.

  6. Execute P2your session will hang because you have issued a read-wait on your home pipe. The read-wait is for 60 sec. If no message is sent from P1 in that time, there will be a harmless error generatedwe'll talk about pipe errors shortly. Before the 60 sec is up, you need to perform the next three steps.

  7. Start a new SQL*Plus session.

  8. SET SERVEROUTPUT ON.

  9. Execute P1this starts the process by sending a message to the home of P2. After sending the message, the procedure issues a read-wait (for 60 sec) on its home pipe.

The following is the PL/SQL code for procedures P1 and P2.

 
 CREATE OR REPLACE PROCEDURE P1 is     status       integer;     response     varchar2(2000); begin     dbms_pipe.reset_buffer;     dbms_pipe.pack_message('This is message 1');     dbms_pipe.pack_message('This is message 2');     status := dbms_pipe.send_message('HOME_OF_P2');     status := dbms_pipe.receive_message('HOME_OF_P1', 60);     dbms_pipe.unpack_message(response);     dbms_output.put_line('P1 received:'response); end P1; CREATE OR REPLACE procedure P2 is     status       integer;     message_1    varchar2(2000);     message_2    varchar2(2000); begin     status := dbms_pipe.receive_message('HOME_OF_P2', 60);     dbms_pipe.unpack_message(message_1);     dbms_pipe.unpack_message(message_2);     dbms_pipe.pack_message('Got your messages.');     status := dbms_pipe.send_message('HOME_OF_P1');     dbms_output.put_line('P2 received:'message_1);     dbms_output.put_line('P2 received:'message_2); end P2; 

The session output for the P2, which is executed first, is the following.

 
  SQL>  set serveroutput on  SQL>  execute p2  P2 received:This is message 1   P2 received:This is message 2  

The session output for the P1 is:

 
  SQL>  set serveroutput on  SQL>  execute p1  P1 received:Got your messages.  

11.20.2 Interface Description

The following paragraphs describe the DBMS_PIPE package API.

 
  PACK_MESSAGE  PROCEDURE pack_message(item in VARCHAR2); PROCEDURE pack_message(item in NUMBER); PROCEDURE pack_messge(item in DATE); 

item

This is a VARCHAR2, NUMBER, or DATE type variable that is placed into a local buffer. You can pack a single buffer with several messages, each of a different type.

The DBMS_PIPE package provides overloaded procedures for packing different datatypes. You can pack RAW and LONG datatypes. The model for sending data to a pipe is to PACK one or more times, then SEND once.

The PACK_MESSAGE procedure copies data to a process-specific Oracle memory buffer. Each Oracle connection has a buffer available for 4096 bytes of data. This buffer can be packed in stages, from different programs within a single Oracle connected session. Packing a message is not the same as sending a message to a pipe. Several messages can be packed and be followed by a single send, which will place all packed messages in the destination buffer. Once a buffer has messages from a PACK_MESSAGE call, there are three events that can occur:

  • The process disconnects from Oracle and no messages are sent.

  • The process calls DBMS_PIPE.RESET_BUFFER, which clears the buffer of all messages.

  • The process calls DBMS_PIPE.SEND_MESSAGE, which sends all messages and clears the buffer.

If you PACK without interruption, you will overflow the 4096 limitation. This overflow condition generates an ORA-06558 error. Similarly, when we UNPACK a buffer we can unpack an empty buffer and get an underflow condition.

When the contents of a buffer are sent to a pipe, that buffer is cleared. For example, you can pack 4096 bytes into your session buffer and send these bytes to a pipe. You can call PACK_MESSAGE again, packing another 4096 bytes. This second buffer can be sent. You have to be careful; you have physical limitations when sending data to a pipe. When you send a full buffer to a pipe and the receiver has not read its home pipe, your next send will have to wait for the receiver to read its home pipe.

 
  SEND_MESSAGE  FUNCTION send_message(pipename in VARCHAR2,     timeout IN INTEGER DEFAULT maxwait,     maxpipesize IN INTEGER DEFAULT 8192) RETURN INTEGER;     maxwait constant integer := 86400000;  /* 1000 days */     RETURN INTEGER values:     0  Success.     1  Timeout because the pipe stays full leaving no room, or         the sending process cannot place a lock on the pipe.     3 - Interrupted. 

pipename

This parameter is the name of the pipe that is to receive all messages present in the local buffer. Pipe names are limited to 128 characters . Do not use pipe names beginning with ORA$ as these are reserved by Oracle.

timeout

This is an optional argument that is the time allowed for moving the buffer contents to the pipe. This is not the time limit placed on a process reading the message. If a process is slow in reading messages from its home pipe (i.e., slow compared to the rate at which data is flowing into the pipe, the pipe may be full; hence a SEND_MESSAGE will wait for room to be made. If room in the pipe is not freed, the timer will expire and the message is not sent.

maxpipesize

This is the maximum size message allowed for the pipe. This must be at least as large as the message. Subsequent calls to SEND_MESSAGE with a larger maxpipesize will increase the maximum size of a message allowed in the pipe. Maxpipesize, when used, becomes a persistent attribute of the pipe. The demo procedures above, P1 and P2, used this parameter. Had those procedures explicitly created their pipes with the CREATE_PIPE procedure, which requires a maxpipesize, then there would not be a need for maxpipesize as an argument to SEND_MESSAGE.

The SEND_MESSAGE procedure copies the contents of the message buffer into a designated pipe. Upon completion, there is no guarantee that the message has been read. A successful return code indicates only that the message was copied . After the call, the local message buffer is empty.

 
  RECEIVE_MESSAGE  FUNCTION receive_message(pipe_name IN VARCHAR2,     timeout IN INTEGER DEFAULT maxwait) RETURN INTEGER;     maxwait constant integer := 86400000;  /* 1000 days */     RETURN INTEGER values:     0 - Success     1  Timeout     2  Record in pipe is too large for the buffer.     3 - Interrupted. 

pipename

This parameter is the name of the pipe that is the source of the message to be received. In previous discussions, this was referred to as the home pipe. Pipe names are limited to 128 characters. Do not use pipe names beginning with ORA$ as these are reserved by Oracle.

timeout

An optional argument is the time allowed for moving the pipe contents to the buffer. If the pipe is empty you will wait on a message until the timer expires . You can post a read on just one pipe. This is in contrast to alerts (DBMS_ALERT package) where you can post a "wait on any" alert. If a process wishes to receive data from several pipes, that process must pole each pipe with a RECEIVE_MESSAGE. For this reason, there is no advantage to a design in which a process has several home pipes.

You can use a timeout of zero to issue a nonblocking read.

The RECEIVE_MESSAGE function copies messages from the pipe into the local buffer. A successful read is indicated with a zero return code. If no messages are present in the pipe, you still get a zero, indicating a successful read. Once you issue RECEIVE_MESSAGE, you then unpack the message from the local buffer into local variables .

 
  UNPACK_MESSAGE  PROCEDURE unpack_message(item OUT VARCHAR22); PROCEDURE unpack_message(item OUT NUMBER); PROCEDURE unpack_messge(item OUT DATE); 

Item

This is a variable that has been declared in your procedure.

The DBMS_PIPE package provides overloaded procedures for unpacking different datatypes. You can pack RAW and LONG datatypes. The model for receiving data is to issue one RECEIVE followed by a series of UNPACK commands.

The UNPACK_MESSAGE procedure copies the next message from your buffer into the variable. This is an OUT mode variable. Following the call to UNPACK, that variable contains the contents of the latest message. How do you know what, if anything, is in the buffer, especially because a RECEIVE_MESSAGE returns a successful return code when the pipe is empty? The function NEXT_ITEM_TYPE is the key. If your code needs to loop over messages in the buffer, you want to incorporate NEXT_ITEM_TYPE in your loop. This is illustrated in the following sample code.

 
  RESET_BUFFER  PROCEDURE reset_buffer; 

The RESET_BUFFER procedure initializes buffer position variables declared within the package body of DBMS_PIPE. When a message is sent with SEND_MESSAGE, these same variables are initialized as a method of clearing, and resetting, the internal buffer. You would call this procedure if a buffer was packed and you wanted to clear it and repack it with different messages, prior to sending the buffer messages to a pipe.

 
  PURGE  PROCEDURE purge(pipename IN VARCHAR2); 

pipename

The name of the pipe you are cleaning out.

The PURGE procedure removes any messages in the pipe. This procedure frees all memory associated with the pipe. Processes that send and receive messages can abort. When this occurs, and the processes are restarted, they may see messages left in the pipe from the previous run. If this scenario is possible, the logic of these processes should be able to detect, through the presence of messages in the pipe, a warm start situation, as opposed to a cold start.

 
  NEXT_ITEM_TYPE  FUNCTION next_item_type RETURN INTEGER;     RETURN INTEGER values:     0  No more items in the pipe.     9  The next item in the buffer is a VARCHAR2     2  The next item is a NUMBER.     3  The next item is a DATE. 

The NEXT_ITEM_TYPE function provides the ability to "peek" into the message buffer. It is useful when messages of various types are being sent. Even if you know what type of message is being sent, this function can be incorporated into a WHILE LOOP so you can break easily when the message buffer is empty.

If you do not use this function to detect an empty buffer, then you risk calling UNPACK_MESSAGE on an empty bufferthis will generate an ORA-06556 or an ORA-06559 error.

 
  UNIQUE_SESSION_NAME  FUNCTION unique_session_name RETURN VARCHAR2; 

The UNIQUE_SESSION_NAME function returns a name like ORA$PIPE$ followed by a sequence of letters and digits. This pipe name is provided by Oracle and is guaranteed to be unique throughout all sessions that connect to that instance. A process must know the name of a pipe prior to calling SEND_MESSAGE; the same holds true for the function, RECEIVE_MESSAGE. When UNIQUE_SESSION_NAME is used to generate pipe names, there has to be some central pipe name registration in the form of a database table where applications register their pipe names; then every process can know the home pipe name of all other processes.

 
  CREATE_PIPE, REMOVE_PIPE  

Creation and removal of a pipe can be accomplished with the following two functions.

 
 FUNCTION create_pipe(pipename IN VARCHAR2,     maxpipesize IN INTEGER DEFAULT 8192,     private IN BOOLEAN DEFAULT TRUE) RETURN INTEGER; FUNCTION remove_pipe (pipename IN VARCHAR2) RETURN INTEGER; 

Our examples use the functions SEND_MESSAGE and RECEIVE_MESSAGE to implicitly create the pipes. These functions create the pipe named in the call if it does not already exist. You can implicitly create a function and yet still remove it with REMOVE_PIPE.

11.20.3 Exception Handling

The following lists the possible exceptions that can occur. The last item in this list, UNPACK_MESSAGE, is the most common, and we will demonstrate PL/SQL code to handle this condition.

  • SEND_MESSAGE : The destination pipe may remain full. When this happens, you will wait for TIMEOUT seconds to pass and then get a return code of 1.

  • RECEIVE_MESSAGE : If the message in the pipe is too large for the buffer, you get a return code of 2. This event is not likely because all messages originate from the same size buffer, which is limited to 4096. The pipe limit is twice that.

  • PACK_MESSAGE : You can overflow your local buffer, which produces an ORA-06558 error.

  • UNPACK_MESSAGE, NEXT_ITEM_TYPE : You can unpack and empty your buffer, which produces an ORA-06556 or an ORA-06559 error.

In PL/SQL code, you can use PRAGMA EXCEPTION_INIT to ensure that, should a particular ORA error occur, you capture that as an exception. This is how we capture an Oracle error.

We are most concerned with an error when we unpack an empty message. If no message has been sent to our pipe, the RECEIVE_MESSAGE returns a status of 0. The following error is generated when we follow that receive with either a call to UNPACK_MESSAGE or NEXT_ITEM_TYPE

 
 ORA-06556: the pipe is empty, cannot fulfill the unpack_message request 

The following procedure, P3, is more robust because it will not fail. The parsing of message parts is embedded within a loopthis permits us to get all message parts. That code is further embedded within an exception handler. All of this logic is embedded within a loopbut one that loops three times. Procedure P3, shown next, illustrates how a PL/SQL procedure can serve as an endless-loop message handler.

 
 CREATE OR REPLACE procedure P3 is     status            integer;     message_part      varchar2(2000);     message           varchar2(2000);     empty_buffer      exception;     pragma exception_init(empty_buffer, -6556); begin     for i in 1..3 loop         status := dbms_pipe.receive_message('HOME_OF_P3', 5);         begin             while (dbms_pipe.next_item_type = 9) loop                 dbms_pipe.unpack_message(message_part);                 message := message'-'message_part;             end loop;         exception             when empty_buffer then null;         end;     end loop;     dbms_output.put_line(message); end P3; 

Procedure P4 is demonstrated with the following SQL*Plus session, which sends messages to the pipe of P3. When P3 completes three cycles, the output is the concatenation of whatever messages were read. For the SQL*Plus session we have (start P3 before this session):

 
  SQL>  set feedback off  SQL>  variable status number  SQL>  execute dbms_pipe.reset_buffer;  SQL>  execute dbms_pipe.pack_message('This is message 1');  SQL>  execute dbms_pipe.pack_message('This is message 2');  SQL>  execute :status := dbms_pipe.send_message('HOME_OF_P3'); 

The output from P3 is the following.

 
  SQL>  SET SERVEROUTPUT ON  SQL>  execute p3  -This is  message 1-This is message 2 SQL> 


Programming Oracle Triggers and Stored Procedures
Programming Oracle Triggers and Stored Procedures (3rd Edition) (Prentice Hall PTR Oracle Series)
ISBN: 0130850330
EAN: 2147483647
Year: 2003
Pages: 111
Authors: Kevin Owens

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net