У DBMS_PIPE при использовании на Oracle RAC есть один момент - между узлами не работает.
В связи с этим годик назад я для себя сварганил код, который сейчас потребовалось включить в production код. Проблема в том, что в силу ряда обстоятельств как следует протестировать его не удастся и опа случится именно когда понадобится его использовать. В итоге пришёл к мысли, что следует поменять свой труд по кодированию на Ваш по тестированию :).
В общем - получился пакет (пока что его рыба) близкий по спецификации к utl_pipe. Желающие могут его модифицировать под свои задачи, но - в качестве спасибо - хотелось бы получить информацию по ошибкам и пожеланиям с Вашей стороны.
В связи с этим годик назад я для себя сварганил код, который сейчас потребовалось включить в production код. Проблема в том, что в силу ряда обстоятельств как следует протестировать его не удастся и опа случится именно когда понадобится его использовать. В итоге пришёл к мысли, что следует поменять свой труд по кодированию на Ваш по тестированию :).
В общем - получился пакет (пока что его рыба) близкий по спецификации к utl_pipe. Желающие могут его модифицировать под свои задачи, но - в качестве спасибо - хотелось бы получить информацию по ошибкам и пожеланиям с Вашей стороны.
CREATE OR REPLACE PACKAGE UTL_PIPE IS MAXWAIT CONSTANT INTEGER := 86400000; -- Создание канала FUNCTION CREATE_PIPE ( PIPENAME IN VARCHAR2 -- Имя канала ,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется ,PRIVATE IN BOOLEAN DEFAULT FALSE -- Не используется ) RETURN INTEGER; -- Создание канала FUNCTION REMOVE_PIPE ( PIPENAME IN VARCHAR2 -- Имя канала ) RETURN INTEGER; -- Упаковка строки в сообщение PROCEDURE PACK_MESSAGE ( ITEM IN VARCHAR2 CHARACTER SET ANY_CS ); -- Упаковка числа в сообщение PROCEDURE PACK_MESSAGE ( ITEM IN NUMBER ); -- Упаковка даты в сообщение PROCEDURE PACK_MESSAGE ( ITEM IN DATE ); -- Упаковка raw в сообщение PROCEDURE PACK_MESSAGE_RAW ( ITEM IN RAW ); -- Упаковка rowid/urowid в сообщение PROCEDURE PACK_MESSAGE_ROWID ( ITEM IN UROWID ); -- Получение типа следующего элемента сообщения FUNCTION NEXT_ITEM_TYPE RETURN INTEGER; -- Построение XML представления сообщения FUNCTION CONSTRUCT_MESSAGE RETURN XMLTYPE; -- Получение уникального имени для сессии FUNCTION UNIQUE_SESSION_NAME RETURN VARCHAR2; -- Отправка текстового XML сообщения в канал FUNCTION SEND_MESSAGE ( PIPENAME IN VARCHAR2 -- Имя канала ,MESSAGE IN CLOB -- XML текст сообщение ,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется ,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется ) RETURN INTEGER; -- Отправка XML сообщения в канал FUNCTION SEND_MESSAGE ( PIPENAME IN VARCHAR2 -- Имя канала ,MESSAGE IN XMLTYPE -- XML сообщение ,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется ,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется ) RETURN INTEGER; -- Отправка сообщения, построенного из упакованных в буффер данных, в канал FUNCTION SEND_MESSAGE ( PIPENAME IN VARCHAR2 -- Имя канала ,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется ,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется ) RETURN INTEGER; -- Получение сообщения из канала FUNCTION RECEIVE_MESSAGE ( PIPENAME IN VARCHAR2, -- Имя канала TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Время ожидания ) RETURN INTEGER; -- Получение прочитанного из канала XML сообщения FUNCTION UNPACK_MESSAGE RETURN XMLTYPE; -- Получить численное представление текущего элемента сообщения PROCEDURE UNPACK_MESSAGE ( ITEM OUT NUMBER ); -- Получить строковое представление текущего элемента сообщения PROCEDURE UNPACK_MESSAGE ( ITEM OUT VARCHAR2 ); -- Получить представление текущего элемента сообщения в виде даты PROCEDURE UNPACK_MESSAGE ( ITEM OUT DATE ); -- Получить представление текущего элемента сообщения в виде raw PROCEDURE UNPACK_MESSAGE_RAW ( ITEM OUT RAW ); -- Получить представление текущего элемента сообщения в виде rowid PROCEDURE UNPACK_MESSAGE_ROWID ( ITEM OUT UROWID ); -- Очистка заданного канала PROCEDURE PURGE ( PIPENAME IN VARCHAR2 ); -- Процесс очистки старых каналов PROCEDURE PACK ( PR_CONSTRUCT IN BOOLEAN DEFAULT FALSE ); -- Удаление внутренних структур UTL_PIPE. -- Доступна только владельцу или суперпользователю PROCEDURE DESTROY; END UTL_PIPE; / CREATE OR REPLACE PACKAGE BODY UTL_PIPE -- -- P.S.> По умолчанию размер буффера на очередь находится где-то в пределах 5101 сообщения. -- расширить буффер можно путём изменения скрытого параметра: -- _buffered_publisher_flow_control_threshold -- Например: -- alter system set "_buffered_publisher_flow_control_threshold"=500000 sid='*' scope=both -- -- P.P.S.> В случае получения ошибки ORA-025306 необходимо правильно сконфигурировать сервер базы -- данных в RAC окружении таким образом, чтобы корректно работали буфферизованные очереди -- при посылке и чтении сообщений на разных узлах. Наличие данной ошибки говорит о некор- -- ректности сетевых настроек базы данных в кластерном окружении -- IS LC_QUEUE_TABLE_NAME CONSTANT VARCHAR2(16) := 'UPQ$PIPE_STORAGE'; -- Имя таблицы для очередей LC_TOUCH_PIPE_NAME CONSTANT VARCHAR2(20) := 'UPQ$PIPE$TOUCH$QUEUE'; -- Имя служебного PIPE LC_TOUCH_PROC_PFX CONSTANT VARCHAR2(15) := 'UPQ$PIPE_TOUCH_'; -- Префикс для объектов Scheduler-а LC_TOUCH_PRG_SUFF CONSTANT VARCHAR2(20) := 'PRG'; -- Суффикс для задания имени программы LC_TOUCH_SCHED_SUFF CONSTANT VARCHAR2(20) := 'SCH'; -- Суффикс для задания имени расписания LC_TOUCH_JOB_SUFF CONSTANT VARCHAR2(20) := 'JOB'; -- Суффикс для задания имени задания LC_NUMBER_TYPEID CONSTANT INTEGER := 6; LC_VARCHAR2_TYPEID CONSTANT INTEGER := 9; LC_UROWID_TYPEID CONSTANT INTEGER := 11; LC_DATE_TYPEID CONSTANT INTEGER := 12; LC_RAW_TYPEID CONSTANT INTEGER := 23; LC_UNKNOWN_TYPEID CONSTANT INTEGER := 0; LV_SCHEMA_NAME SYS.ALL_USERS.USERNAME%TYPE := NULL; -- Имя схемы-владельца пакета LV_ENQ_CT DBMS_AQ.ENQUEUE_OPTIONS_T; LV_MSG_PROP DBMS_AQ.MESSAGE_PROPERTIES_T; LV_PURGEOPT DBMS_AQADM.AQ$_PURGE_OPTIONS_T; LV_DEQ_CT DBMS_AQ.DEQUEUE_OPTIONS_T; TYPE LTP_MSG_REC IS RECORD ( TYPE_ID INTEGER ,V_NUMBER NUMBER ,V_DATE DATE ,V_UROWID UROWID ,V_VARCHAR2 VARCHAR2(32767) ,V_RAW RAW(32767) ); TYPE LTP_MSG_DETAILS IS TABLE OF LTP_MSG_REC; TYPE LTP_NAMES IS TABLE OF VARCHAR2(30) INDEX BY BINARY_INTEGER; LC_TOUCH_PERIOD CONSTANT NUMBER := 1/24; -- Частота интервалов сигнализации сессии о том, что с pipe-ом -- производится работа. (Час) LC_TOUCH_LIMIT CONSTANT NUMBER := 4; -- Количество интервалов простоя после которых удаляются очереди -- для пустых pipe. (4 часа). LC_TOUCH_ANY_LIMIT CONSTANT NUMBER := LC_TOUCH_LIMIT*24; -- Количество интервалов простоя после которых удаляются очереди -- для не пустых pipe. (4 дня). TYPE LTP_TOUCH_LIST IS TABLE OF DATE INDEX BY ALL_QUEUES.NAME%TYPE; LV_TOUCH_LIST LTP_TOUCH_LIST; LV_NAMES LTP_NAMES; LV_NEW_MESSAGE LTP_MSG_DETAILS; LV_READED_MSG SYS.XMLTYPE; LV_READED_TBL LTP_MSG_DETAILS; LV_READED_POS INTEGER := 0; LE_06556 EXCEPTION; -- ORA-06556: канал пуст, невозможно выполнить запрос unpack_message PRAGMA EXCEPTION_INIT (LE_06556, -06556); LE_06559 EXCEPTION; -- ORA-06559: Запрошен неверный тип данных... PRAGMA EXCEPTION_INIT (LE_06559, -06559); E_QUEUE_TIMEOUT EXCEPTION; -- ORA-25228: таймаут или окончание выборки во время снятия сообщения из очереди . PRAGMA EXCEPTION_INIT(E_QUEUE_TIMEOUT, -25228); E_NO_QUEUE EXCEPTION; -- ORA-25205: очередь не существует PRAGMA EXCEPTION_INIT(E_NO_QUEUE, -25205); LV_RESULT INTEGER; -- Расхирение имени объекта до полного имени с учётом схемы и регистра FUNCTION LF_EXTEND_NAME ( PR_NAME IN VARCHAR2 ) RETURN VARCHAR2 IS LV_NAME VARCHAR2(98) := TRIM(PR_NAME); BEGIN IF SUBSTR(LV_NAME,1,1) != '"' OR SUBSTR(LV_NAME,-1,1) != '"' THEN LV_NAME := '"'||UPPER(LV_NAME)||'"'; END IF; RETURN '"'||LV_SCHEMA_NAME||'".'||LV_NAME; END; -- Преобразование имени канала или очереди к имени очереди FUNCTION LF_GET_QUEUE_NAME ( PR_NAME IN VARCHAR2 ) RETURN VARCHAR2 IS BEGIN IF PR_NAME IS NULL OR (UPPER(PR_NAME) LIKE 'UPQ$%' AND LENGTH(PR_NAME) <= 24) THEN RETURN UPPER(PR_NAME); END IF; RETURN SUBSTR ( SUBSTR('UPQ$'||UPPER(PR_NAME),1,14)|| DBMS_OBFUSCATION_TOOLKIT.MD5(input => UTL_RAW.CAST_TO_RAW(UPPER(PR_NAME))) ,1,24 ); END; -- Создание очереди на служебной таблице PROCEDURE LP_CREATE_QUEUE ( PIPENAME IN VARCHAR2 ) IS PRAGMA AUTONOMOUS_TRANSACTION; LV_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(pr_name => PIPENAME); LV_CNT INTEGER; BEGIN -- Если отсутствует таблица для очередей - создаём SELECT COUNT(1) INTO LV_CNT FROM ALL_QUEUE_TABLES AQTB WHERE AQTB.OWNER = LV_SCHEMA_NAME AND AQTB.QUEUE_TABLE = LC_QUEUE_TABLE_NAME; IF LV_CNT = 0 THEN DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME) ,queue_payload_type => 'SYS.XMLTYPE' ,compatible => '10.2' ,multiple_consumers => false ); END IF; -- Если отсутствует очередь - создаём SELECT COUNT(1) INTO LV_CNT FROM ALL_QUEUES AQTB WHERE AQTB.OWNER = LV_SCHEMA_NAME AND AQTB.NAME = LV_QUEUE_NAME; IF LV_CNT = 0 THEN DBMS_AQADM.CREATE_QUEUE ( queue_name => LF_EXTEND_NAME(pr_name => LV_QUEUE_NAME) ,queue_table => LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME) ); END IF; -- Запускаем очередь DBMS_AQADM.START_QUEUE ( queue_name => LF_EXTEND_NAME(pr_name => LV_QUEUE_NAME) ); END; -- Создание задания на очистку неиспользуемых pipe PROCEDURE LP_CONSTRUCT IS LV_CNT INTEGER; BEGIN -- Если нет задания - создаём SELECT COUNT(1) INTO LV_CNT FROM ALL_SCHEDULER_JOBS WHERE OWNER = LV_SCHEMA_NAME AND JOB_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF; IF LV_CNT = 0 THEN -- Если нет расписания - создаём SELECT COUNT(1) INTO LV_CNT FROM ALL_SCHEDULER_SCHEDULES WHERE OWNER = LV_SCHEMA_NAME AND SCHEDULE_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF; IF LV_CNT = 0 THEN DBMS_SCHEDULER.CREATE_SCHEDULE ( schedule_name => LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF ,start_date => TRUNC(SYSDATE,'HH24')+1/24 ,repeat_interval => 'FREQ=HOURLY;INTERVAl=1' ); END IF; -- Если нет программы - создаём SELECT COUNT(1) INTO LV_CNT FROM ALL_SCHEDULER_PROGRAMS WHERE OWNER = LV_SCHEMA_NAME AND PROGRAM_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF; IF LV_CNT = 0 THEN DBMS_SCHEDULER.CREATE_PROGRAM ( program_name => LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF ,program_action => LF_EXTEND_NAME(pr_name => '"'||$$PLSQL_UNIT||'"."PACK"') ,program_type => 'STORED_PROCEDURE' ,number_of_arguments => 0 ,enabled => TRUE ); END IF; -- создаём задание DBMS_SCHEDULER.CREATE_JOB( job_name => LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF ,program_name => LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF ,schedule_name => LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF ,enabled => TRUE ,auto_drop => FALSE ); END IF; END; -- Чтение сообщения из канала FUNCTION L_RECEIVE_MESSAGE ( PIPENAME IN VARCHAR2 -- имя канала ,TIMEOUT IN INTEGER -- timeout ожидания ,DEQ_CONDITION IN VARCHAR2 DEFAULT NULL -- условие выборки ) RETURN INTEGER IS DEQ_CT DBMS_AQ.DEQUEUE_OPTIONS_T := LV_DEQ_CT; ENQ_MSGID RAW(16); BEGIN LV_READED_TBL := LTP_MSG_DETAILS(); LV_READED_POS := 0; DEQ_CT.WAIT := TIMEOUT; IF DEQ_CONDITION IS NOT NULL THEN DEQ_CT.DEQ_CONDITION := DEQ_CONDITION; END IF; DBMS_AQ.DEQUEUE ( dequeue_options => DEQ_CT ,message_properties => LV_MSG_PROP ,msgid => enq_msgid ,payload => LV_READED_MSG ,queue_name => LF_EXTEND_NAME(pr_name => LF_GET_QUEUE_NAME(pr_name => PIPENAME)) ); LV_READED_POS := -1; RETURN 0; EXCEPTION WHEN E_QUEUE_TIMEOUT THEN -- Если прервались по timeout, то RETURN 1; -- возвращаем сигнал об отсутствии сообщений WHEN E_NO_QUEUE THEN -- Если не нашли очереди, то создаём её и LP_CREATE_QUEUE (pipename => PIPENAME); -- повторяем действие LP_CONSTRUCT; RETURN L_RECEIVE_MESSAGE ( pipename => PIPENAME ,timeout => TIMEOUT ,deq_condition => DEQ_CONDITION ); END; -- Проверяем интервал, когда последний раз вызывася метод для конкретного канала в данной сессии -- и если он превысил заданную величину - посылаем новый сигнал для указанного канала. PROCEDURE LP_TOUCH ( PIPENAME IN VARCHAR2 ) IS V_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(PIPENAME); BEGIN IF LV_TOUCH_LIST.EXISTS(V_QUEUE_NAME) AND SYSDATE <= LV_TOUCH_LIST(V_QUEUE_NAME) + LC_TOUCH_PERIOD THEN RETURN; END IF; LV_TOUCH_LIST(V_QUEUE_NAME) := SYSDATE; PACK_MESSAGE(item => V_QUEUE_NAME); PACK_MESSAGE(item => LV_TOUCH_LIST(V_QUEUE_NAME)); LV_RESULT := SEND_MESSAGE(pipename => LC_TOUCH_PIPE_NAME); WHILE L_RECEIVE_MESSAGE ( pipename => LC_TOUCH_PIPE_NAME ,timeout => 0 ,deq_condition => 'tab.user_data.extract(''/message/varchar2[position()= 1 and text() = '''''||V_QUEUE_NAME||''''']'') is not null' ||' and ' ||'tab.enq_time < to_date('''||to_char(LV_TOUCH_LIST(V_QUEUE_NAME),'dd.mm.yyyy hh24:mi:ss')||''',''dd.mm.yyyy hh24:mi:ss'') - 1/24/3600' ) = 0 LOOP NULL; END LOOP; END; -- Проверка на существование канала FUNCTION LF_IS_PIPE_EXISTS ( PIPENAME IN VARCHAR2 ) RETURN BOOLEAN IS LV_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(pr_name => PIPENAME); LV_CNT INTEGER; BEGIN SELECT COUNT(1) INTO LV_CNT FROM ALL_QUEUES ALQE WHERE ALQE.OWNER = LV_SCHEMA_NAME AND ALQE.NAME = LV_QUEUE_NAME; RETURN LV_CNT != 0; END; -- Очискта канала с определённым фильтром PROCEDURE LP_PURGE ( PIPENAME IN VARCHAR2 ,PURGE_CONDITIONS IN VARCHAR2 DEFAULT NULL ) IS BEGIN IF LF_IS_PIPE_EXISTS ( pipename => PIPENAME ) THEN DBMS_AQADM.PURGE_QUEUE_TABLE ( queue_table => LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME) ,purge_condition => 'queue = '''||LF_GET_QUEUE_NAME(pr_name => PIPENAME)||''''|| CASE WHEN PURGE_CONDITIONS IS NULL THEN '' ELSE ' and ('||PURGE_CONDITIONS||')' END ,purge_options => LV_PURGEOPT ); END IF; END; -- Удаление сигналов по пайпам для которых отсутствуют очереди PROCEDURE LP_PURGE_EPSENT_TOUCHE ( PIPENAME IN VARCHAR2 DEFAULT NULL ) IS BEGIN LP_PURGE ( pipename => LC_TOUCH_PIPE_NAME ,purge_conditions => CASE WHEN PIPENAME IS NULL THEN 'extract(user_data,''/message/varchar2[position()=1]/text()'').getStringVal() not in (select distinct queue from '||LF_EXTEND_NAME(pr_name => LC_QUEUE_TABLE_NAME)||')' ELSE 'extract(user_data,''/message/varchar2[position()= 1 and text() = '''''||LF_GET_QUEUE_NAME(PIPENAME)||''''']'') is not null' END ); END; -- Проверяем наличие сообщения в очереди FUNCTION LF_CHECK_MESSAGE ( PR_QUEUE_NAME IN VARCHAR2 ,PR_DEQ_CONDITION IN VARCHAR2 DEFAULT NULL ) RETURN BOOLEAN IS DEQ_CT DBMS_AQ.DEQUEUE_OPTIONS_T := LV_DEQ_CT; ENQ_MSGID RAW(16); BEGIN DEQ_CT.WAIT := DBMS_AQ.NO_WAIT; DEQ_CT.NAVIGATION := DBMS_AQ.FIRST_MESSAGE; DEQ_CT.DEQUEUE_MODE := DBMS_AQ.BROWSE; DEQ_CT.DEQ_CONDITION := PR_DEQ_CONDITION; DBMS_AQ.DEQUEUE ( dequeue_options => DEQ_CT ,message_properties => LV_MSG_PROP ,msgid => enq_msgid ,payload => LV_READED_MSG ,queue_name => LF_EXTEND_NAME(pr_name => LF_GET_QUEUE_NAME(PR_QUEUE_NAME)) ); RETURN TRUE; EXCEPTION WHEN E_QUEUE_TIMEOUT OR E_NO_QUEUE THEN RETURN FALSE; END; -- Проверяем наличие TOUCH по пайпу (в случае задания даты - после неё) FUNCTION LF_CHECK_TOUCH ( PR_QUEUE_NAME IN VARCHAR2 ,PR_DATE IN DATE DEFAULT NULL ) RETURN BOOLEAN IS BEGIN RETURN LF_CHECK_MESSAGE ( pr_queue_name => LC_TOUCH_PIPE_NAME ,pr_deq_condition => 'tab.user_data.extract(''/message/varchar2[position()= 1 and text() = '''''||PR_QUEUE_NAME||''''']'') is not null'|| CASE WHEN PR_DATE IS NULL THEN NULL ELSE ' and tab.enq_time >= to_date('''||to_char(PR_DATE,'dd.mm.yyyy hh24:mi:ss')||''',''dd.mm.yyyy hh24:mi:ss'')' END ); END; -- Проверка канала на устаревание PROCEDURE LP_CHECK_PIPE_QUEUE ( PR_QUEUE_NAME IN VARCHAR2 DEFAULT NULL -- имя очереди канала ) IS BEGIN IF PR_QUEUE_NAME IS NOT NULL THEN -- Если очередь задана IF NOT LF_CHECK_TOUCH ( -- Если не было сигналов более периода на очистку пустых каналов pr_queue_name => PR_QUEUE_NAME ,pr_date => SYSDATE - LC_TOUCH_PERIOD * LC_TOUCH_LIMIT ) THEN IF NOT LF_CHECK_TOUCH (pr_queue_name => PR_QUEUE_NAME) -- Если вообще не было сигналов по каналу THEN LP_TOUCH ( pipename => PR_QUEUE_NAME ); -- Создаём отметку о том, что мы "тронули" канал ELSE -- иначе IF NOT LF_CHECK_MESSAGE (pr_queue_name => PR_QUEUE_NAME) OR -- Если канал пуст NOT LF_CHECK_TOUCH ( -- Или если не было сигналов дольше pr_queue_name => PR_QUEUE_NAME -- периода на удаление непустых каналов ,pr_date => SYSDATE - LC_TOUCH_PERIOD * LC_TOUCH_ANY_LIMIT ) THEN LV_RESULT := REMOVE_PIPE ( pipename => PR_QUEUE_NAME ); -- Удаляем канал END IF; END IF; END IF; ELSE -- Если очередь не задана, то выполняем для каждой из существующих очередей FOR REC IN ( SELECT ALQE.NAME FROM ALL_QUEUES ALQE WHERE ALQE.OWNER = LV_SCHEMA_NAME AND ALQE.NAME LIKE 'UPQ$%' AND ALQE.NAME != LC_TOUCH_PIPE_NAME ) LOOP LP_CHECK_PIPE_QUEUE( pr_queue_name => REC.NAME); END LOOP; LP_PURGE_EPSENT_TOUCHE; -- Очищаем сигналы по несуществующим очередям END IF; END; -- Создание канала FUNCTION CREATE_PIPE ( PIPENAME IN VARCHAR2 -- Имя канала ,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется ,PRIVATE IN BOOLEAN DEFAULT FALSE -- Не используется ) RETURN INTEGER IS BEGIN LP_CREATE_QUEUE ( pipename => PIPENAME ); LP_TOUCH ( pipename => PIPENAME ); RETURN 0; END; -- Создание канала FUNCTION REMOVE_PIPE ( PIPENAME IN VARCHAR2 -- Имя канала ) RETURN INTEGER IS PRAGMA AUTONOMOUS_TRANSACTION; LV_QUEUE_NAME ALL_QUEUES.NAME%TYPE := LF_GET_QUEUE_NAME(pr_name => PIPENAME); BEGIN -- Пытаемся удалить канал по имени канала, а если не удалось, то по имени очереди FOR I IN 1..2 LOOP FOR REC IN ( SELECT ALQE.NAME AS NAME ,TRIM(ALQE.ENQUEUE_ENABLED) AS ENQUEUE_ENABLED ,TRIM(ALQE.DEQUEUE_ENABLED) AS DEQUEUE_ENABLED FROM ALL_QUEUES ALQE WHERE ALQE.OWNER = LV_SCHEMA_NAME AND ALQE.NAME = LV_QUEUE_NAME ) LOOP IF REC.ENQUEUE_ENABLED = 'YES' OR REC.DEQUEUE_ENABLED = 'YES' THEN DBMS_AQADM.STOP_QUEUE(queue_name => LF_EXTEND_NAME(pr_name => REC.NAME)); END IF; DBMS_AQADM.DROP_QUEUE(queue_name => LF_EXTEND_NAME(pr_name => REC.NAME)); EXIT; END LOOP; IF UPPER(PIPENAME) NOT LIKE 'UPQ$%' OR LENGTH(PIPENAME) != 24 THEN EXIT; ELSE LV_QUEUE_NAME := UPPER(PIPENAME); END IF; END LOOP; -- Очищаем сигналы по несуществующим каналам LP_PURGE_EPSENT_TOUCHE(PIPENAME); COMMIT; RETURN 0; END; -- Упаковка строки в сообщение PROCEDURE PACK_MESSAGE ( ITEM IN VARCHAR2 CHARACTER SET ANY_CS ) IS BEGIN LV_NEW_MESSAGE.EXTEND(1); LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_VARCHAR2_TYPEID; LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_VARCHAR2 := ITEM; END; -- Упаковка числа в сообщение PROCEDURE PACK_MESSAGE ( ITEM IN NUMBER ) IS BEGIN LV_NEW_MESSAGE.EXTEND(1); LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_NUMBER_TYPEID; LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_NUMBER := ITEM; END; -- Упаковка даты в сообщение PROCEDURE PACK_MESSAGE ( ITEM IN DATE ) IS BEGIN LV_NEW_MESSAGE.EXTEND(1); LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_DATE_TYPEID; LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_DATE := ITEM; END; -- Упаковка raw в сообщение PROCEDURE PACK_MESSAGE_RAW ( ITEM IN RAW ) IS BEGIN LV_NEW_MESSAGE.EXTEND(1); LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_RAW_TYPEID; LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_RAW := ITEM; END; -- Упаковка rowid/urowid в сообщение PROCEDURE PACK_MESSAGE_ROWID ( ITEM IN UROWID ) IS BEGIN LV_NEW_MESSAGE.EXTEND(1); LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).TYPE_ID := LC_UROWID_TYPEID; LV_NEW_MESSAGE(LV_NEW_MESSAGE.COUNT).V_UROWID := ITEM; END; -- Получение типа следующего элемента сообщения FUNCTION NEXT_ITEM_TYPE RETURN INTEGER IS BEGIN IF LV_READED_TBL.COUNT > LV_READED_POS THEN RETURN LV_READED_TBL(LV_READED_POS+1).TYPE_ID; END IF; RETURN 0; END; -- Построение XML представления сообщения FUNCTION CONSTRUCT_MESSAGE RETURN XMLTYPE IS doc dbms_xmldom.DOMDocument; node dbms_xmldom.DOMNode; elem dbms_xmldom.DOMElement; curr dbms_xmldom.DOMNode; curr1 dbms_xmldom.DOMNode; text dbms_xmldom.DOMText; nodeName VARCHAR2(30); BEGIN doc := dbms_xmldom.createDocument('http://www.w3.org/2001/XMLSchema', null, null); dbms_xmldom.setVersion(doc, '1.0'); dbms_xmldom.setCharset(doc, 'UTF8'); node := dbms_xmldom.makeNode(doc); elem := dbms_xmldom.createElement(doc, 'message'); curr := dbms_xmldom.appendChild(node, dbms_xmldom.makeNode(elem)); FOR I IN 1..LV_NEW_MESSAGE.COUNT LOOP nodeName := LV_NAMES(LV_NEW_MESSAGE(I).TYPE_ID); elem := dbms_xmldom.createElement(doc, nodeName); curr1 := dbms_xmldom.appendChild(curr, dbms_xmldom.makeNode(elem)); CASE LV_NEW_MESSAGE(I).TYPE_ID WHEN LC_NUMBER_TYPEID THEN text := dbms_xmldom.createTextNode(doc, rawtohex(utl_raw.cast_from_number(LV_NEW_MESSAGE(I).V_NUMBER))); WHEN LC_VARCHAR2_TYPEID THEN text := dbms_xmldom.createTextNode(doc, LV_NEW_MESSAGE(I).V_VARCHAR2); WHEN LC_DATE_TYPEID THEN text := dbms_xmldom.createTextNode(doc, TO_CHAR(LV_NEW_MESSAGE(I).V_DATE, 'YYYY-MM-DD')||'T'||TO_CHAR(LV_NEW_MESSAGE(I).V_DATE, 'HH24:MI:SS')); WHEN LC_RAW_TYPEID THEN text := dbms_xmldom.createTextNode(doc, RAWTOHEX(LV_NEW_MESSAGE(I).V_RAW)); WHEN LC_UROWID_TYPEID THEN text := dbms_xmldom.createTextNode(doc, LV_NEW_MESSAGE(I).V_UROWID); ELSE text := dbms_xmldom.createTextNode(doc, NULL); END CASE; curr1 := dbms_xmldom.appendChild(curr1, dbms_xmldom.makeNode(text)); END LOOP; LV_NEW_MESSAGE.DELETE(); RETURN dbms_xmldom.getXMLTYPE(doc); END; -- Очистка буфферов сообщений PROCEDURE RESET_BUFFER IS BEGIN LV_NEW_MESSAGE.DELETE(); LV_READED_POS := 0; END; -- Получение уникального имени для сессии FUNCTION UNIQUE_SESSION_NAME RETURN VARCHAR2 IS BEGIN RETURN SYS.DBMS_PIPE.UNIQUE_SESSION_NAME; END; -- Отправка текстового XML сообщения в канал FUNCTION SEND_MESSAGE ( PIPENAME IN VARCHAR2 -- Имя канала ,MESSAGE IN CLOB -- XML текст сообщение ,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется ,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется ) RETURN INTEGER IS V_RESULT INTEGER; BEGIN V_RESULT:= SEND_MESSAGE ( pipename => PIPENAME ,message => XMLType.createXML(message) ,timeout => TIMEOUT ,maxpipesize => MAXPIPESIZE ); LV_NEW_MESSAGE.DELETE(); RETURN V_RESULT; END; -- Отправка XML сообщения в канал FUNCTION SEND_MESSAGE ( PIPENAME IN VARCHAR2 -- Имя канала ,MESSAGE IN XMLTYPE -- XML сообщение ,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется ,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется ) RETURN INTEGER IS ENQ_MSGID RAW(16); LE_NOQUEUE EXCEPTION; PRAGMA EXCEPTION_INIT(LE_NOQUEUE, -025205); BEGIN DBMS_AQ.ENQUEUE ( queue_name => LF_EXTEND_NAME(pr_name => LF_GET_QUEUE_NAME (pr_name => PIPENAME)) ,enqueue_options => LV_ENQ_CT ,message_properties => LV_MSG_PROP ,payload => MESSAGE ,msgid => ENQ_MSGID ); IF UPPER(PIPENAME) NOT LIKE 'UPQ$%' THEN LP_TOUCH(pipename => PIPENAME); END IF; RETURN 0; EXCEPTION WHEN LE_NOQUEUE THEN LP_CREATE_QUEUE (pipename => PIPENAME); IF UPPER(PIPENAME) = LC_TOUCH_PIPE_NAME THEN LP_CONSTRUCT; END IF; RETURN SEND_MESSAGE ( pipename => PIPENAME ,message => MESSAGE ,timeout => TIMEOUT ,maxpipesize => MAXPIPESIZE ); END; -- Отправка сообщения, построенного из упакованных в буффер данных, в канал FUNCTION SEND_MESSAGE ( PIPENAME IN VARCHAR2 -- Имя канала ,TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Не используется ,MAXPIPESIZE IN INTEGER DEFAULT 8192 -- Не используется ) RETURN INTEGER IS BEGIN RETURN SEND_MESSAGE ( pipename => PIPENAME ,message => CONSTRUCT_MESSAGE ,timeout => TIMEOUT ,maxpipesize => MAXPIPESIZE ); END; -- Начальное наполнение переменных PROCEDURE LP_INIT IS BEGIN SELECT USERNAME INTO LV_SCHEMA_NAME FROM SYS.ALL_USERS WHERE USER_ID = USERENV('SCHEMAID'); LV_PURGEOPT.BLOCK := TRUE; LV_PURGEOPT.DELIVERY_MODE := DBMS_AQADM.BUFFERED; LV_ENQ_CT.VISIBILITY := DBMS_AQ.IMMEDIATE; LV_ENQ_CT.DELIVERY_MODE := DBMS_AQ.BUFFERED; LV_MSG_PROP.DELAY := DBMS_AQ.NO_DELAY; LV_MSG_PROP.EXPIRATION := DBMS_AQ.NEVER; LV_DEQ_CT.DEQUEUE_MODE := DBMS_AQ.REMOVE; LV_DEQ_CT.VISIBILITY := DBMS_AQ.IMMEDIATE; LV_DEQ_CT.DELIVERY_MODE := DBMS_AQ.BUFFERED; LV_DEQ_CT.NAVIGATION := DBMS_AQ.FIRST_MESSAGE; LV_NEW_MESSAGE := LTP_MSG_DETAILS(); LV_READED_TBL := LTP_MSG_DETAILS(); LV_NAMES(LC_DATE_TYPEID) := 'date'; LV_NAMES(LC_NUMBER_TYPEID) := 'number'; LV_NAMES(LC_VARCHAR2_TYPEID) := 'varchar2'; LV_NAMES(LC_RAW_TYPEID) := 'raw'; LV_NAMES(LC_UROWID_TYPEID) := 'urowid'; END; -- Получение сообщения из канала FUNCTION RECEIVE_MESSAGE ( PIPENAME IN VARCHAR2, -- Имя канала TIMEOUT IN INTEGER DEFAULT MAXWAIT -- Время ожидания ) RETURN INTEGER IS BEGIN RETURN L_RECEIVE_MESSAGE ( pipename => PIPENAME ,timeout => TIMEOUT ); END; -- Получение прочитанного из канала XML сообщения FUNCTION UNPACK_MESSAGE RETURN XMLTYPE IS BEGIN RETURN LV_READED_MSG; END; -- Распаковка прочитанного из канала XML (если оно построено через буффер упаковки или соответствует формату!!!) PROCEDURE LP_UNPACK_MESSAGE IS BEGIN IF LV_READED_POS = -1 THEN LV_READED_TBL.DELETE(); LV_READED_POS := 0; IF LV_READED_MSG IS NOT NULL THEN DECLARE doc dbms_xmldom.DOMDocument; elem dbms_xmldom.DOMElement; nodeList dbms_xmldom.DOMNodeList; node dbms_xmldom.DOMNode; text dbms_xmldom.DOMNode; BEGIN doc := dbms_xmldom.newDOMDocument(LV_READED_MSG); elem := dbms_xmldom.getDocumentElement(doc); nodeList := dbms_xmldom.getElementsByTagName(elem, '*'); FOR I IN 0..dbms_xmldom.getLength(nodeList)-1 LOOP node := dbms_xmldom.item(nodeList, i); text := dbms_xmldom.getFirstChild(node); LV_READED_TBL.EXTEND(1); CASE dbms_xmldom.getNodeName(node) WHEN 'number' THEN LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_NUMBER_TYPEID; LV_READED_TBL(LV_READED_TBL.COUNT).V_NUMBER := UTL_RAW.CAST_TO_NUMBER(hextoraw(dbms_xmldom.getNodeValue(text))); WHEN 'varchar2' THEN LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_VARCHAR2_TYPEID; LV_READED_TBL(LV_READED_TBL.COUNT).V_VARCHAR2 := dbms_xmldom.getNodeValue(text); WHEN 'date' THEN LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_DATE_TYPEID; LV_READED_TBL(LV_READED_TBL.COUNT).V_DATE := TO_DATE(TRANSLATE(dbms_xmldom.getNodeValue(text),'T',' '),'YYYY-MM-DD HH24:MI:SS'); WHEN 'urowid' THEN LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_UROWID_TYPEID; LV_READED_TBL(LV_READED_TBL.COUNT).V_UROWID := dbms_xmldom.getNodeValue(text); WHEN 'raw' THEN LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_RAW_TYPEID; LV_READED_TBL(LV_READED_TBL.COUNT).V_RAW := HEXTORAW(dbms_xmldom.getNodeValue(text)); ELSE LV_READED_TBL(LV_READED_TBL.COUNT).TYPE_ID := LC_UNKNOWN_TYPEID; END CASE; END LOOP; END; END IF; END IF; END; -- Получить численное представление текущего элемента сообщения PROCEDURE UNPACK_MESSAGE ( ITEM OUT NUMBER ) IS BEGIN LP_UNPACK_MESSAGE; CASE NEXT_ITEM_TYPE WHEN 0 THEN RAISE LE_06556; WHEN LC_NUMBER_TYPEID THEN LV_READED_POS := LV_READED_POS+1; ITEM := LV_READED_TBL(LV_READED_POS).V_NUMBER; ELSE RAISE LE_06559; END CASE; END; -- Получить строковое представление текущего элемента сообщения PROCEDURE UNPACK_MESSAGE ( ITEM OUT VARCHAR2 ) IS BEGIN LP_UNPACK_MESSAGE; CASE NEXT_ITEM_TYPE WHEN 0 THEN RAISE LE_06556; WHEN LC_VARCHAR2_TYPEID THEN LV_READED_POS := LV_READED_POS+1; ITEM := LV_READED_TBL(LV_READED_POS).V_VARCHAR2; ELSE RAISE LE_06559; END CASE; END; -- Получить представление текущего элемента сообщения в виде даты PROCEDURE UNPACK_MESSAGE ( ITEM OUT DATE ) IS BEGIN LP_UNPACK_MESSAGE; CASE NEXT_ITEM_TYPE WHEN 0 THEN RAISE LE_06556; WHEN LC_DATE_TYPEID THEN LV_READED_POS := LV_READED_POS+1; ITEM := LV_READED_TBL(LV_READED_POS).V_DATE; ELSE RAISE LE_06559; END CASE; END; -- Получить представление текущего элемента сообщения в виде raw PROCEDURE UNPACK_MESSAGE_RAW ( ITEM OUT RAW ) IS BEGIN LP_UNPACK_MESSAGE; CASE NEXT_ITEM_TYPE WHEN 0 THEN RAISE LE_06556; WHEN LC_RAW_TYPEID THEN LV_READED_POS := LV_READED_POS+1; ITEM := LV_READED_TBL(LV_READED_POS).V_RAW; ELSE RAISE LE_06559; END CASE; END; -- Получить представление текущего элемента сообщения в виде rowid PROCEDURE UNPACK_MESSAGE_ROWID ( ITEM OUT UROWID ) IS BEGIN LP_UNPACK_MESSAGE; CASE NEXT_ITEM_TYPE WHEN 0 THEN RAISE LE_06556; WHEN LC_UROWID_TYPEID THEN LV_READED_POS := LV_READED_POS+1; ITEM := LV_READED_TBL(LV_READED_POS).V_UROWID; ELSE RAISE LE_06559; END CASE; END; -- Очистка заданного канала PROCEDURE PURGE ( PIPENAME IN VARCHAR2 ) IS PRAGMA AUTONOMOUS_TRANSACTION; BEGIN LP_PURGE ( pipename => PIPENAME ); LV_READED_POS := 0; LV_READED_TBL.DELETE(); LV_NEW_MESSAGE.DELETE(); END; -- Процесс очистки старых каналов PROCEDURE PACK ( PR_CONSTRUCT IN BOOLEAN DEFAULT FALSE ) IS PRAGMA AUTONOMOUS_TRANSACTION; BEGIN IF PR_CONSTRUCT THEN LP_CONSTRUCT; END IF; LP_CHECK_PIPE_QUEUE; COMMIT; EXCEPTION WHEN OTHERS THEN COMMIT; END; -- Удаление внутренних структур UTL_PIPE. -- Доступна только владельцу или суперпользователю PROCEDURE DESTROY IS LV_CNT INTEGER; E_NO_PRIV EXCEPTION; PRAGMA EXCEPTION_INIT(E_NO_PRIV, -01031); BEGIN -- Проверка прав IF USER != 'SYS' AND USER != LV_SCHEMA_NAME THEN RAISE E_NO_PRIV; END IF; -- Удаляем задание SELECT COUNT(1) INTO LV_CNT FROM ALL_SCHEDULER_JOBS WHERE OWNER = LV_SCHEMA_NAME AND JOB_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF; IF LV_CNT > 0 THEN DBMS_SCHEDULER.drop_job ( job_name => LF_EXTEND_NAME(pr_name => LC_TOUCH_PROC_PFX||LC_TOUCH_JOB_SUFF) ,force => true ); END IF; -- Удаляем программу SELECT COUNT(1) INTO LV_CNT FROM ALL_SCHEDULER_PROGRAMS WHERE OWNER = LV_SCHEMA_NAME AND PROGRAM_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF; IF LV_CNT > 0 THEN DBMS_SCHEDULER.DROP_PROGRAM ( program_name => LF_EXTEND_NAME(pr_name => LC_TOUCH_PROC_PFX||LC_TOUCH_PRG_SUFF) ,force => true ); END IF; -- Удаляем расписание SELECT COUNT(1) INTO LV_CNT FROM ALL_SCHEDULER_SCHEDULES WHERE OWNER = LV_SCHEMA_NAME AND SCHEDULE_NAME = LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF; IF LV_CNT > 0 THEN DBMS_SCHEDULER.DROP_SCHEDULE ( schedule_name => LF_EXTEND_NAME(pr_name => LC_TOUCH_PROC_PFX||LC_TOUCH_SCHED_SUFF) ,force => true ); END IF; -- Удаляем все внутренние очереди FOR REC IN ( SELECT ALQE.NAME ,ALQE.ENQUEUE_ENABLED ,ALQE.DEQUEUE_ENABLED FROM ALL_QUEUES ALQE WHERE ALQE.OWNER = LV_SCHEMA_NAME AND ALQE.QUEUE_TABLE = LC_QUEUE_TABLE_NAME AND ALQE.NAME LIKE 'UPQ$%' ) LOOP IF TRIM(REC.ENQUEUE_ENABLED) = 'YES' OR TRIM(REC.DEQUEUE_ENABLED) = 'YES' THEN -- Останавливаем очередь, если необходимо DBMS_AQADM.STOP_QUEUE (queue_name => LF_EXTEND_NAME(pr_name => REC.NAME)); END IF; -- Удаляем очередь DBMS_AQADM.DROP_QUEUE (queue_name => LF_EXTEND_NAME(pr_name => REC.NAME)); END LOOP; -- Удаляем служебную таблицу для очередей FOR REC IN ( SELECT AQTB.QUEUE_TABLE FROM ALL_QUEUE_TABLES AQTB WHERE AQTB.OWNER = LV_SCHEMA_NAME AND AQTB.QUEUE_TABLE = LC_QUEUE_TABLE_NAME ) LOOP DBMS_AQADM.DROP_QUEUE_TABLE (queue_table => LF_EXTEND_NAME(pr_name => REC.QUEUE_TABLE)); END LOOP; END; BEGIN -- Начальная инициализация пакета LP_INIT; END UTL_PIPE; /
Небольшой такой P.S.> Хочется использовать буферизованную (nonpersistent,...) очередь, она - типа - быстрее, но секс заключается в том, что на RAC такая очередь создается на одном узле, а второй идёт по неявному (прозрачно для вас созданному) db_link-у. Т.е. с базового узла, который владеет очередью, обращение идёт прямо, а со второстепенного - через линк. И всё бы хорошо, да вот только job-ы поднимаются под SYS-ом, а оттуда db_link по тому же пользователю не прокинуть. Итого - либо persistent, либо RAC, либо забыть про job-ы. Почти аналог CAP теоремы - выбери 2 ;).
ОтветитьУдалить