У DBMS_PIPE при использовании на Oracle RAC есть один момент - между узлами не работает.
В связи с этим годик назад я для себя сварганил код, который сейчас потребовалось включить в production код. Проблема в том, что в силу ряда обстоятельств как следует протестировать его не удастся и опа случится именно когда понадобится его использовать. В итоге пришёл к мысли, что следует поменять свой труд по кодированию на Ваш по тестированию :).
В общем - получился пакет (пока что его рыба) близкий по спецификации к utl_pipe. Желающие могут его модифицировать под свои задачи, но - в качестве спасибо - хотелось бы получить информацию по ошибкам и пожеланиям с Вашей стороны.
CREATE OR REPLACE PACKAGE UTL_PIPE
IS
MAXWAIT CONSTANT INTEGER := 86400000;
-- Создание канала
FUNCTION CREATE_PIPE (
PIPENAME IN VARCHAR2 -- Имя канала
,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется
,PRIVATE IN BOOLEAN DEFAULT FALSE -- Не используется
)
RETURN INTEGER;
-- Создание канала
FUNCTION REMOVE_PIPE (
PIPENAME IN VARCHAR2 -- Имя канала
)
RETURN INTEGER;
-- Упаковка строки в сообщение
PROCEDURE PACK_MESSAGE (
ITEM IN VARCHAR2 CHARACTER SET ANY_CS
);
-- Упаковка числа в сообщение
PROCEDURE PACK_MESSAGE (
ITEM IN NUMBER
);
-- Упаковка даты в сообщение
PROCEDURE PACK_MESSAGE (
ITEM IN DATE
);
-- Упаковка raw в сообщение
PROCEDURE PACK_MESSAGE_RAW (
ITEM IN RAW
);
-- Упаковка rowid/urowid в сообщение
PROCEDURE PACK_MESSAGE_ROWID (
ITEM IN UROWID
);
-- Получение типа следующего элемента сообщения
FUNCTION NEXT_ITEM_TYPE
RETURN INTEGER;
-- Построение XML представления сообщения
FUNCTION CONSTRUCT_MESSAGE
RETURN XMLTYPE;
-- Получение уникального имени для сессии
FUNCTION UNIQUE_SESSION_NAME
RETURN VARCHAR2;
-- Отправка текстового XML сообщения в канал
FUNCTION SEND_MESSAGE (
PIPENAME IN VARCHAR2 -- Имя канала
,MESSAGE IN CLOB -- XML текст сообщение
,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется
,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется
)
RETURN INTEGER;
-- Отправка XML сообщения в канал
FUNCTION SEND_MESSAGE (
PIPENAME IN VARCHAR2 -- Имя канала
,MESSAGE IN XMLTYPE -- XML сообщение
,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется
,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется
)
RETURN INTEGER;
-- Отправка сообщения, построенного из упакованных в буффер данных, в канал
FUNCTION SEND_MESSAGE (
PIPENAME IN VARCHAR2 -- Имя канала
,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется
,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется
)
RETURN INTEGER;
-- Получение сообщения из канала
FUNCTION RECEIVE_MESSAGE (
PIPENAME IN VARCHAR2, -- Имя канала
TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Время ожидания
)
RETURN INTEGER;
-- Получение прочитанного из канала XML сообщения
FUNCTION UNPACK_MESSAGE
RETURN XMLTYPE;
-- Получить численное представление текущего элемента сообщения
PROCEDURE UNPACK_MESSAGE (
ITEM OUT NUMBER
);
-- Получить строковое представление текущего элемента сообщения
PROCEDURE UNPACK_MESSAGE (
ITEM OUT VARCHAR2
);
-- Получить представление текущего элемента сообщения в виде даты
PROCEDURE UNPACK_MESSAGE (
ITEM OUT DATE
);
-- Получить представление текущего элемента сообщения в виде raw
PROCEDURE UNPACK_MESSAGE_RAW (
ITEM OUT RAW
);
-- Получить представление текущего элемента сообщения в виде rowid
PROCEDURE UNPACK_MESSAGE_ROWID (
ITEM OUT UROWID
);
-- Очистка заданного канала
PROCEDURE PURGE (
PIPENAME IN VARCHAR2
);
-- Процесс очистки старых каналов
PROCEDURE PACK (
PR_CONSTRUCT IN BOOLEAN DEFAULT FALSE
);
-- Удаление внутренних структур UTL_PIPE.
-- Доступна только владельцу или суперпользователю
PROCEDURE DESTROY;
END UTL_PIPE;
/
CREATE OR REPLACE PACKAGE BODY UTL_PIPE
--
-- P.S.> По умолчанию размер буффера на очередь находится где-то в пределах 5101 сообщения.
-- расширить буффер можно путём изменения скрытого параметра:
-- _buffered_publisher_flow_control_threshold
-- Например:
-- alter system set "_buffered_publisher_flow_control_threshold"=500000 sid='*' scope=both
--
-- P.P.S.> В случае получения ошибки ORA-025306 необходимо правильно сконфигурировать сервер базы
-- данных в RAC окружении таким образом, чтобы корректно работали буфферизованные очереди
-- при посылке и чтении сообщений на разных узлах. Наличие данной ошибки говорит о некор-
-- ректности сетевых настроек базы данных в кластерном окружении
--
IS
LC_QUEUE_TABLE_NAME CONSTANT VARCHAR2(16) := 'UPQ$PIPE_STORAGE'; -- Имя таблицы для очередей
LC_TOUCH_PIPE_NAME CONSTANT VARCHAR2(20) := 'UPQ$PIPE$TOUCH$QUEUE'; -- Имя служебного PIPE
LC_TOUCH_PROC_PFX CONSTANT VARCHAR2(15) := 'UPQ$PIPE_TOUCH_'; -- Префикс для объектов Scheduler-а
LC_TOUCH_PRG_SUFF CONSTANT VARCHAR2(20) := 'PRG'; -- Суффикс для задания имени программы
LC_TOUCH_SCHED_SUFF CONSTANT VARCHAR2(20) := 'SCH'; -- Суффикс для задания имени расписания
LC_TOUCH_JOB_SUFF CONSTANT VARCHAR2(20) := 'JOB'; -- Суффикс для задания имени задания
LC_NUMBER_TYPEID CONSTANT INTEGER := 6;
LC_VARCHAR2_TYPEID CONSTANT INTEGER := 9;
LC_UROWID_TYPEID CONSTANT INTEGER := 11;
LC_DATE_TYPEID CONSTANT INTEGER := 12;
LC_RAW_TYPEID CONSTANT INTEGER := 23;
LC_UNKNOWN_TYPEID CONSTANT INTEGER := 0;
LV_SCHEMA_NAME SYS.ALL_USERS.USERNAME%TYPE := NULL; -- Имя схемы-владельца пакета
LV_ENQ_CT DBMS_AQ.ENQUEUE_OPTIONS_T;
LV_MSG_PROP DBMS_AQ.MESSAGE_PROPERTIES_T;
LV_PURGEOPT DBMS_AQADM.AQ$_PURGE_OPTIONS_T;
LV_DEQ_CT DBMS_AQ.DEQUEUE_OPTIONS_T;
TYPE LTP_MSG_REC IS RECORD (
TYPE_ID INTEGER
,V_NUMBER NUMBER
,V_DATE DATE
,V_UROWID UROWID
,V_VARCHAR2 VARCHAR2(32767)
,V_RAW RAW(32767)
);
TYPE LTP_MSG_DETAILS IS TABLE OF LTP_MSG_REC;
TYPE LTP_NAMES IS TABLE OF VARCHAR2(30) INDEX BY BINARY_INTEGER;
LC_TOUCH_PERIOD CONSTANT NUMBER := 1/24; -- Частота интервалов сигнализации сессии о том, что с pipe-ом
-- производится работа. (Час)
LC_TOUCH_LIMIT CONSTANT NUMBER := 4; -- Количество интервалов простоя после которых удаляются очереди
-- для пустых pipe. (4 часа).
LC_TOUCH_ANY_LIMIT CONSTANT NUMBER := LC_TOUCH_LIMIT*24; -- Количество интервалов простоя после которых удаляются очереди
-- для не пустых pipe. (4 дня).
TYPE LTP_TOUCH_LIST IS TABLE OF DATE INDEX BY ALL_QUEUES.NAME%TYPE;
LV_TOUCH_LIST LTP_TOUCH_LIST;
LV_NAMES LTP_NAMES;
LV_NEW_MESSAGE LTP_MSG_DETAILS;
LV_READED_MSG SYS.XMLTYPE;
LV_READED_TBL LTP_MSG_DETAILS;
LV_READED_POS INTEGER := 0;
LE_06556 EXCEPTION; -- ORA-06556: канал пуст, невозможно выполнить запрос unpack_message
PRAGMA EXCEPTION_INIT (LE_06556, -06556);
LE_06559 EXCEPTION; -- ORA-06559: Запрошен неверный тип данных...
PRAGMA EXCEPTION_INIT (LE_06559, -06559);
E_QUEUE_TIMEOUT EXCEPTION; -- ORA-25228: таймаут или окончание выборки во время снятия сообщения из очереди .
PRAGMA EXCEPTION_INIT(E_QUEUE_TIMEOUT, -25228);
E_NO_QUEUE EXCEPTION; -- ORA-25205: очередь не существует
PRAGMA EXCEPTION_INIT(E_NO_QUEUE, -25205);
LV_RESULT INTEGER;
-- Расхирение имени объекта до полного имени с учётом схемы и регистра
FUNCTION LF_EXTEND_NAME (
PR_NAME IN VARCHAR2
)
RETURN VARCHAR2
IS
LV_NAME VARCHAR2(98) := TRIM(PR_NAME);
BEGIN
IF SUBSTR(LV_NAME,1,1) != '"' OR
SUBSTR(LV_NAME,-1,1) != '"'
THEN
LV_NAME := '"'||UPPER(LV_NAME)||'"';
END IF;
RETURN '"'||LV_SCHEMA_NAME||'".'||LV_NAME;
END;
-- Преобразование имени канала или очереди к имени очереди
FUNCTION LF_GET_QUEUE_NAME (
PR_NAME IN VARCHAR2
)
RETURN VARCHAR2
IS
BEGIN
IF PR_NAME IS NULL OR
(UPPER(PR_NAME) LIKE 'UPQ$%' AND LENGTH(PR_NAME) <= 24)
THEN
RETURN UPPER(PR_NAME);
END IF;
RETURN SUBSTR (
SUBSTR('UPQ$'||UPPER(PR_NAME),1,14)||
DBMS_OBFUSCATION_TOOLKIT.MD5(input => UTL_RAW.CAST_TO_RAW(UPPER(PR_NAME)))
,1,24
);
END;
-- Создание очереди на служебной таблице
PROCEDURE LP_CREATE_QUEUE (
PIPENAME IN VARCHAR2
)
IS
PRAGMA AUTONOMOUS_TRANSACTION;
LV_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(pr_name => PIPENAME);
LV_CNT INTEGER;
BEGIN
-- Если отсутствует таблица для очередей - создаём
SELECT COUNT(1)
INTO LV_CNT
FROM ALL_QUEUE_TABLES AQTB
WHERE AQTB.OWNER = LV_SCHEMA_NAME
AND AQTB.QUEUE_TABLE = LC_QUEUE_TABLE_NAME;
IF LV_CNT = 0 THEN
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME)
,queue_payload_type => 'SYS.XMLTYPE'
,compatible => '10.2'
,multiple_consumers => false
);
END IF;
-- Если отсутствует очередь - создаём
SELECT COUNT(1)
INTO LV_CNT
FROM ALL_QUEUES AQTB
WHERE AQTB.OWNER = LV_SCHEMA_NAME
AND AQTB.NAME = LV_QUEUE_NAME;
IF LV_CNT = 0 THEN
DBMS_AQADM.CREATE_QUEUE (
queue_name => LF_EXTEND_NAME(pr_name => LV_QUEUE_NAME)
,queue_table => LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME)
);
END IF;
-- Запускаем очередь
DBMS_AQADM.START_QUEUE (
queue_name => LF_EXTEND_NAME(pr_name => LV_QUEUE_NAME)
);
END;
-- Создание задания на очистку неиспользуемых pipe
PROCEDURE LP_CONSTRUCT
IS
LV_CNT INTEGER;
BEGIN
-- Если нет задания - создаём
SELECT COUNT(1)
INTO LV_CNT
FROM ALL_SCHEDULER_JOBS
WHERE OWNER = LV_SCHEMA_NAME
AND JOB_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF;
IF LV_CNT = 0 THEN
-- Если нет расписания - создаём
SELECT COUNT(1)
INTO LV_CNT
FROM ALL_SCHEDULER_SCHEDULES
WHERE OWNER = LV_SCHEMA_NAME
AND SCHEDULE_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF;
IF LV_CNT = 0 THEN
DBMS_SCHEDULER.CREATE_SCHEDULE (
schedule_name => LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF
,start_date => TRUNC(SYSDATE,'HH24')+1/24
,repeat_interval => 'FREQ=HOURLY;INTERVAl=1'
);
END IF;
-- Если нет программы - создаём
SELECT COUNT(1)
INTO LV_CNT
FROM ALL_SCHEDULER_PROGRAMS
WHERE OWNER = LV_SCHEMA_NAME
AND PROGRAM_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF;
IF LV_CNT = 0 THEN
DBMS_SCHEDULER.CREATE_PROGRAM (
program_name => LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF
,program_action => LF_EXTEND_NAME(pr_name => '"'||$$PLSQL_UNIT||'"."PACK"')
,program_type => 'STORED_PROCEDURE'
,number_of_arguments => 0
,enabled => TRUE
);
END IF;
-- создаём задание
DBMS_SCHEDULER.CREATE_JOB(
job_name => LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF
,program_name => LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF
,schedule_name => LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF
,enabled => TRUE
,auto_drop => FALSE
);
END IF;
END;
-- Чтение сообщения из канала
FUNCTION L_RECEIVE_MESSAGE (
PIPENAME IN VARCHAR2 -- имя канала
,TIMEOUT IN INTEGER -- timeout ожидания
,DEQ_CONDITION IN VARCHAR2 DEFAULT NULL -- условие выборки
)
RETURN INTEGER
IS
DEQ_CT DBMS_AQ.DEQUEUE_OPTIONS_T := LV_DEQ_CT;
ENQ_MSGID RAW(16);
BEGIN
LV_READED_TBL := LTP_MSG_DETAILS();
LV_READED_POS := 0;
DEQ_CT.WAIT := TIMEOUT;
IF DEQ_CONDITION IS NOT NULL THEN
DEQ_CT.DEQ_CONDITION := DEQ_CONDITION;
END IF;
DBMS_AQ.DEQUEUE (
dequeue_options => DEQ_CT
,message_properties => LV_MSG_PROP
,msgid => enq_msgid
,payload => LV_READED_MSG
,queue_name => LF_EXTEND_NAME(pr_name => LF_GET_QUEUE_NAME(pr_name => PIPENAME))
);
LV_READED_POS := -1;
RETURN 0;
EXCEPTION
WHEN E_QUEUE_TIMEOUT THEN -- Если прервались по timeout, то
RETURN 1; -- возвращаем сигнал об отсутствии сообщений
WHEN E_NO_QUEUE THEN -- Если не нашли очереди, то создаём её и
LP_CREATE_QUEUE (pipename => PIPENAME); -- повторяем действие
LP_CONSTRUCT;
RETURN L_RECEIVE_MESSAGE (
pipename => PIPENAME
,timeout => TIMEOUT
,deq_condition => DEQ_CONDITION
);
END;
-- Проверяем интервал, когда последний раз вызывася метод для конкретного канала в данной сессии
-- и если он превысил заданную величину - посылаем новый сигнал для указанного канала.
PROCEDURE LP_TOUCH (
PIPENAME IN VARCHAR2
)
IS
V_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(PIPENAME);
BEGIN
IF LV_TOUCH_LIST.EXISTS(V_QUEUE_NAME) AND
SYSDATE <= LV_TOUCH_LIST(V_QUEUE_NAME) + LC_TOUCH_PERIOD
THEN
RETURN;
END IF;
LV_TOUCH_LIST(V_QUEUE_NAME) := SYSDATE;
PACK_MESSAGE(item => V_QUEUE_NAME);
PACK_MESSAGE(item => LV_TOUCH_LIST(V_QUEUE_NAME));
LV_RESULT := SEND_MESSAGE(pipename => LC_TOUCH_PIPE_NAME);
WHILE L_RECEIVE_MESSAGE (
pipename => LC_TOUCH_PIPE_NAME
,timeout => 0
,deq_condition => 'tab.user_data.extract(''/message/varchar2[position()= 1 and text() = '''''||V_QUEUE_NAME||''''']'') is not null'
||' and '
||'tab.enq_time < to_date('''||to_char(LV_TOUCH_LIST(V_QUEUE_NAME),'dd.mm.yyyy hh24:mi:ss')||''',''dd.mm.yyyy hh24:mi:ss'') - 1/24/3600'
) = 0 LOOP
NULL;
END LOOP;
END;
-- Проверка на существование канала
FUNCTION LF_IS_PIPE_EXISTS (
PIPENAME IN VARCHAR2
)
RETURN BOOLEAN
IS
LV_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(pr_name => PIPENAME);
LV_CNT INTEGER;
BEGIN
SELECT COUNT(1)
INTO LV_CNT
FROM ALL_QUEUES ALQE
WHERE ALQE.OWNER = LV_SCHEMA_NAME
AND ALQE.NAME = LV_QUEUE_NAME;
RETURN LV_CNT != 0;
END;
-- Очискта канала с определённым фильтром
PROCEDURE LP_PURGE (
PIPENAME IN VARCHAR2
,PURGE_CONDITIONS IN VARCHAR2 DEFAULT NULL
)
IS
BEGIN
IF LF_IS_PIPE_EXISTS (
pipename => PIPENAME
)
THEN
DBMS_AQADM.PURGE_QUEUE_TABLE (
queue_table => LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME)
,purge_condition => 'queue = '''||LF_GET_QUEUE_NAME(pr_name => PIPENAME)||''''||
CASE
WHEN PURGE_CONDITIONS IS NULL THEN
''
ELSE
' and ('||PURGE_CONDITIONS||')'
END
,purge_options => LV_PURGEOPT
);
END IF;
END;
-- Удаление сигналов по пайпам для которых отсутствуют очереди
PROCEDURE LP_PURGE_EPSENT_TOUCHE (
PIPENAME IN VARCHAR2 DEFAULT NULL
) IS
BEGIN
LP_PURGE (
pipename => LC_TOUCH_PIPE_NAME
,purge_conditions => CASE
WHEN PIPENAME IS NULL THEN
'extract(user_data,''/message/varchar2[position()=1]/text()'').getStringVal() not in (select distinct queue from '||LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME)||')'
ELSE
'extract(user_data,''/message/varchar2[position()= 1 and text() = '''''||LF_GET_QUEUE_NAME(PIPENAME)||''''']'') is not null'
END
);
END;
-- Проверяем наличие сообщения в очереди
FUNCTION LF_CHECK_MESSAGE (
PR_QUEUE_NAME IN VARCHAR2
,PR_DEQ_CONDITION IN VARCHAR2 DEFAULT NULL
)
RETURN BOOLEAN
IS
DEQ_CT DBMS_AQ.DEQUEUE_OPTIONS_T := LV_DEQ_CT;
ENQ_MSGID RAW(16);
BEGIN
DEQ_CT.WAIT := DBMS_AQ.NO_WAIT;
DEQ_CT.NAVIGATION := DBMS_AQ.FIRST_MESSAGE;
DEQ_CT.DEQUEUE_MODE := DBMS_AQ.BROWSE;
DEQ_CT.DEQ_CONDITION := PR_DEQ_CONDITION;
DBMS_AQ.DEQUEUE (
dequeue_options => DEQ_CT
,message_properties => LV_MSG_PROP
,msgid => enq_msgid
,payload => LV_READED_MSG
,queue_name => LF_EXTEND_NAME(pr_name => LF_GET_QUEUE_NAME(PR_QUEUE_NAME))
);
RETURN TRUE;
EXCEPTION
WHEN E_QUEUE_TIMEOUT
OR E_NO_QUEUE
THEN
RETURN FALSE;
END;
-- Проверяем наличие TOUCH по пайпу (в случае задания даты - после неё)
FUNCTION LF_CHECK_TOUCH (
PR_QUEUE_NAME IN VARCHAR2
,PR_DATE IN DATE DEFAULT NULL
)
RETURN BOOLEAN
IS
BEGIN
RETURN LF_CHECK_MESSAGE (
pr_queue_name => LC_TOUCH_PIPE_NAME
,pr_deq_condition => 'tab.user_data.extract(''/message/varchar2[position()= 1 and text() = '''''||PR_QUEUE_NAME||''''']'') is not null'||
CASE
WHEN PR_DATE IS NULL THEN
NULL
ELSE
' and tab.enq_time >= to_date('''||to_char(PR_DATE,'dd.mm.yyyy hh24:mi:ss')||''',''dd.mm.yyyy hh24:mi:ss'')'
END
);
END;
-- Проверка канала на устаревание
PROCEDURE LP_CHECK_PIPE_QUEUE (
PR_QUEUE_NAME IN VARCHAR2 DEFAULT NULL -- имя очереди канала
)
IS
BEGIN
IF PR_QUEUE_NAME IS NOT NULL THEN -- Если очередь задана
IF NOT LF_CHECK_TOUCH ( -- Если не было сигналов более периода на очистку пустых каналов
pr_queue_name => PR_QUEUE_NAME
,pr_date => SYSDATE - LC_TOUCH_PERIOD * LC_TOUCH_LIMIT
)
THEN
IF NOT LF_CHECK_TOUCH (pr_queue_name => PR_QUEUE_NAME) -- Если вообще не было сигналов по каналу
THEN
LP_TOUCH ( pipename => PR_QUEUE_NAME ); -- Создаём отметку о том, что мы "тронули" канал
ELSE -- иначе
IF NOT LF_CHECK_MESSAGE (pr_queue_name => PR_QUEUE_NAME) OR -- Если канал пуст
NOT LF_CHECK_TOUCH ( -- Или если не было сигналов дольше
pr_queue_name => PR_QUEUE_NAME -- периода на удаление непустых каналов
,pr_date => SYSDATE - LC_TOUCH_PERIOD * LC_TOUCH_ANY_LIMIT
)
THEN
LV_RESULT := REMOVE_PIPE ( pipename => PR_QUEUE_NAME ); -- Удаляем канал
END IF;
END IF;
END IF;
ELSE -- Если очередь не задана, то выполняем для каждой из существующих очередей
FOR REC IN (
SELECT ALQE.NAME
FROM ALL_QUEUES ALQE
WHERE ALQE.OWNER = LV_SCHEMA_NAME
AND ALQE.NAME LIKE 'UPQ$%'
AND ALQE.NAME != LC_TOUCH_PIPE_NAME
) LOOP
LP_CHECK_PIPE_QUEUE( pr_queue_name => REC.NAME);
END LOOP;
LP_PURGE_EPSENT_TOUCHE; -- Очищаем сигналы по несуществующим очередям
END IF;
END;
-- Создание канала
FUNCTION CREATE_PIPE (
PIPENAME IN VARCHAR2 -- Имя канала
,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется
,PRIVATE IN BOOLEAN DEFAULT FALSE -- Не используется
)
RETURN INTEGER
IS
BEGIN
LP_CREATE_QUEUE (
pipename => PIPENAME
);
LP_TOUCH (
pipename => PIPENAME
);
RETURN 0;
END;
-- Создание канала
FUNCTION REMOVE_PIPE (
PIPENAME IN VARCHAR2 -- Имя канала
)
RETURN INTEGER
IS
PRAGMA AUTONOMOUS_TRANSACTION;
LV_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(pr_name => PIPENAME);
BEGIN
-- Пытаемся удалить канал по имени канала, а если не удалось, то по имени очереди
FOR I IN 1..2 LOOP
FOR REC IN (
SELECT ALQE.NAME AS NAME
,TRIM(ALQE.ENQUEUE_ENABLED) AS ENQUEUE_ENABLED
,TRIM(ALQE.DEQUEUE_ENABLED) AS DEQUEUE_ENABLED
FROM ALL_QUEUES ALQE
WHERE ALQE.OWNER = LV_SCHEMA_NAME
AND ALQE.NAME = LV_QUEUE_NAME
) LOOP
IF REC.ENQUEUE_ENABLED = 'YES' OR
REC.DEQUEUE_ENABLED = 'YES'
THEN
DBMS_AQADM.STOP_QUEUE(queue_name => LF_EXTEND_NAME(pr_name => REC.NAME));
END IF;
DBMS_AQADM.DROP_QUEUE(queue_name => LF_EXTEND_NAME(pr_name => REC.NAME));
EXIT;
END LOOP;
IF UPPER(PIPENAME) NOT LIKE 'UPQ$%' OR
LENGTH(PIPENAME) != 24
THEN
EXIT;
ELSE
LV_QUEUE_NAME := UPPER(PIPENAME);
END IF;
END LOOP;
-- Очищаем сигналы по несуществующим каналам
LP_PURGE_EPSENT_TOUCHE(PIPENAME);
COMMIT;
RETURN 0;
END;
-- Упаковка строки в сообщение
PROCEDURE PACK_MESSAGE (
ITEM IN VARCHAR2 CHARACTER SET ANY_CS
)
IS
BEGIN
LV_NEW_MESSAGE.EXTEND(1);
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_VARCHAR2_TYPEID;
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_VARCHAR2 := ITEM;
END;
-- Упаковка числа в сообщение
PROCEDURE PACK_MESSAGE (
ITEM IN NUMBER
)
IS
BEGIN
LV_NEW_MESSAGE.EXTEND(1);
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_NUMBER_TYPEID;
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_NUMBER := ITEM;
END;
-- Упаковка даты в сообщение
PROCEDURE PACK_MESSAGE (
ITEM IN DATE
)
IS
BEGIN
LV_NEW_MESSAGE.EXTEND(1);
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_DATE_TYPEID;
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_DATE := ITEM;
END;
-- Упаковка raw в сообщение
PROCEDURE PACK_MESSAGE_RAW (
ITEM IN RAW
)
IS
BEGIN
LV_NEW_MESSAGE.EXTEND(1);
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_RAW_TYPEID;
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_RAW := ITEM;
END;
-- Упаковка rowid/urowid в сообщение
PROCEDURE PACK_MESSAGE_ROWID (
ITEM IN UROWID
)
IS
BEGIN
LV_NEW_MESSAGE.EXTEND(1);
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_UROWID_TYPEID;
LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_UROWID := ITEM;
END;
-- Получение типа следующего элемента сообщения
FUNCTION NEXT_ITEM_TYPE
RETURN INTEGER
IS
BEGIN
IF LV_READED_TBL.COUNT > LV_READED_POS THEN
RETURN LV_READED_TBL(LV_READED_POS+1).TYPE_ID;
END IF;
RETURN 0;
END;
-- Построение XML представления сообщения
FUNCTION CONSTRUCT_MESSAGE
RETURN XMLTYPE
IS
doc dbms_xmldom.DOMDocument;
node dbms_xmldom.DOMNode;
elem dbms_xmldom.DOMElement;
curr dbms_xmldom.DOMNode;
curr1 dbms_xmldom.DOMNode;
text dbms_xmldom.DOMText;
nodeName VARCHAR2(30);
BEGIN
doc := dbms_xmldom.createDocument('http://www.w3.org/2001/XMLSchema', null, null);
dbms_xmldom.setVersion(doc, '1.0');
dbms_xmldom.setCharset(doc, 'UTF8');
node := dbms_xmldom.makeNode(doc);
elem := dbms_xmldom.createElement(doc, 'message');
curr := dbms_xmldom.appendChild(node, dbms_xmldom.makeNode(elem));
FOR I IN 1..LV_NEW_MESSAGE.COUNT LOOP
nodeName := LV_NAMES(LV_NEW_MESSAGE(I).TYPE_ID);
elem := dbms_xmldom.createElement(doc, nodeName);
curr1 := dbms_xmldom.appendChild(curr, dbms_xmldom.makeNode(elem));
CASE LV_NEW_MESSAGE(I).TYPE_ID
WHEN LC_NUMBER_TYPEID THEN
text := dbms_xmldom.createTextNode(doc, rawtohex(utl_raw.cast_from_number(LV_NEW_MESSAGE(I).V_NUMBER)));
WHEN LC_VARCHAR2_TYPEID THEN
text := dbms_xmldom.createTextNode(doc, LV_NEW_MESSAGE(I).V_VARCHAR2);
WHEN LC_DATE_TYPEID THEN
text := dbms_xmldom.createTextNode(doc, TO_CHAR(LV_NEW_MESSAGE(I).V_DATE, 'YYYY-MM-DD')||'T'||TO_CHAR(LV_NEW_MESSAGE(I).V_DATE, 'HH24:MI:SS'));
WHEN LC_RAW_TYPEID THEN
text := dbms_xmldom.createTextNode(doc, RAWTOHEX(LV_NEW_MESSAGE(I).V_RAW));
WHEN LC_UROWID_TYPEID THEN
text := dbms_xmldom.createTextNode(doc, LV_NEW_MESSAGE(I).V_UROWID);
ELSE
text := dbms_xmldom.createTextNode(doc, NULL);
END CASE;
curr1 := dbms_xmldom.appendChild(curr1, dbms_xmldom.makeNode(text));
END LOOP;
LV_NEW_MESSAGE.DELETE();
RETURN dbms_xmldom.getXMLTYPE(doc);
END;
-- Очистка буфферов сообщений
PROCEDURE RESET_BUFFER IS
BEGIN
LV_NEW_MESSAGE.DELETE();
LV_READED_POS := 0;
END;
-- Получение уникального имени для сессии
FUNCTION UNIQUE_SESSION_NAME
RETURN VARCHAR2
IS
BEGIN
RETURN SYS.DBMS_PIPE.UNIQUE_SESSION_NAME;
END;
-- Отправка текстового XML сообщения в канал
FUNCTION SEND_MESSAGE (
PIPENAME IN VARCHAR2 -- Имя канала
,MESSAGE IN CLOB -- XML текст сообщение
,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется
,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется
)
RETURN INTEGER
IS
V_RESULT INTEGER;
BEGIN
V_RESULT:= SEND_MESSAGE (
pipename => PIPENAME
,message => XMLType.createXML(message)
,timeout => TIMEOUT
,maxpipesize => MAXPIPESIZE
);
LV_NEW_MESSAGE.DELETE();
RETURN V_RESULT;
END;
-- Отправка XML сообщения в канал
FUNCTION SEND_MESSAGE (
PIPENAME IN VARCHAR2 -- Имя канала
,MESSAGE IN XMLTYPE -- XML сообщение
,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется
,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется
)
RETURN INTEGER
IS
ENQ_MSGID RAW(16);
LE_NOQUEUE EXCEPTION;
PRAGMA EXCEPTION_INIT(LE_NOQUEUE, -025205);
BEGIN
DBMS_AQ.ENQUEUE (
queue_name => LF_EXTEND_NAME(pr_name => LF_GET_QUEUE_NAME (pr_name => PIPENAME))
,enqueue_options => LV_ENQ_CT
,message_properties => LV_MSG_PROP
,payload => MESSAGE
,msgid => ENQ_MSGID
);
IF UPPER(PIPENAME) NOT LIKE 'UPQ$%' THEN
LP_TOUCH(pipename => PIPENAME);
END IF;
RETURN 0;
EXCEPTION
WHEN LE_NOQUEUE THEN
LP_CREATE_QUEUE (pipename => PIPENAME);
IF UPPER(PIPENAME) = LC_TOUCH_PIPE_NAME THEN
LP_CONSTRUCT;
END IF;
RETURN SEND_MESSAGE (
pipename => PIPENAME
,message => MESSAGE
,timeout => TIMEOUT
,maxpipesize => MAXPIPESIZE
);
END;
-- Отправка сообщения, построенного из упакованных в буффер данных, в канал
FUNCTION SEND_MESSAGE (
PIPENAME IN VARCHAR2 -- Имя канала
,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется
,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется
)
RETURN INTEGER
IS
BEGIN
RETURN SEND_MESSAGE (
pipename => PIPENAME
,message => CONSTRUCT_MESSAGE
,timeout => TIMEOUT
,maxpipesize => MAXPIPESIZE
);
END;
-- Начальное наполнение переменных
PROCEDURE LP_INIT IS
BEGIN
SELECT USERNAME
INTO LV_SCHEMA_NAME
FROM SYS.ALL_USERS
WHERE USER_ID = USERENV('SCHEMAID');
LV_PURGEOPT.BLOCK := TRUE;
LV_PURGEOPT.DELIVERY_MODE := DBMS_AQADM.BUFFERED;
LV_ENQ_CT.VISIBILITY := DBMS_AQ.IMMEDIATE;
LV_ENQ_CT.DELIVERY_MODE := DBMS_AQ.BUFFERED;
LV_MSG_PROP.DELAY := DBMS_AQ.NO_DELAY;
LV_MSG_PROP.EXPIRATION := DBMS_AQ.NEVER;
LV_DEQ_CT.DEQUEUE_MODE := DBMS_AQ.REMOVE;
LV_DEQ_CT.VISIBILITY := DBMS_AQ.IMMEDIATE;
LV_DEQ_CT.DELIVERY_MODE := DBMS_AQ.BUFFERED;
LV_DEQ_CT.NAVIGATION := DBMS_AQ.FIRST_MESSAGE;
LV_NEW_MESSAGE := LTP_MSG_DETAILS();
LV_READED_TBL := LTP_MSG_DETAILS();
LV_NAMES(LC_DATE_TYPEID) := 'date';
LV_NAMES(LC_NUMBER_TYPEID) := 'number';
LV_NAMES(LC_VARCHAR2_TYPEID) := 'varchar2';
LV_NAMES(LC_RAW_TYPEID) := 'raw';
LV_NAMES(LC_UROWID_TYPEID) := 'urowid';
END;
-- Получение сообщения из канала
FUNCTION RECEIVE_MESSAGE (
PIPENAME IN VARCHAR2, -- Имя канала
TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Время ожидания
)
RETURN INTEGER
IS
BEGIN
RETURN L_RECEIVE_MESSAGE (
pipename => PIPENAME
,timeout => TIMEOUT
);
END;
-- Получение прочитанного из канала XML сообщения
FUNCTION UNPACK_MESSAGE
RETURN XMLTYPE
IS
BEGIN
RETURN LV_READED_MSG;
END;
-- Распаковка прочитанного из канала XML (если оно построено через буффер упаковки или соответствует формату!!!)
PROCEDURE LP_UNPACK_MESSAGE
IS
BEGIN
IF LV_READED_POS = -1 THEN
LV_READED_TBL.DELETE();
LV_READED_POS := 0;
IF LV_READED_MSG IS NOT NULL THEN
DECLARE
doc dbms_xmldom.DOMDocument;
elem dbms_xmldom.DOMElement;
nodeList dbms_xmldom.DOMNodeList;
node dbms_xmldom.DOMNode;
text dbms_xmldom.DOMNode;
BEGIN
doc := dbms_xmldom.newDOMDocument(LV_READED_MSG);
elem := dbms_xmldom.getDocumentElement(doc);
nodeList := dbms_xmldom.getElementsByTagName(elem, '*');
FOR I IN 0..dbms_xmldom.getLength(nodeList)-1 LOOP
node := dbms_xmldom.item(nodeList, i);
text := dbms_xmldom.getFirstChild(node);
LV_READED_TBL.EXTEND(1);
CASE dbms_xmldom.getNodeName(node)
WHEN 'number' THEN
LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_NUMBER_TYPEID;
LV_READED_TBL(LV_READED_TBL.COUNT).V_NUMBER := UTL_RAW.CAST_TO_NUMBER(hextoraw(dbms_xmldom.getNodeValue(text)));
WHEN 'varchar2' THEN
LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_VARCHAR2_TYPEID;
LV_READED_TBL(LV_READED_TBL.COUNT).V_VARCHAR2 := dbms_xmldom.getNodeValue(text);
WHEN 'date' THEN
LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_DATE_TYPEID;
LV_READED_TBL(LV_READED_TBL.COUNT).V_DATE := TO_DATE(TRANSLATE(dbms_xmldom.getNodeValue(text),'T',' '),'YYYY-MM-DD HH24:MI:SS');
WHEN 'urowid' THEN
LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_UROWID_TYPEID;
LV_READED_TBL(LV_READED_TBL.COUNT).V_UROWID := dbms_xmldom.getNodeValue(text);
WHEN 'raw' THEN
LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_RAW_TYPEID;
LV_READED_TBL(LV_READED_TBL.COUNT).V_RAW := HEXTORAW(dbms_xmldom.getNodeValue(text));
ELSE
LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_UNKNOWN_TYPEID;
END CASE;
END LOOP;
END;
END IF;
END IF;
END;
-- Получить численное представление текущего элемента сообщения
PROCEDURE UNPACK_MESSAGE (
ITEM OUT NUMBER
)
IS
BEGIN
LP_UNPACK_MESSAGE;
CASE NEXT_ITEM_TYPE
WHEN 0 THEN
RAISE LE_06556;
WHEN LC_NUMBER_TYPEID THEN
LV_READED_POS := LV_READED_POS+1;
ITEM := LV_READED_TBL(LV_READED_POS).V_NUMBER;
ELSE
RAISE LE_06559;
END CASE;
END;
-- Получить строковое представление текущего элемента сообщения
PROCEDURE UNPACK_MESSAGE (
ITEM OUT VARCHAR2
)
IS
BEGIN
LP_UNPACK_MESSAGE;
CASE NEXT_ITEM_TYPE
WHEN 0 THEN
RAISE LE_06556;
WHEN LC_VARCHAR2_TYPEID THEN
LV_READED_POS := LV_READED_POS+1;
ITEM := LV_READED_TBL(LV_READED_POS).V_VARCHAR2;
ELSE
RAISE LE_06559;
END CASE;
END;
-- Получить представление текущего элемента сообщения в виде даты
PROCEDURE UNPACK_MESSAGE (
ITEM OUT DATE
)
IS
BEGIN
LP_UNPACK_MESSAGE;
CASE NEXT_ITEM_TYPE
WHEN 0 THEN
RAISE LE_06556;
WHEN LC_DATE_TYPEID THEN
LV_READED_POS := LV_READED_POS+1;
ITEM := LV_READED_TBL(LV_READED_POS).V_DATE;
ELSE
RAISE LE_06559;
END CASE;
END;
-- Получить представление текущего элемента сообщения в виде raw
PROCEDURE UNPACK_MESSAGE_RAW (
ITEM OUT RAW
)
IS
BEGIN
LP_UNPACK_MESSAGE;
CASE NEXT_ITEM_TYPE
WHEN 0 THEN
RAISE LE_06556;
WHEN LC_RAW_TYPEID THEN
LV_READED_POS := LV_READED_POS+1;
ITEM := LV_READED_TBL(LV_READED_POS).V_RAW;
ELSE
RAISE LE_06559;
END CASE;
END;
-- Получить представление текущего элемента сообщения в виде rowid
PROCEDURE UNPACK_MESSAGE_ROWID (
ITEM OUT UROWID
)
IS
BEGIN
LP_UNPACK_MESSAGE;
CASE NEXT_ITEM_TYPE
WHEN 0 THEN
RAISE LE_06556;
WHEN LC_UROWID_TYPEID THEN
LV_READED_POS := LV_READED_POS+1;
ITEM := LV_READED_TBL(LV_READED_POS).V_UROWID;
ELSE
RAISE LE_06559;
END CASE;
END;
-- Очистка заданного канала
PROCEDURE PURGE (
PIPENAME IN VARCHAR2
)
IS
PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
LP_PURGE ( pipename => PIPENAME );
LV_READED_POS := 0;
LV_READED_TBL.DELETE();
LV_NEW_MESSAGE.DELETE();
END;
-- Процесс очистки старых каналов
PROCEDURE PACK (
PR_CONSTRUCT IN BOOLEAN DEFAULT FALSE
)
IS
PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
IF PR_CONSTRUCT THEN
LP_CONSTRUCT;
END IF;
LP_CHECK_PIPE_QUEUE;
COMMIT;
EXCEPTION
WHEN OTHERS THEN
COMMIT;
END;
-- Удаление внутренних структур UTL_PIPE.
-- Доступна только владельцу или суперпользователю
PROCEDURE DESTROY IS
LV_CNT INTEGER;
E_NO_PRIV EXCEPTION;
PRAGMA EXCEPTION_INIT(E_NO_PRIV, -01031);
BEGIN
-- Проверка прав
IF USER != 'SYS' AND
USER != LV_SCHEMA_NAME
THEN
RAISE E_NO_PRIV;
END IF;
-- Удаляем задание
SELECT COUNT(1)
INTO LV_CNT
FROM ALL_SCHEDULER_JOBS
WHERE OWNER = LV_SCHEMA_NAME
AND JOB_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF;
IF LV_CNT > 0 THEN
DBMS_SCHEDULER.drop_job (
job_name => LF_EXTEND_NAME(pr_name => LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF)
,force => true
);
END IF;
-- Удаляем программу
SELECT COUNT(1)
INTO LV_CNT
FROM ALL_SCHEDULER_PROGRAMS
WHERE OWNER = LV_SCHEMA_NAME
AND PROGRAM_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF;
IF LV_CNT > 0 THEN
DBMS_SCHEDULER.DROP_PROGRAM (
program_name => LF_EXTEND_NAME(pr_name => LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF)
,force => true
);
END IF;
-- Удаляем расписание
SELECT COUNT(1)
INTO LV_CNT
FROM ALL_SCHEDULER_SCHEDULES
WHERE OWNER = LV_SCHEMA_NAME
AND SCHEDULE_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF;
IF LV_CNT > 0 THEN
DBMS_SCHEDULER.DROP_SCHEDULE (
schedule_name => LF_EXTEND_NAME(pr_name => LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF)
,force => true
);
END IF;
-- Удаляем все внутренние очереди
FOR REC IN (
SELECT ALQE.NAME
,ALQE.ENQUEUE_ENABLED
,ALQE.DEQUEUE_ENABLED
FROM ALL_QUEUES ALQE
WHERE ALQE.OWNER = LV_SCHEMA_NAME
AND ALQE.QUEUE_TABLE = LC_QUEUE_TABLE_NAME
AND ALQE.NAME LIKE 'UPQ$%'
) LOOP
IF TRIM(REC.ENQUEUE_ENABLED) = 'YES' OR
TRIM(REC.DEQUEUE_ENABLED) = 'YES'
THEN
-- Останавливаем очередь, если необходимо
DBMS_AQADM.STOP_QUEUE (queue_name => LF_EXTEND_NAME(pr_name => REC.NAME));
END IF;
-- Удаляем очередь
DBMS_AQADM.DROP_QUEUE (queue_name => LF_EXTEND_NAME(pr_name => REC.NAME));
END LOOP;
-- Удаляем служебную таблицу для очередей
FOR REC IN (
SELECT AQTB.QUEUE_TABLE
FROM ALL_QUEUE_TABLES AQTB
WHERE AQTB.OWNER = LV_SCHEMA_NAME
AND AQTB.QUEUE_TABLE = LC_QUEUE_TABLE_NAME
) LOOP
DBMS_AQADM.DROP_QUEUE_TABLE (queue_table => LF_EXTEND_NAME(pr_name => REC.QUEUE_TABLE));
END LOOP;
END;
BEGIN
-- Начальная инициализация пакета
LP_INIT;
END UTL_PIPE;
/