пятница, 21 сентября 2012 г.

PL/SQL BULK insert & JDBC batchExecute

Ко мне обратились разработчи с вопросом об эффективности использования executeBatch при использовании JDBC в сравнении с Oracle PL/SQL BULK (forall) insert.

В результате родился небольшой код на java, который производит вызов PL/SQL блока с реализованной BULK вставкой, а так же проделывает такой же объем работы посредством Java JDBC вызовов, но уже с использованием executeBatch().

PL/SQL код BULK операции:

declare
  type t_tbl is table of batchtest.n%type;
  v_tbl t_tbl := t_tbl();
begin
  for i in 1..1000 loop
    v_tbl.extend;
    v_tbl(i) := i;
  end loop;
  forall n in 1..v_tbl.count insert into batchtest /*PLSQL*/ values(1, v_tbl(n));
end;


Java код Batch вызова:


PreparedStatement psttm2 = conn.prepareStatement("insert into batchtest /*JDBC*/ values(2, ?)");
for (int n=1; n<=1000;n++) {
               psttm2.setInt(1, n);
               psttm2.addBatch();
}
psttm2.executeBatch();
conn.commit();

Результат, полученный из анализа Trace файла:

Rank:2(28.6%) Self:0.946s Recursive:0.021s Invoker:52 Definer:52 Depth:1

INSERT INTO BATCHTEST VALUES(1, :B1 )

SQL Self - Time, Totals,
Waits, Binds and Row Source Plan
Call
Response Time
Accounted-for
Elapsed
Time
CPU Time
Non-Idle
Wait Time
Elapsed Time
Unaccounted-for
Idle
Wait Time
Parse:
0.001
0.001
0.001
0.000
0.000
0.000
Execute:
0.945
0.945
0.381
0.565
0.000
0.000
Fetch:
0.000
0.000
0.000
0.000
0.000
0.000
Total:
0.946
0.946
0.382
0.565
-0.001
0.000

Call
Call
Count
OS
Buffer Gets
(disk)
BG Consistent
Read Mode
(query)
BG Current
Mode
(current)
Rows
Processed
or Returned
Library
Cache
Misses
Times
Waited
Non-Idle
Times
Waited
Idle
Parse:
2
0
0
0
0
1
0
0
Execute:
200
17
799
4793
200000
1
62
0
Fetch:
0
0
0
0
0
0
0
0
Total:
202
17
799
4793
200000
2
62
0


Rank:3(22.5%) Self:0.742s Recursive:0.000s Invoker:52 Definer:52 Depth:0

insert into batchtest /*JDBC*/ values(2, :1 )

SQL Self - Time, Totals, Waits, Binds and Row Source Plan

Call
Response Time
Accounted-for
Elapsed
Time
CPU Time
Non-Idle
Wait Time
Elapsed Time
Unaccounted-for
Idle
Wait Time
Parse:
0.001
0.001
0.001
0.000
0.000
0.000
Execute:
0.742
0.574
0.297
1.807
-1.530
0.168
Fetch:
0.000
0.000
0.000
0.000
0.000
0.000
Total:
0.742
0.575
0.298
1.807
-1.531
0.168

Call
Call
Count
OS
Buffer Gets
(disk)
BG Consistent
Read Mode
(query)
BG Current
Mode
(current)
Rows
Processed
or Returned
Library
Cache
Misses
Times
Waited
Non-Idle
Times
Waited
Idle
Parse:
1
0
0
0
0
1
0
0
Execute:
200
12
733
4559
200000
1
423
199
Fetch:
0
0
0
0
0
0
0
0
Total:
201
12
733
4559
200000
2
423
199



Исходя из результатов анализа trace файла можно заключить, что
1) Oracle PL/SQL Bulk и Java JDBC Batch имеют одинаковую картину вызова
2) Оба запроса создают близкую нагрузку как по характеру так и по объему
3) *** При многократном запуске усредненная производительность обоих запросов расходилась менее, чем на 1%.

P.S.> Поскольку на большинстве тестов JDBC Batch показывал - как минимум - не меньшую производительность, то тесты с запуском Java из базы, дабы исключить влияние задержек SQL*Net на доставку объема batch, не проводились.


Анализ статистики был проведён посредством Oracle Trace Analyzer.
Ниже привожу java код:

oracleBatchTest.java


import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import oracle.jdbc.pool.OracleDataSource;

public class oracleBatchTest {
   
    public static void main(String argv[]) throws SQLException {
        OracleDataSource ods = new OracleDataSource();
        ods.setURL("jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=clu-host-02.net.billing.ru)(PORT=1521)))(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=dvlp11.net.billing.ru)(INSTANCE_NAME=dvlp112)))");
        ods.setUser("ashindarev");
        ods.setPassword("********");
        Connection conn = ods.getConnection();
        Statement sttm = conn.createStatement();
        sttm.execute("begin\n"+
                     "  for rec in (\n"+
                     "    select count(1) as cnt from user_tables where table_name = 'BATCHTEST'\n"+
                     "  ) loop\n"+
                     "    if rec.cnt = 0 then\n"+
                     "      execute immediate 'create table batchtest(id integer, n number) storage(initial 1M next 1M pctincrease 0 minextents 4)';\n"+
                     "    end if;\n"+
                     "  end loop;\n"+
                     "  execute immediate 'truncate table batchtest reuse storage';\n"+
                     "end;");
        sttm.execute("ALTER SESSION SET EVENTS '10046 trace name context forever, level 8'");
        PreparedStatement psttm1 = conn.prepareStatement(
                                       "declare\n"+
                                       "  type t_tbl is table of batchtest.n%type;\n"+
                                       "  v_tbl t_tbl := t_tbl();\n"+
                                       "begin\n"+
                                       "  for i in 1..1000 loop\n"+
                                       "    v_tbl.extend;\n"+
                                       "    v_tbl(i) := i;\n"+
                                       "  end loop;\n"+
                                       "  forall n in 1..v_tbl.count insert into batchtest /*PLSQL*/ values(1, v_tbl(n));\n"+
                                       "end;"
                                   );
        PreparedStatement psttm2 = conn.prepareStatement("insert into batchtest /*JDBC*/ values(2, ?)");
        for (int i=0; i<100; i++) {
            psttm1.execute();
            conn.commit();
            for (int n=1; n<=1000;n++) {
                psttm2.setInt(1, n);
                psttm2.addBatch();
            }
            psttm2.executeBatch();
            conn.commit();
            psttm2.clearBatch();
        }
        for (int i=0; i<100; i++) {
            for (int n=1; n<=1000;n++) {
                psttm2.setInt(1, n);
                psttm2.addBatch();
            }
            psttm2.executeBatch();
            psttm2.clearBatch();
            conn.commit();
            psttm1.execute();
            conn.commit();
        }
        sttm.execute("ALTER SESSION SET EVENTS '10046 trace name context off'");
        sttm.execute("drop table BATCHTEST purge");
        conn.close();
    }
   
}

пятница, 27 апреля 2012 г.

Осторожно - баг Oracle XDK 11.2.0.2

Собственно - наткнулся на баг при использовании Oracle XDK 11.2.0.2.
Если рассмотреть код: <xsl:value-of select="substring($str, string-length($pfx), 1)"/>,
то для строк $str="value+=1" и для $pfx="value+" данное выражение должно вернуть "+".
Ан нет - так происходит где угодно, но не в Oracle XDK 11.2.0.2 - там будет пустая строка.
Лечится небольшой модификацией:

  <xsl:value-of select="substring($str, string-length($pfx)+0, 1)"/>

P.S.> Вообще-то это хак - лучше всё проводить через переменную с длинной.
P.P.S> На данную проблему открыт Oracle Metalink SR: 3-5642816821

среда, 9 марта 2011 г.

Использование dbms_pipe на Oracle Real Application Cluster (RAC)

У DBMS_PIPE при использовании на Oracle RAC есть один момент - между узлами не работает.

В связи с этим годик назад я для себя сварганил код, который сейчас потребовалось включить в production код. Проблема в том, что в силу ряда обстоятельств как следует протестировать его не удастся и опа случится именно когда понадобится его использовать. В итоге пришёл к мысли, что следует поменять свой труд по кодированию на Ваш по тестированию :).

В общем - получился пакет (пока что его рыба) близкий по спецификации к utl_pipe. Желающие могут его модифицировать под свои задачи, но - в качестве спасибо - хотелось бы получить информацию по ошибкам и пожеланиям с Вашей стороны.

CREATE OR REPLACE PACKAGE UTL_PIPE
IS
 
  MAXWAIT         CONSTANT INTEGER     := 86400000;
 
  -- Создание канала
   FUNCTION CREATE_PIPE (
               PIPENAME    IN VARCHAR2              -- Имя канала
              ,MAXPIPESIZE IN INTEGER DEFAULT 8192  -- Не используется
              ,PRIVATE     IN BOOLEAN DEFAULT FALSE -- Не используется
            )
     RETURN INTEGER;
 
  -- Создание канала
   FUNCTION REMOVE_PIPE (
               PIPENAME    IN VARCHAR2 -- Имя канала
            )
     RETURN INTEGER;
 
  -- Упаковка строки в сообщение
  PROCEDURE PACK_MESSAGE (
              ITEM IN VARCHAR2 CHARACTER SET ANY_CS
            );
 
  -- Упаковка числа в сообщение
  PROCEDURE PACK_MESSAGE (
              ITEM IN NUMBER
            );
 
  -- Упаковка даты в сообщение
  PROCEDURE PACK_MESSAGE (
              ITEM IN DATE
            );
 
  -- Упаковка raw в сообщение
  PROCEDURE PACK_MESSAGE_RAW (
              ITEM IN RAW
            );
 
  -- Упаковка rowid/urowid в сообщение
  PROCEDURE PACK_MESSAGE_ROWID (
              ITEM IN UROWID
            );
 
  -- Получение типа следующего элемента сообщения
   FUNCTION NEXT_ITEM_TYPE
     RETURN INTEGER;
 
  -- Построение XML представления сообщения
   FUNCTION CONSTRUCT_MESSAGE
     RETURN XMLTYPE;
 
  -- Получение уникального имени для сессии
   FUNCTION UNIQUE_SESSION_NAME
     RETURN VARCHAR2;
 
  -- Отправка текстового XML сообщения в канал
   FUNCTION SEND_MESSAGE (
              PIPENAME    IN VARCHAR2                -- Имя канала
             ,MESSAGE     IN CLOB                    -- XML текст сообщение
             ,TIMEOUT     IN INTEGER DEFAULT MAXWAIT -- Не используется
             ,MAXPIPESIZE IN INTEGER DEFAULT 8192    -- Не используется
            )
     RETURN INTEGER;
 
  -- Отправка XML сообщения в канал
   FUNCTION SEND_MESSAGE (
              PIPENAME    IN VARCHAR2                -- Имя канала
             ,MESSAGE     IN XMLTYPE                 -- XML сообщение
             ,TIMEOUT     IN INTEGER DEFAULT MAXWAIT -- Не используется
             ,MAXPIPESIZE IN INTEGER DEFAULT 8192    -- Не используется
            )
     RETURN INTEGER;
 
  -- Отправка сообщения, построенного из упакованных в буффер данных, в канал
   FUNCTION SEND_MESSAGE (
              PIPENAME    IN VARCHAR2                -- Имя канала
             ,TIMEOUT     IN INTEGER DEFAULT MAXWAIT -- Не используется
             ,MAXPIPESIZE IN INTEGER DEFAULT 8192    -- Не используется
            )
     RETURN INTEGER;
 
  -- Получение сообщения из канала
   FUNCTION RECEIVE_MESSAGE (
              PIPENAME IN VARCHAR2,               -- Имя канала
              TIMEOUT  IN INTEGER DEFAULT MAXWAIT -- Время ожидания
            )
     RETURN INTEGER;
 
  -- Получение прочитанного из канала XML сообщения
   FUNCTION UNPACK_MESSAGE
     RETURN XMLTYPE;
 
  -- Получить численное представление текущего элемента сообщения
  PROCEDURE UNPACK_MESSAGE (
              ITEM OUT NUMBER
            );
 
  -- Получить строковое представление текущего элемента сообщения
  PROCEDURE UNPACK_MESSAGE (
              ITEM OUT VARCHAR2
            );
 
  -- Получить представление текущего элемента сообщения в виде даты
  PROCEDURE UNPACK_MESSAGE (
              ITEM OUT DATE
            );
 
  -- Получить представление текущего элемента сообщения в виде raw
  PROCEDURE UNPACK_MESSAGE_RAW (
              ITEM OUT RAW
            );
 
  -- Получить представление текущего элемента сообщения в виде rowid
  PROCEDURE UNPACK_MESSAGE_ROWID (
              ITEM OUT UROWID
            );
 
  -- Очистка заданного канала
  PROCEDURE PURGE (
              PIPENAME IN VARCHAR2
            );
 
  -- Процесс очистки старых каналов
  PROCEDURE PACK (
              PR_CONSTRUCT IN BOOLEAN DEFAULT FALSE
            );
 
  -- Удаление внутренних структур UTL_PIPE.
  -- Доступна только владельцу или суперпользователю
  PROCEDURE DESTROY;
 
END UTL_PIPE;
/
CREATE OR REPLACE PACKAGE BODY UTL_PIPE
  --
  -- P.S.> По умолчанию размер буффера на очередь находится где-то в пределах 5101 сообщения.
  --       расширить буффер можно путём изменения скрытого параметра:
  --       _buffered_publisher_flow_control_threshold
  --       Например:
  --         alter system set "_buffered_publisher_flow_control_threshold"=500000 sid='*' scope=both
  --
  -- P.P.S.> В случае получения ошибки ORA-025306 необходимо правильно сконфигурировать  сервер базы
  --         данных в RAC окружении таким образом, чтобы корректно работали буфферизованные  очереди
  --         при посылке и чтении сообщений на разных узлах. Наличие данной ошибки говорит  о  некор-
  --         ректности сетевых настроек базы данных в кластерном окружении
  --
IS
 
  LC_QUEUE_TABLE_NAME CONSTANT VARCHAR2(16) := 'UPQ$PIPE_STORAGE';     -- Имя таблицы для очередей
  LC_TOUCH_PIPE_NAME  CONSTANT VARCHAR2(20) := 'UPQ$PIPE$TOUCH$QUEUE'; -- Имя служебного PIPE
  LC_TOUCH_PROC_PFX   CONSTANT VARCHAR2(15) := 'UPQ$PIPE_TOUCH_';      -- Префикс для объектов Scheduler-а
  LC_TOUCH_PRG_SUFF   CONSTANT VARCHAR2(20) := 'PRG';                  -- Суффикс для задания имени программы
  LC_TOUCH_SCHED_SUFF CONSTANT VARCHAR2(20) := 'SCH';                  -- Суффикс для задания имени расписания
  LC_TOUCH_JOB_SUFF   CONSTANT VARCHAR2(20) := 'JOB';                  -- Суффикс для задания имени задания
 
  LC_NUMBER_TYPEID    CONSTANT INTEGER := 6;
  LC_VARCHAR2_TYPEID  CONSTANT INTEGER := 9;
  LC_UROWID_TYPEID    CONSTANT INTEGER := 11;
  LC_DATE_TYPEID      CONSTANT INTEGER := 12;
  LC_RAW_TYPEID       CONSTANT INTEGER := 23;
  LC_UNKNOWN_TYPEID   CONSTANT INTEGER := 0;
 
  LV_SCHEMA_NAME      SYS.ALL_USERS.USERNAME%TYPE := NULL;             -- Имя схемы-владельца пакета
 
  LV_ENQ_CT    DBMS_AQ.ENQUEUE_OPTIONS_T;
  LV_MSG_PROP  DBMS_AQ.MESSAGE_PROPERTIES_T;
  LV_PURGEOPT  DBMS_AQADM.AQ$_PURGE_OPTIONS_T;
  LV_DEQ_CT    DBMS_AQ.DEQUEUE_OPTIONS_T;
 
  TYPE LTP_MSG_REC     IS RECORD (
                             TYPE_ID    INTEGER
                            ,V_NUMBER   NUMBER
                            ,V_DATE     DATE
                            ,V_UROWID   UROWID
                            ,V_VARCHAR2 VARCHAR2(32767)
                            ,V_RAW      RAW(32767)
                          );
  TYPE LTP_MSG_DETAILS IS TABLE OF LTP_MSG_REC;
  TYPE LTP_NAMES       IS TABLE OF VARCHAR2(30) INDEX BY BINARY_INTEGER;
 
  LC_TOUCH_PERIOD      CONSTANT NUMBER := 1/24;                -- Частота интервалов сигнализации сессии о том, что  с  pipe-ом
                                                               -- производится работа. (Час)
  LC_TOUCH_LIMIT       CONSTANT NUMBER := 4;                   -- Количество интервалов простоя после которых удаляются очереди
                                                               -- для пустых pipe. (4 часа).
  LC_TOUCH_ANY_LIMIT   CONSTANT NUMBER := LC_TOUCH_LIMIT*24;   -- Количество интервалов простоя после которых удаляются очереди
                                                               -- для не пустых pipe. (4 дня).
 
  TYPE LTP_TOUCH_LIST  IS TABLE OF DATE INDEX BY ALL_QUEUES.NAME%TYPE;
 
  LV_TOUCH_LIST        LTP_TOUCH_LIST;
 
  LV_NAMES        LTP_NAMES;
  LV_NEW_MESSAGE  LTP_MSG_DETAILS;
  LV_READED_MSG   SYS.XMLTYPE;
  LV_READED_TBL   LTP_MSG_DETAILS;
  LV_READED_POS   INTEGER := 0;
 
  LE_06556        EXCEPTION; -- ORA-06556: канал пуст, невозможно выполнить запрос unpack_message
  PRAGMA EXCEPTION_INIT (LE_06556, -06556);
  LE_06559        EXCEPTION; -- ORA-06559: Запрошен неверный тип данных...
  PRAGMA EXCEPTION_INIT (LE_06559, -06559);
  E_QUEUE_TIMEOUT EXCEPTION; -- ORA-25228: таймаут или окончание выборки во время снятия сообщения из очереди .
  PRAGMA EXCEPTION_INIT(E_QUEUE_TIMEOUT, -25228);
  E_NO_QUEUE      EXCEPTION; -- ORA-25205: очередь не существует
  PRAGMA EXCEPTION_INIT(E_NO_QUEUE, -25205);
 
  LV_RESULT       INTEGER;
 
  -- Расхирение имени объекта до полного имени с учётом схемы и регистра
  FUNCTION LF_EXTEND_NAME (
             PR_NAME IN VARCHAR2
           )
    RETURN VARCHAR2
  IS
    LV_NAME VARCHAR2(98) := TRIM(PR_NAME);
  BEGIN
    IF SUBSTR(LV_NAME,1,1)  != '"' OR
       SUBSTR(LV_NAME,-1,1) != '"'
    THEN
      LV_NAME := '"'||UPPER(LV_NAME)||'"';
    END IF;
    RETURN '"'||LV_SCHEMA_NAME||'".'||LV_NAME;
  END;
 
  -- Преобразование имени канала или очереди к имени очереди
   FUNCTION LF_GET_QUEUE_NAME (
              PR_NAME IN VARCHAR2
            )
     RETURN VARCHAR2
  IS
  BEGIN
    IF PR_NAME IS NULL OR
       (UPPER(PR_NAME) LIKE 'UPQ$%' AND LENGTH(PR_NAME) <= 24)
    THEN
      RETURN UPPER(PR_NAME);
    END IF;
    RETURN SUBSTR (
             SUBSTR('UPQ$'||UPPER(PR_NAME),1,14)||
             DBMS_OBFUSCATION_TOOLKIT.MD5(input => UTL_RAW.CAST_TO_RAW(UPPER(PR_NAME)))
            ,1,24
           );
  END;
 
  -- Создание очереди на служебной таблице
  PROCEDURE LP_CREATE_QUEUE (
              PIPENAME IN VARCHAR2
            )
  IS
    PRAGMA AUTONOMOUS_TRANSACTION;
    LV_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(pr_name => PIPENAME);
    LV_CNT        INTEGER;
  BEGIN
    -- Если отсутствует таблица для очередей - создаём
    SELECT COUNT(1)
      INTO LV_CNT
      FROM ALL_QUEUE_TABLES AQTB
     WHERE AQTB.OWNER       = LV_SCHEMA_NAME
       AND AQTB.QUEUE_TABLE = LC_QUEUE_TABLE_NAME;
    IF LV_CNT = 0 THEN
      DBMS_AQADM.CREATE_QUEUE_TABLE (
        queue_table        => LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME)
       ,queue_payload_type => 'SYS.XMLTYPE'
       ,compatible         => '10.2'
       ,multiple_consumers => false
      );
    END IF;
    -- Если отсутствует очередь - создаём
    SELECT COUNT(1)
      INTO LV_CNT
      FROM ALL_QUEUES AQTB
     WHERE AQTB.OWNER  = LV_SCHEMA_NAME
       AND AQTB.NAME   = LV_QUEUE_NAME;
    IF LV_CNT = 0 THEN
      DBMS_AQADM.CREATE_QUEUE (
        queue_name         => LF_EXTEND_NAME(pr_name => LV_QUEUE_NAME)
       ,queue_table        => LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME)
      );
    END IF;
    -- Запускаем очередь
    DBMS_AQADM.START_QUEUE (
      queue_name         => LF_EXTEND_NAME(pr_name => LV_QUEUE_NAME)
    );
  END;
 
  -- Создание задания на очистку неиспользуемых pipe
  PROCEDURE LP_CONSTRUCT
  IS
    LV_CNT   INTEGER;
  BEGIN
    -- Если нет задания - создаём
    SELECT COUNT(1)
      INTO LV_CNT
      FROM ALL_SCHEDULER_JOBS
     WHERE OWNER    = LV_SCHEMA_NAME
       AND JOB_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF;
    IF LV_CNT = 0 THEN
      -- Если нет расписания - создаём
      SELECT COUNT(1)
        INTO LV_CNT
        FROM ALL_SCHEDULER_SCHEDULES
       WHERE OWNER             = LV_SCHEMA_NAME
         AND SCHEDULE_NAME     = LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF;
      IF LV_CNT = 0 THEN
        DBMS_SCHEDULER.CREATE_SCHEDULE (
           schedule_name     =>  LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF
          ,start_date        =>  TRUNC(SYSDATE,'HH24')+1/24
          ,repeat_interval   =>  'FREQ=HOURLY;INTERVAl=1'
        );
      END IF;
      -- Если нет программы - создаём
      SELECT COUNT(1)
        INTO LV_CNT
        FROM ALL_SCHEDULER_PROGRAMS
       WHERE OWNER        = LV_SCHEMA_NAME
         AND PROGRAM_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF;
      IF LV_CNT = 0 THEN
        DBMS_SCHEDULER.CREATE_PROGRAM (
           program_name        => LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF
          ,program_action      => LF_EXTEND_NAME(pr_name => '"'||$$PLSQL_UNIT||'"."PACK"')
          ,program_type        => 'STORED_PROCEDURE'
          ,number_of_arguments => 0
          ,enabled             => TRUE
        );
      END IF;
      -- создаём задание
      DBMS_SCHEDULER.CREATE_JOB(
         job_name      => LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF
        ,program_name  => LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF
        ,schedule_name => LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF
        ,enabled       => TRUE
        ,auto_drop     => FALSE
      );
    END IF;
  END;
 
  -- Чтение сообщения из канала
   FUNCTION L_RECEIVE_MESSAGE (
              PIPENAME      IN VARCHAR2              -- имя канала
             ,TIMEOUT       IN INTEGER               -- timeout ожидания
             ,DEQ_CONDITION IN VARCHAR2 DEFAULT NULL -- условие выборки
            )
     RETURN INTEGER
  IS
    DEQ_CT    DBMS_AQ.DEQUEUE_OPTIONS_T := LV_DEQ_CT;
    ENQ_MSGID RAW(16);
  BEGIN
    LV_READED_TBL  := LTP_MSG_DETAILS();
    LV_READED_POS  := 0;
 
    DEQ_CT.WAIT          := TIMEOUT;
    IF DEQ_CONDITION IS NOT NULL THEN
      DEQ_CT.DEQ_CONDITION := DEQ_CONDITION;
    END IF;
 
    DBMS_AQ.DEQUEUE (
      dequeue_options    => DEQ_CT
     ,message_properties => LV_MSG_PROP
     ,msgid              => enq_msgid
     ,payload            => LV_READED_MSG
     ,queue_name         => LF_EXTEND_NAME(pr_name => LF_GET_QUEUE_NAME(pr_name => PIPENAME))
    );
 
    LV_READED_POS := -1;
    RETURN 0;
  EXCEPTION
    WHEN E_QUEUE_TIMEOUT THEN -- Если прервались по timeout, то
      RETURN 1;               -- возвращаем сигнал об отсутствии сообщений
    WHEN E_NO_QUEUE THEN                      -- Если не нашли очереди, то создаём её и
      LP_CREATE_QUEUE (pipename => PIPENAME); -- повторяем действие
      LP_CONSTRUCT;
      RETURN L_RECEIVE_MESSAGE (
               pipename      => PIPENAME
              ,timeout       => TIMEOUT
              ,deq_condition => DEQ_CONDITION
             );
  END;
 
  -- Проверяем интервал, когда последний раз вызывася метод для конкретного канала в данной сессии
  -- и если он превысил заданную величину - посылаем новый сигнал для указанного канала.
  PROCEDURE LP_TOUCH (
              PIPENAME IN VARCHAR2
            )
  IS
    V_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(PIPENAME);
  BEGIN
    IF LV_TOUCH_LIST.EXISTS(V_QUEUE_NAME) AND
       SYSDATE <= LV_TOUCH_LIST(V_QUEUE_NAME) + LC_TOUCH_PERIOD
    THEN
      RETURN;
    END IF;
    LV_TOUCH_LIST(V_QUEUE_NAME) := SYSDATE;
    PACK_MESSAGE(item => V_QUEUE_NAME);
    PACK_MESSAGE(item => LV_TOUCH_LIST(V_QUEUE_NAME));
    LV_RESULT := SEND_MESSAGE(pipename => LC_TOUCH_PIPE_NAME);
    WHILE L_RECEIVE_MESSAGE (
            pipename      => LC_TOUCH_PIPE_NAME
           ,timeout       => 0
           ,deq_condition => 'tab.user_data.extract(''/message/varchar2[position()= 1 and text() = '''''||V_QUEUE_NAME||''''']'') is not null'
                           ||' and '
                           ||'tab.enq_time < to_date('''||to_char(LV_TOUCH_LIST(V_QUEUE_NAME),'dd.mm.yyyy hh24:mi:ss')||''',''dd.mm.yyyy hh24:mi:ss'') - 1/24/3600'
          ) = 0 LOOP
      NULL;
    END LOOP;
  END;
 
  -- Проверка на существование канала
   FUNCTION LF_IS_PIPE_EXISTS (
              PIPENAME IN VARCHAR2
            )
     RETURN BOOLEAN
  IS
    LV_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(pr_name => PIPENAME);
    LV_CNT        INTEGER;
  BEGIN
    SELECT COUNT(1)
      INTO LV_CNT
      FROM ALL_QUEUES ALQE
     WHERE ALQE.OWNER = LV_SCHEMA_NAME
       AND ALQE.NAME  = LV_QUEUE_NAME;
    RETURN LV_CNT != 0;
  END;
 
  -- Очискта канала с определённым фильтром
  PROCEDURE LP_PURGE (
              PIPENAME         IN VARCHAR2
             ,PURGE_CONDITIONS IN VARCHAR2 DEFAULT NULL
            )
  IS
  BEGIN
 
    IF LF_IS_PIPE_EXISTS (
         pipename => PIPENAME
       )
    THEN
      DBMS_AQADM.PURGE_QUEUE_TABLE (
        queue_table     => LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME)
       ,purge_condition => 'queue = '''||LF_GET_QUEUE_NAME(pr_name => PIPENAME)||''''||
                           CASE
                             WHEN PURGE_CONDITIONS IS NULL THEN
                               ''
                             ELSE
                               ' and ('||PURGE_CONDITIONS||')'
                           END
       ,purge_options   => LV_PURGEOPT
      );
    END IF;
 
  END;
 
  -- Удаление сигналов по пайпам для которых отсутствуют очереди
  PROCEDURE LP_PURGE_EPSENT_TOUCHE (
              PIPENAME IN VARCHAR2 DEFAULT NULL
            ) IS
  BEGIN
    LP_PURGE (
       pipename         => LC_TOUCH_PIPE_NAME
      ,purge_conditions => CASE
                             WHEN PIPENAME IS NULL THEN
                               'extract(user_data,''/message/varchar2[position()=1]/text()'').getStringVal() not in (select distinct queue from '||LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME)||')'
                             ELSE
                               'extract(user_data,''/message/varchar2[position()= 1 and text() = '''''||LF_GET_QUEUE_NAME(PIPENAME)||''''']'') is not null'
                           END
    );
  END;
 
  -- Проверяем наличие сообщения в очереди
   FUNCTION LF_CHECK_MESSAGE (
              PR_QUEUE_NAME    IN VARCHAR2
             ,PR_DEQ_CONDITION IN VARCHAR2 DEFAULT NULL
            )
     RETURN BOOLEAN
  IS
    DEQ_CT    DBMS_AQ.DEQUEUE_OPTIONS_T := LV_DEQ_CT;
    ENQ_MSGID RAW(16);
  BEGIN
    DEQ_CT.WAIT         := DBMS_AQ.NO_WAIT;
    DEQ_CT.NAVIGATION   := DBMS_AQ.FIRST_MESSAGE;
    DEQ_CT.DEQUEUE_MODE := DBMS_AQ.BROWSE;
 
    DEQ_CT.DEQ_CONDITION := PR_DEQ_CONDITION;
 
    DBMS_AQ.DEQUEUE (
      dequeue_options    => DEQ_CT
     ,message_properties => LV_MSG_PROP
     ,msgid              => enq_msgid
     ,payload            => LV_READED_MSG
     ,queue_name         => LF_EXTEND_NAME(pr_name => LF_GET_QUEUE_NAME(PR_QUEUE_NAME))
    );
    RETURN TRUE;
  EXCEPTION
    WHEN E_QUEUE_TIMEOUT
      OR E_NO_QUEUE
    THEN
      RETURN FALSE;
  END;
 
  -- Проверяем наличие TOUCH по пайпу (в случае задания даты - после неё)
   FUNCTION LF_CHECK_TOUCH (
              PR_QUEUE_NAME IN VARCHAR2
             ,PR_DATE       IN DATE     DEFAULT NULL
            )
     RETURN BOOLEAN
  IS
  BEGIN
    RETURN LF_CHECK_MESSAGE (
              pr_queue_name    => LC_TOUCH_PIPE_NAME
             ,pr_deq_condition => 'tab.user_data.extract(''/message/varchar2[position()= 1 and text() = '''''||PR_QUEUE_NAME||''''']'') is not null'||
                                  CASE
                                    WHEN PR_DATE IS NULL THEN
                                      NULL
                                    ELSE
                                      ' and tab.enq_time >= to_date('''||to_char(PR_DATE,'dd.mm.yyyy hh24:mi:ss')||''',''dd.mm.yyyy hh24:mi:ss'')'
                                  END
           );
  END;
 
  -- Проверка канала на устаревание
  PROCEDURE LP_CHECK_PIPE_QUEUE (
              PR_QUEUE_NAME IN VARCHAR2 DEFAULT NULL -- имя очереди канала
            )
  IS
  BEGIN
    IF PR_QUEUE_NAME IS NOT NULL THEN -- Если очередь задана
      IF NOT LF_CHECK_TOUCH ( -- Если не было сигналов более периода на очистку пустых каналов
               pr_queue_name => PR_QUEUE_NAME
              ,pr_date       => SYSDATE - LC_TOUCH_PERIOD * LC_TOUCH_LIMIT
             )
      THEN
        IF NOT LF_CHECK_TOUCH (pr_queue_name => PR_QUEUE_NAME) -- Если вообще не было сигналов по каналу
        THEN
          LP_TOUCH ( pipename => PR_QUEUE_NAME ); -- Создаём отметку о том, что мы "тронули" канал
        ELSE -- иначе
          IF NOT LF_CHECK_MESSAGE (pr_queue_name => PR_QUEUE_NAME) OR -- Если канал пуст
             NOT LF_CHECK_TOUCH (                                     -- Или если  не  было  сигналов  дольше
                      pr_queue_name => PR_QUEUE_NAME                  -- периода на удаление непустых каналов
                     ,pr_date       => SYSDATE - LC_TOUCH_PERIOD * LC_TOUCH_ANY_LIMIT
                    )
          THEN
            LV_RESULT := REMOVE_PIPE ( pipename => PR_QUEUE_NAME ); -- Удаляем канал
          END IF;
        END IF;
      END IF;
    ELSE -- Если очередь не задана, то выполняем для каждой из существующих очередей
      FOR REC IN (
        SELECT ALQE.NAME
          FROM ALL_QUEUES ALQE
         WHERE ALQE.OWNER   = LV_SCHEMA_NAME
           AND ALQE.NAME LIKE 'UPQ$%'
           AND ALQE.NAME   != LC_TOUCH_PIPE_NAME
      ) LOOP
        LP_CHECK_PIPE_QUEUE( pr_queue_name => REC.NAME);
      END LOOP;
      LP_PURGE_EPSENT_TOUCHE; -- Очищаем сигналы по несуществующим очередям
    END IF;
  END;
 
  -- Создание канала
   FUNCTION CREATE_PIPE (
               PIPENAME    IN VARCHAR2              -- Имя канала
              ,MAXPIPESIZE IN INTEGER DEFAULT 8192  -- Не используется
              ,PRIVATE     IN BOOLEAN DEFAULT FALSE -- Не используется
            )
     RETURN INTEGER
  IS
  BEGIN
    LP_CREATE_QUEUE (
      pipename => PIPENAME
    );
    LP_TOUCH (
      pipename => PIPENAME
    );
    RETURN 0;
  END;
 
  -- Создание канала
   FUNCTION REMOVE_PIPE (
               PIPENAME    IN VARCHAR2 -- Имя канала
            )
     RETURN INTEGER
  IS
    PRAGMA AUTONOMOUS_TRANSACTION;
    LV_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(pr_name => PIPENAME);
  BEGIN
    -- Пытаемся удалить канал по имени канала, а если не удалось, то по имени очереди
    FOR I IN 1..2 LOOP
      FOR REC IN (
        SELECT ALQE.NAME                  AS NAME
              ,TRIM(ALQE.ENQUEUE_ENABLED) AS ENQUEUE_ENABLED
              ,TRIM(ALQE.DEQUEUE_ENABLED) AS DEQUEUE_ENABLED
          FROM ALL_QUEUES ALQE
         WHERE ALQE.OWNER = LV_SCHEMA_NAME
           AND ALQE.NAME  = LV_QUEUE_NAME
      ) LOOP
        IF REC.ENQUEUE_ENABLED = 'YES' OR
           REC.DEQUEUE_ENABLED = 'YES'
        THEN
          DBMS_AQADM.STOP_QUEUE(queue_name => LF_EXTEND_NAME(pr_name => REC.NAME));
        END IF;
        DBMS_AQADM.DROP_QUEUE(queue_name => LF_EXTEND_NAME(pr_name => REC.NAME));
        EXIT;
      END LOOP;
      IF UPPER(PIPENAME) NOT LIKE 'UPQ$%' OR
         LENGTH(PIPENAME) != 24
      THEN
        EXIT;
      ELSE
        LV_QUEUE_NAME := UPPER(PIPENAME);
      END IF;
    END LOOP;
    -- Очищаем сигналы по несуществующим каналам
    LP_PURGE_EPSENT_TOUCHE(PIPENAME);
    COMMIT;
    RETURN 0;
  END;
 
  -- Упаковка строки в сообщение
  PROCEDURE PACK_MESSAGE (
              ITEM IN VARCHAR2 CHARACTER SET ANY_CS
            )
  IS
  BEGIN
    LV_NEW_MESSAGE.EXTEND(1);
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID    := LC_VARCHAR2_TYPEID;
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_VARCHAR2 := ITEM;
  END;
 
  -- Упаковка числа в сообщение
  PROCEDURE PACK_MESSAGE (
              ITEM IN NUMBER
            )
  IS
  BEGIN
    LV_NEW_MESSAGE.EXTEND(1);
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID  := LC_NUMBER_TYPEID;
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_NUMBER := ITEM;
  END;
 
  -- Упаковка даты в сообщение
  PROCEDURE PACK_MESSAGE (
              ITEM IN DATE
            )
  IS
  BEGIN
    LV_NEW_MESSAGE.EXTEND(1);
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID  := LC_DATE_TYPEID;
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_DATE   := ITEM;
  END;
 
  -- Упаковка raw в сообщение
  PROCEDURE PACK_MESSAGE_RAW (
              ITEM IN RAW
            )
  IS
  BEGIN
    LV_NEW_MESSAGE.EXTEND(1);
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_RAW_TYPEID;
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_RAW   := ITEM;
  END;
 
  -- Упаковка rowid/urowid в сообщение
  PROCEDURE PACK_MESSAGE_ROWID (
              ITEM IN UROWID
            )
  IS
  BEGIN
    LV_NEW_MESSAGE.EXTEND(1);
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID  := LC_UROWID_TYPEID;
    LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_UROWID := ITEM;
  END;
 
  -- Получение типа следующего элемента сообщения
   FUNCTION NEXT_ITEM_TYPE
     RETURN INTEGER
  IS
  BEGIN
    IF LV_READED_TBL.COUNT > LV_READED_POS THEN
      RETURN LV_READED_TBL(LV_READED_POS+1).TYPE_ID;
    END IF;
    RETURN 0;
  END;
 
  -- Построение XML представления сообщения
   FUNCTION CONSTRUCT_MESSAGE
     RETURN XMLTYPE
  IS
    doc        dbms_xmldom.DOMDocument;
    node       dbms_xmldom.DOMNode;
    elem       dbms_xmldom.DOMElement;
    curr       dbms_xmldom.DOMNode;
    curr1      dbms_xmldom.DOMNode;
    text       dbms_xmldom.DOMText;
    nodeName   VARCHAR2(30);
 
  BEGIN
    doc := dbms_xmldom.createDocument('http://www.w3.org/2001/XMLSchema', null, null);
    dbms_xmldom.setVersion(doc, '1.0');
    dbms_xmldom.setCharset(doc, 'UTF8');
    node := dbms_xmldom.makeNode(doc);
    elem := dbms_xmldom.createElement(doc, 'message');
    curr := dbms_xmldom.appendChild(node, dbms_xmldom.makeNode(elem));
    FOR I IN 1..LV_NEW_MESSAGE.COUNT LOOP
      nodeName := LV_NAMES(LV_NEW_MESSAGE(I).TYPE_ID);
      elem  := dbms_xmldom.createElement(doc, nodeName);
      curr1 := dbms_xmldom.appendChild(curr, dbms_xmldom.makeNode(elem));
      CASE LV_NEW_MESSAGE(I).TYPE_ID
        WHEN LC_NUMBER_TYPEID THEN
           text  := dbms_xmldom.createTextNode(doc, rawtohex(utl_raw.cast_from_number(LV_NEW_MESSAGE(I).V_NUMBER)));
        WHEN LC_VARCHAR2_TYPEID THEN
           text  := dbms_xmldom.createTextNode(doc, LV_NEW_MESSAGE(I).V_VARCHAR2);
        WHEN LC_DATE_TYPEID THEN
           text  := dbms_xmldom.createTextNode(doc, TO_CHAR(LV_NEW_MESSAGE(I).V_DATE, 'YYYY-MM-DD')||'T'||TO_CHAR(LV_NEW_MESSAGE(I).V_DATE, 'HH24:MI:SS'));
        WHEN LC_RAW_TYPEID THEN
           text := dbms_xmldom.createTextNode(doc, RAWTOHEX(LV_NEW_MESSAGE(I).V_RAW));
        WHEN LC_UROWID_TYPEID THEN
           text := dbms_xmldom.createTextNode(doc, LV_NEW_MESSAGE(I).V_UROWID);
        ELSE
           text := dbms_xmldom.createTextNode(doc, NULL);
      END CASE;
      curr1 := dbms_xmldom.appendChild(curr1, dbms_xmldom.makeNode(text));
    END LOOP;
    LV_NEW_MESSAGE.DELETE();
    RETURN dbms_xmldom.getXMLTYPE(doc);
  END;
 
  -- Очистка буфферов сообщений
  PROCEDURE RESET_BUFFER IS
  BEGIN
    LV_NEW_MESSAGE.DELETE();
    LV_READED_POS := 0;
  END;
 
  -- Получение уникального имени для сессии
   FUNCTION UNIQUE_SESSION_NAME
     RETURN VARCHAR2
  IS
  BEGIN
    RETURN SYS.DBMS_PIPE.UNIQUE_SESSION_NAME;
  END;
 
  -- Отправка текстового XML сообщения в канал
   FUNCTION SEND_MESSAGE (
              PIPENAME    IN VARCHAR2                -- Имя канала
             ,MESSAGE     IN CLOB                    -- XML текст сообщение
             ,TIMEOUT     IN INTEGER DEFAULT MAXWAIT -- Не используется
             ,MAXPIPESIZE IN INTEGER DEFAULT 8192    -- Не используется
            )
     RETURN INTEGER
  IS
    V_RESULT INTEGER;
  BEGIN
    V_RESULT:= SEND_MESSAGE (
                  pipename    => PIPENAME
                 ,message     => XMLType.createXML(message)
                 ,timeout     => TIMEOUT
                 ,maxpipesize => MAXPIPESIZE
               );
    LV_NEW_MESSAGE.DELETE();
    RETURN V_RESULT;
  END;
 
  -- Отправка XML сообщения в канал
   FUNCTION SEND_MESSAGE (
              PIPENAME    IN VARCHAR2                -- Имя канала
             ,MESSAGE     IN XMLTYPE                 -- XML сообщение
             ,TIMEOUT     IN INTEGER DEFAULT MAXWAIT -- Не используется
             ,MAXPIPESIZE IN INTEGER DEFAULT 8192    -- Не используется
            )
     RETURN INTEGER
   IS
     ENQ_MSGID  RAW(16);
     LE_NOQUEUE EXCEPTION;
         PRAGMA EXCEPTION_INIT(LE_NOQUEUE, -025205);
   BEGIN
     DBMS_AQ.ENQUEUE (
       queue_name         => LF_EXTEND_NAME(pr_name => LF_GET_QUEUE_NAME (pr_name => PIPENAME))
      ,enqueue_options    => LV_ENQ_CT
      ,message_properties => LV_MSG_PROP
      ,payload            => MESSAGE
      ,msgid              => ENQ_MSGID
     );
     IF UPPER(PIPENAME) NOT LIKE 'UPQ$%' THEN
       LP_TOUCH(pipename => PIPENAME);
     END IF;
     RETURN 0;
   EXCEPTION
     WHEN LE_NOQUEUE THEN
       LP_CREATE_QUEUE (pipename => PIPENAME);
       IF UPPER(PIPENAME) = LC_TOUCH_PIPE_NAME THEN
         LP_CONSTRUCT;
       END IF;
       RETURN SEND_MESSAGE (
                 pipename    => PIPENAME
                ,message     => MESSAGE
                ,timeout     => TIMEOUT
                ,maxpipesize => MAXPIPESIZE
              );
   END;
 
  -- Отправка сообщения, построенного из упакованных в буффер данных, в канал
   FUNCTION SEND_MESSAGE (
              PIPENAME    IN VARCHAR2                -- Имя канала
             ,TIMEOUT     IN INTEGER DEFAULT MAXWAIT -- Не используется
             ,MAXPIPESIZE IN INTEGER DEFAULT 8192    -- Не используется
            )
     RETURN INTEGER
  IS
  BEGIN
    RETURN SEND_MESSAGE (
              pipename    => PIPENAME
             ,message     => CONSTRUCT_MESSAGE
             ,timeout     => TIMEOUT
             ,maxpipesize => MAXPIPESIZE
           );
  END;
 
  -- Начальное наполнение переменных
  PROCEDURE LP_INIT IS
  BEGIN
 
    SELECT USERNAME
      INTO LV_SCHEMA_NAME
      FROM SYS.ALL_USERS
     WHERE USER_ID = USERENV('SCHEMAID');
 
    LV_PURGEOPT.BLOCK         := TRUE;
    LV_PURGEOPT.DELIVERY_MODE := DBMS_AQADM.BUFFERED;
 
 
    LV_ENQ_CT.VISIBILITY      := DBMS_AQ.IMMEDIATE;
    LV_ENQ_CT.DELIVERY_MODE   := DBMS_AQ.BUFFERED;
 
    LV_MSG_PROP.DELAY         := DBMS_AQ.NO_DELAY;
    LV_MSG_PROP.EXPIRATION    := DBMS_AQ.NEVER;
 
    LV_DEQ_CT.DEQUEUE_MODE    := DBMS_AQ.REMOVE;
    LV_DEQ_CT.VISIBILITY      := DBMS_AQ.IMMEDIATE;
    LV_DEQ_CT.DELIVERY_MODE   := DBMS_AQ.BUFFERED;
    LV_DEQ_CT.NAVIGATION      := DBMS_AQ.FIRST_MESSAGE;
 
    LV_NEW_MESSAGE            := LTP_MSG_DETAILS();
    LV_READED_TBL             := LTP_MSG_DETAILS();
 
    LV_NAMES(LC_DATE_TYPEID)     := 'date';
    LV_NAMES(LC_NUMBER_TYPEID)   := 'number';
    LV_NAMES(LC_VARCHAR2_TYPEID) := 'varchar2';
    LV_NAMES(LC_RAW_TYPEID)      := 'raw';
    LV_NAMES(LC_UROWID_TYPEID)   := 'urowid';
 
  END;
 
  -- Получение сообщения из канала
   FUNCTION RECEIVE_MESSAGE (
              PIPENAME IN VARCHAR2,               -- Имя канала
              TIMEOUT  IN INTEGER DEFAULT MAXWAIT -- Время ожидания
            )
     RETURN INTEGER
  IS
  BEGIN
     RETURN L_RECEIVE_MESSAGE (
              pipename => PIPENAME
             ,timeout  => TIMEOUT
            );
  END;
 
  -- Получение прочитанного из канала XML сообщения
   FUNCTION UNPACK_MESSAGE
     RETURN XMLTYPE
  IS
  BEGIN
    RETURN LV_READED_MSG;
  END;
 
  -- Распаковка прочитанного из канала XML (если оно построено через буффер упаковки или соответствует формату!!!)
  PROCEDURE LP_UNPACK_MESSAGE
  IS
  BEGIN
    IF LV_READED_POS = -1 THEN
      LV_READED_TBL.DELETE();
      LV_READED_POS := 0;
      IF LV_READED_MSG IS NOT NULL THEN
        DECLARE
          doc        dbms_xmldom.DOMDocument;
          elem       dbms_xmldom.DOMElement;
          nodeList   dbms_xmldom.DOMNodeList;
          node       dbms_xmldom.DOMNode;
          text       dbms_xmldom.DOMNode;
        BEGIN
          doc      := dbms_xmldom.newDOMDocument(LV_READED_MSG);
          elem     := dbms_xmldom.getDocumentElement(doc);
          nodeList := dbms_xmldom.getElementsByTagName(elem, '*');
          FOR I IN 0..dbms_xmldom.getLength(nodeList)-1 LOOP
            node := dbms_xmldom.item(nodeList, i);
            text := dbms_xmldom.getFirstChild(node);
            LV_READED_TBL.EXTEND(1);
            CASE dbms_xmldom.getNodeName(node)
              WHEN 'number' THEN
                LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID    := LC_NUMBER_TYPEID;
                LV_READED_TBL(LV_READED_TBL.COUNT).V_NUMBER   := UTL_RAW.CAST_TO_NUMBER(hextoraw(dbms_xmldom.getNodeValue(text)));
              WHEN 'varchar2' THEN
                LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID    := LC_VARCHAR2_TYPEID;
                LV_READED_TBL(LV_READED_TBL.COUNT).V_VARCHAR2 := dbms_xmldom.getNodeValue(text);
              WHEN 'date' THEN
                LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID    := LC_DATE_TYPEID;
                LV_READED_TBL(LV_READED_TBL.COUNT).V_DATE     := TO_DATE(TRANSLATE(dbms_xmldom.getNodeValue(text),'T',' '),'YYYY-MM-DD HH24:MI:SS');
              WHEN 'urowid' THEN
                LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID    := LC_UROWID_TYPEID;
                LV_READED_TBL(LV_READED_TBL.COUNT).V_UROWID   := dbms_xmldom.getNodeValue(text);
              WHEN 'raw' THEN
                LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID    := LC_RAW_TYPEID;
                LV_READED_TBL(LV_READED_TBL.COUNT).V_RAW      := HEXTORAW(dbms_xmldom.getNodeValue(text));
              ELSE
                LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID    := LC_UNKNOWN_TYPEID;
            END CASE;
          END LOOP;
        END;
      END IF;
    END IF;
  END;
 
  -- Получить численное представление текущего элемента сообщения
  PROCEDURE UNPACK_MESSAGE (
              ITEM OUT NUMBER
            )
  IS
  BEGIN
    LP_UNPACK_MESSAGE;
    CASE NEXT_ITEM_TYPE
      WHEN 0 THEN
        RAISE LE_06556;
      WHEN LC_NUMBER_TYPEID THEN
        LV_READED_POS := LV_READED_POS+1;
        ITEM := LV_READED_TBL(LV_READED_POS).V_NUMBER;
      ELSE
        RAISE LE_06559;
    END CASE;
  END;
 
  -- Получить строковое представление текущего элемента сообщения
  PROCEDURE UNPACK_MESSAGE (
              ITEM OUT VARCHAR2
            )
  IS
  BEGIN
    LP_UNPACK_MESSAGE;
    CASE NEXT_ITEM_TYPE
      WHEN 0 THEN
        RAISE LE_06556;
      WHEN LC_VARCHAR2_TYPEID THEN
        LV_READED_POS := LV_READED_POS+1;
        ITEM := LV_READED_TBL(LV_READED_POS).V_VARCHAR2;
      ELSE
        RAISE LE_06559;
    END CASE;
  END;
 
  -- Получить представление текущего элемента сообщения в виде даты
  PROCEDURE UNPACK_MESSAGE (
              ITEM OUT DATE
            )
  IS
  BEGIN
    LP_UNPACK_MESSAGE;
    CASE NEXT_ITEM_TYPE
      WHEN 0 THEN
        RAISE LE_06556;
      WHEN LC_DATE_TYPEID THEN
        LV_READED_POS := LV_READED_POS+1;
        ITEM := LV_READED_TBL(LV_READED_POS).V_DATE;
      ELSE
        RAISE LE_06559;
    END CASE;
  END;
 
  -- Получить представление текущего элемента сообщения в виде raw
  PROCEDURE UNPACK_MESSAGE_RAW (
              ITEM OUT RAW
            )
  IS
  BEGIN
    LP_UNPACK_MESSAGE;
    CASE NEXT_ITEM_TYPE
      WHEN 0 THEN
        RAISE LE_06556;
      WHEN LC_RAW_TYPEID THEN
        LV_READED_POS := LV_READED_POS+1;
        ITEM := LV_READED_TBL(LV_READED_POS).V_RAW;
      ELSE
        RAISE LE_06559;
    END CASE;
  END;
 
  -- Получить представление текущего элемента сообщения в виде rowid
  PROCEDURE UNPACK_MESSAGE_ROWID (
              ITEM OUT UROWID
            )
  IS
  BEGIN
    LP_UNPACK_MESSAGE;
    CASE NEXT_ITEM_TYPE
      WHEN 0 THEN
        RAISE LE_06556;
      WHEN LC_UROWID_TYPEID THEN
        LV_READED_POS := LV_READED_POS+1;
        ITEM := LV_READED_TBL(LV_READED_POS).V_UROWID;
      ELSE
        RAISE LE_06559;
    END CASE;
  END;
 
  -- Очистка заданного канала
  PROCEDURE PURGE (
              PIPENAME IN VARCHAR2
            )
  IS
    PRAGMA AUTONOMOUS_TRANSACTION;
  BEGIN
 
    LP_PURGE ( pipename => PIPENAME );
 
    LV_READED_POS := 0;
    LV_READED_TBL.DELETE();
    LV_NEW_MESSAGE.DELETE();
 
  END;
 
  -- Процесс очистки старых каналов
  PROCEDURE PACK (
              PR_CONSTRUCT IN BOOLEAN DEFAULT FALSE
            )
  IS
    PRAGMA AUTONOMOUS_TRANSACTION;
  BEGIN
    IF PR_CONSTRUCT THEN
      LP_CONSTRUCT;
    END IF;
    LP_CHECK_PIPE_QUEUE;
    COMMIT;
  EXCEPTION
    WHEN OTHERS THEN
      COMMIT;
  END;
 
  -- Удаление внутренних структур UTL_PIPE.
  -- Доступна только владельцу или суперпользователю
  PROCEDURE DESTROY IS
    LV_CNT    INTEGER;
    E_NO_PRIV EXCEPTION;
       PRAGMA EXCEPTION_INIT(E_NO_PRIV, -01031);
  BEGIN
    -- Проверка прав
    IF USER != 'SYS' AND
       USER != LV_SCHEMA_NAME
    THEN
      RAISE E_NO_PRIV;
    END IF;
   -- Удаляем задание
    SELECT COUNT(1)
      INTO LV_CNT
      FROM ALL_SCHEDULER_JOBS
     WHERE OWNER    = LV_SCHEMA_NAME
       AND JOB_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF;
    IF LV_CNT > 0 THEN
      DBMS_SCHEDULER.drop_job (
         job_name      => LF_EXTEND_NAME(pr_name => LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF)
        ,force         => true
      );
    END IF;
   -- Удаляем программу
    SELECT COUNT(1)
      INTO LV_CNT
      FROM ALL_SCHEDULER_PROGRAMS
     WHERE OWNER        = LV_SCHEMA_NAME
       AND PROGRAM_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF;
    IF LV_CNT > 0 THEN
      DBMS_SCHEDULER.DROP_PROGRAM (
         program_name  => LF_EXTEND_NAME(pr_name => LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF)
        ,force         => true
      );
    END IF;
   -- Удаляем расписание
    SELECT COUNT(1)
      INTO LV_CNT
      FROM ALL_SCHEDULER_SCHEDULES
     WHERE OWNER             = LV_SCHEMA_NAME
       AND SCHEDULE_NAME     = LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF;
    IF LV_CNT > 0 THEN
      DBMS_SCHEDULER.DROP_SCHEDULE (
         schedule_name =>  LF_EXTEND_NAME(pr_name => LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF)
        ,force         => true
      );
    END IF;
    -- Удаляем все внутренние очереди
    FOR REC IN (
      SELECT ALQE.NAME
            ,ALQE.ENQUEUE_ENABLED
            ,ALQE.DEQUEUE_ENABLED
        FROM ALL_QUEUES ALQE
       WHERE ALQE.OWNER       = LV_SCHEMA_NAME
         AND ALQE.QUEUE_TABLE = LC_QUEUE_TABLE_NAME
         AND ALQE.NAME     LIKE 'UPQ$%'
    ) LOOP
      IF TRIM(REC.ENQUEUE_ENABLED) = 'YES' OR
         TRIM(REC.DEQUEUE_ENABLED) = 'YES'
      THEN
        -- Останавливаем очередь, если необходимо
        DBMS_AQADM.STOP_QUEUE (queue_name => LF_EXTEND_NAME(pr_name => REC.NAME));
      END IF;
      -- Удаляем очередь
      DBMS_AQADM.DROP_QUEUE (queue_name => LF_EXTEND_NAME(pr_name => REC.NAME));
    END LOOP;
    -- Удаляем служебную таблицу для очередей
    FOR REC IN (
      SELECT AQTB.QUEUE_TABLE
        FROM ALL_QUEUE_TABLES AQTB
       WHERE AQTB.OWNER       = LV_SCHEMA_NAME
         AND AQTB.QUEUE_TABLE = LC_QUEUE_TABLE_NAME
    ) LOOP
      DBMS_AQADM.DROP_QUEUE_TABLE (queue_table => LF_EXTEND_NAME(pr_name => REC.QUEUE_TABLE));
    END LOOP;
 
  END;
 
BEGIN
 
  -- Начальная инициализация пакета
  LP_INIT;
 
END UTL_PIPE;
/