Setup As SYS |
conn / as sysdba
-- validate Oracle parameters
show parameter aq_tm_processes
show parameter job_queue_processes
SELECT owner, queue_name, queue_table, consumer_name
FROM dba_queue_subscribers;
col owner format a10
col grantee format a10
col grantor format a10
SELECT *
FROM queue_privileges;
-- create AQ administrator
CREATE USER aqadmin
IDENTIFIED BY aqadmin
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 20M ON uwdata;
GRANT create session TO aqadmin;
GRANT create procedure TO aqadmin;
GRANT create table TO aqadmin;
GRANT create type TO aqadmin;
GRANT create public synonym TO aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;
GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;
GRANT execute ON dbms_lock TO aqadmin; -- required for demo but not for AQ
GRANT execute on dbms_crypto TO aqadmin; -- required for demo but not for AQ
SELECT username, account_status, created
FROM dba_users
ORDER BY 1;
SELECT *
FROM dba_sys_privs
WHERE grantee = 'AQADMIN';
col privilege format a15
col owner format a15
SELECT role, owner, table_name, privilege
FROM role_tab_privs
WHERE role = 'AQ_ADMINISTRATOR_ROLE'
ORDER BY 4, 2, 3;
-- create AQ users
CREATE USER gen_pharmacy
IDENTIFIED BY gen_pharm
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 5M ON uwdata;
GRANT create session, create synonym TO gen_pharmacy;
CREATE USER icu_pharmacy
IDENTIFIED BY icu_pharm
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 5M ON uwdata;
GRANT create session, create synonym TO icu_pharmacy;
CREATE USER icu
IDENTIFIED BY icu
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 5M ON uwdata;
GRANT create session, create_synonym TO icu;
CREATE USER nurses_stn
IDENTIFIED BY nurses_stn
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 5M ON uwdata;
GRANT create session, create synonym TO nurses_stn; |
|
Setup As AQADMIN |
conn aqadmin/aqadmin
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
-- create message user-defined data type
CREATE OR REPLACE TYPE message_t AS OBJECT (
id NUMBER,
source VARCHAR2(30),
rx VARCHAR2(30));
/
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
desc message_t
-- examine message for message content demo
SELECT text
FROM user_source
WHERE name = 'MESSAGE_T'
ORDER BY line;
CREATE TABLE test (
reg_col VARCHAR2(11),
see_msg message_t);
desc test
set describe depth all linenum on indent on
desc test
INSERT INTO test
(reg_col, see_msg)
VALUES
('Test Values', message_t(1, 'Thorazine', USER));
SELECT * FROM test;
DROP TABLE test PURGE; |
|
Build As AQADMIN |
-- table to hold dequeued messages
CREATE TABLE rx_processed_data (
rx_id NUMBER,
rx_name VARCHAR2(30),
source VARCHAR2(30),
mprop_priority NUMBER(38),
mprop_delay NUMBER(38),
mprop_attempts NUMBER(38),
msg_enqueue_time DATE,
mprop_state NUMBER(5),
processed_by VARCHAR2(30),
processed_ts TIMESTAMP);
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
Create a Queue Table |
/* -- CREATE QUEUE TABLE syntax
dbms_aqadm.create_queue_table(
queue_table IN VARCHAR2, -- table's name
queue_payload_type IN VARCHAR2, -- user defined data type's name
storage_clause IN VARCHAR2 DEFAULT NULL, -- define pctfree
sort_list IN VARCHAR2 DEFAULT NULL, -- priority and/or enq_time
multiple_consumers IN BOOLEAN DEFAULT FALSE, -- FALSE: message can be consumed only once
message_grouping IN BINARY_INTEGER DEFAULT NONE, -- TRANSACTIONAL: messages in a transaction are a group
comment IN VARCHAR2 DEFAULT NULL, -- definer's comments
auto_commit IN BOOLEAN DEFAULT TRUE, -- deprecated parameter
primary_instance IN BINARY_INTEGER DEFAULT 0, -- manage queue in primary
secondary_instance IN BINARY_INTEGER DEFAULT 0, -- RAC failover if possible
compatible IN VARCHAR2 DEFAULT NULL, -- lowest compatible version
non_repudiation IN BINARY_INTEGER DEFAULT 0,
secure IN BOOLEAN DEFAULT FALSE);
*/ |
exec dbms_aqadm.create_queue_table(
queue_table => 'rx_queue_table',
queue_payload_type => 'message_t',
storage_clause => 'PCTFREE 0 PCTUSED 99',
sort_list => 'ENQ_TIME',
multiple_consumers => TRUE,
comment => 'Pharmacy Queue Table',
compatible => '11.2',
secure => FALSE);
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
Single Consumer Queue Table |
Multiple Consumer Queue Table |
SQL> SELECT object_name, object_type
2 FROM user_objects
3 ORDER by 2,1;
OBJECT_NAME OBJECT_TYPE
------------------------- ----------
AQ$_RX_QUEUE_TABLE_I INDEX
AQ$_RX_QUEUE_TABLE_T INDEX
SYS_C0028215 INDEX
SYS_LOB0000093484C00032$$ LOB
AQ$_RX_QUEUE_TABLE_E QUEUE
RX_PROCESSED_DATA TABLE
RX_QUEUE_TABLE TABLE
MESSAGE_T TYPE
AQ$RX_QUEUE_TABLE VIEW
AQ$_RX_QUEUE_TABLE_F VIEW |
SQL> SELECT object_name, object_type
2 FROM user_objects
3 ORDER by 2,1;
OBJECT_NAME OBJECT_TYPE
------------------------- -------------------
AQ$_RX_QUEUE_TABLE_V EVALUATION CONTEXT
SYS_C0028218 INDEX
SYS_C0028221 INDEX
SYS_IOT_TOP_93503 INDEX
SYS_IOT_TOP_93505 INDEX
SYS_IOT_TOP_93507 INDEX
SYS_IOT_TOP_93510 INDEX
SYS_LOB0000093494C00032$$ LOB
AQ$_RX_QUEUE_TABLE_E QUEUE
AQ$_RX_QUEUE_TABLE_N SEQUENCE
Q$_RX_QUEUE_TABLE_G TABLE
AQ$_RX_QUEUE_TABLE_H TABLE
AQ$_RX_QUEUE_TABLE_I TABLE
AQ$_RX_QUEUE_TABLE_S TABLE
AQ$_RX_QUEUE_TABLE_T TABLE
RX_PROCESSED_DATA TABLE
RX_QUEUE_TABLE TABLE
SYS_IOT_OVER_93507 TABLE
MESSAGE_T TYPE
AQ$RX_QUEUE_TABLE VIEW
AQ$RX_QUEUE_TABLE_S VIEW
AQ$_RX_QUEUE_TABLE_F VIEW |
col evaluation_function format a30
col evaluation_context_comment format a30
col table_name format a30
col user_comment format a30
-- examine evaluation context if a multiple consumer queue
SELECT * FROM user_evaluation_contexts;
-- examine evaluation context table if a multiple consumer_queue
SELECT * FROM user_evaluation_context_tables;
-- examine queues
SELECT name, queue_table, user_comment
FROM user_queues;
-- examine sequence
SELECT sequence_name, min_value, max_value,
increment_by, cycle_flag, order_flag, cache_size
FROM user_sequences;
set describe depth 1
-- examine tables
desc RX_QUEUE_TABLE
set describe depth all
desc RX_QUEUE_TABLE
-- look at those tables and views that exist based on your consumer type
desc AQ$RX_QUEUE_TABLE
desc AQ$_RX_QUEUE_TABLE_G
desc AQ$_RX_QUEUE_TABLE_H
desc AQ$_RX_QUEUE_TABLE_I
desc AQ$_RX_QUEUE_TABLE_S
desc AQ$_RX_QUEUE_TABLE_T
-- if a multi-consumer queue
SELECT table_name, iot_name
FROM user_tables;
-- examine views
desc AQ$RX_QUEUE_TABLE
desc AQ$_RX_QUEUE_TABLE_F --
desc AQ$RX_QUEUE_TABLE_S -- transformations
desc AQ$RX_QUEUE_TABLE_R -- rules
Create the rx_queue Queue Using the rx_queue_Table |
dbms_aqadm.create_queue (
queue_name IN VARCHAR2, -- queue's name
queue_table IN VARCHAR2, -- previously defined queue table
queue_type IN BINARY_INTEGER DEFAULT NORMAL_QUEUE, -- Normal or Exception
max_retries IN NUMBER DEFAULT NULL, -- default is 2**31-1
retry_delay IN NUMBER DEFAULT 0, -- in seconds
retention_time IN NUMBER DEFAULT 0,
dependency_tracking IN BOOLEAN DEFAULT FALSE, -- must be FALSE: the default
comment IN VARCHAR2 DEFAULT NULL, --definer's comment
auto_commit IN BOOLEAN DEFAULT TRUE); -- deprecated parameter |
exec dbms_aqadm.create_queue(
queue_name => 'rx_queue',
queue_table => 'rx_queue_table',
queue_type => dbms_aqadm.NORMAL_QUEUE,
max_retries => 3,
retry_delay => 1,
comment => 'Prescription Queue');
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type
ORDER BY 1;
-- note QUEUE and RULE SET creation plus new views if you have a multiple consumer queue
SELECT name, queue_table, user_comment
FROM user_queues;
SELECT rule_set_name, rule_set_eval_context_owner, rule_set_eval_context_name
FROM user_rule_sets;
set long 10000
set pagesize 0
SELECT view_name, text
FROM user_views;
set pagesize 25
SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
Start Queue |
dbms_aqadm.start_queue (
queue_name IN VARCHAR2,
enqueue IN BOOLEAN DEFAULT TRUE,
dequeue IN BOOLEAN DEFAULT TRUE); |
exec dbms_aqadm.start_queue(queue_name => 'RX_QUEUE');
SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
Create Queue Subscribers |
dbms_aqadm.grant_queue_privilege (
privilege IN VARCHAR2,
queue_name IN VARCHAR2,
grantee IN VARCHAR2,
grant_option IN BOOLEAN DEFAULT FALSE); |
desc queue_privileges
col grantee format a12
col grantor format a12
col owner format a20
col name format a20
SELECT grantee, owner, name, grantor, enqueue_privilege, dequeue_privilege
FROM queue_privileges
WHERE name = 'RX_QUEUE';
BEGIN
dbms_aqadm.grant_queue_privilege('DEQUEUE', 'RX_QUEUE', 'gen_pharmacy', FALSE);
dbms_aqadm.grant_queue_privilege('DEQUEUE', 'RX_QUEUE', 'icu_pharmacy', FALSE);
dbms_aqadm.grant_queue_privilege('ENQUEUE', 'RX_QUEUE', 'gen_pharmacy', FALSE);
dbms_aqadm.grant_queue_privilege('ENQUEUE', 'RX_QUEUE', 'icu', FALSE);
dbms_aqadm.grant_queue_privilege('ENQUEUE', 'RX_QUEUE', 'nurses_stn', FALSE);
END;
/
SELECT grantee, owner, name, grantor, enqueue_privilege, dequeue_privilege
FROM queue_privileges
WHERE name = 'RX_QUEUE';
-- create queue subscribers
desc user_queue_subscribers
col queue_name format a12
col address format a10
SELECT queue_name, consumer_name, address, protocol, delivery_mode, queue_to_queue
FROM user_queue_subscribers;
TYPE aq$_agent AS OBJECT (
name VARCHAR2(30), -- name of message producer or consumer
address VARCHAR2(1024), -- Protocol-specific address of the recipient in the form [schema.]queue[@dblink]
protocol NUMBER DEFAULT 0); -- must be 0, other values for internal use only
dbms_aqadm.add_subscriber(
queue_name IN VARCHAR2, -- name of queue
subscriber IN sys.aq$_agent, -- name, address and, protocol
rule IN VARCHAR2 DEFAULT NULL, -- conditional / similar to WHERE clause
transformation IN VARCHAR2 DEFAULT NULL -- message transformation rule
queue_to_queue IN BOOLEAN DEFAULT FALSE, -- TRUE indicates queue-to-queue messaging
delivery_mode IN PLS_INTEGER DEFAULT dbms_aqadm.persistent); -- BUFFERED, PERSISTENT_OR_BUFFERED, or PERSISTENT |
DECLARE
subsc_t sys.aq$_agent;
subsc_addr VARCHAR2(1024) := 'AQADMIN.RX_QUEUE';
BEGIN
subsc_t := sys.aq$_agent('gen_pharmacy', subsc_addr, 0);
dbms_aqadm.add_subscriber('rx_queue', subsc_t, 'priority > 11');
subsc_t := sys.aq$_agent('icu_pharmacy', subsc_addr, 0);
dbms_aqadm.add_subscriber('rx_queue', subsc_t, 'priority < 10');
subsc_t := sys.aq$_agent('icu', subsc_addr, 0);
dbms_aqadm.add_subscriber('rx_queue', subsc_t);
subsc_t := sys.aq$_agent('nurse_stn', subsc_addr, 0);
dbms_aqadm.add_subscriber('rx_queue', subsc_t);
END;
/
col consumer_name format a13
SELECT queue_name, queue_table, consumer_name, protocol, delivery_mode, queue_to_queue
FROM user_queue_subscribers;
Create Propagation Schedule |
dbms_aqadm.schedule_propagation (
queue_name IN VARCHAR2, -- name of queue
destination IN VARCHAR2 DEFAULT NULL, -- destination database link
start_time IN DATE DEFAULT SYSDATE, -- initial propagation start time
duration IN NUMBER DEFAULT NULL, -- propagation window in seconds
next_time IN VARCHAR2 DEFAULT NULL, -- date-time of next window
latency IN NUMBER DEFAULT 60, -- maximum wait in seconds
destination_queue IN VARCHAR2 DEFAULT NULL); -- target queue name and db link |
desc user_queue_schedules
SELECT COUNT(*) FROM user_queue_schedules;
exec dbms_aqadm.schedule_propagation('RX_QUEUE', latency =>0);
col qname format a10
col destination format a11
col start_date format a12
col start_time format a10
col next_time format a10
SELECT qname, destination, start_date, start_time, propagation_window, next_time, latency
FROM user_queue_schedules;
col process_name format a12
col session_id format a10
col instance format 99
col last_run_date format a20
col last_run_time format a13
col current_start_date format a20
SELECT qname, process_name, session_id, instance, last_run_date, last_run_time, current_start_date
FROM user_queue_schedules;
col next_run_date format a20
SELECT qname, current_start_time, next_run_date, next_run_time, total_time, total_number
FROM user_queue_schedules;
SELECT qname, total_bytes, max_number, max_bytes, avg_number, avg_size, avg_time
FROM user_queue_schedules; |
|
CREATE TABLE rx_inventory (
pharm_id VARCHAR2(3),
rx_name VARCHAR2(30),
rx_quantity NUMBER(5));
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Aspirin', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Compazine', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Prozac', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Lipitor', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Oxycontin', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'aaa', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'bbb', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'ccc', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Tetracycline', 10);
INSERT INTO rx_inventory (pharm_id, rx_name, rx_quantity)
VALUES ('GEN', 'Thorazine', 10);
COMMIT;
SELECT *
FROM rx_inventory;
GRANT SELECT, UPDATE ON rx_inventory TO gen_pharmacy;
GRANT SELECT, UPDATE ON rx_inventory TO icu_pharmacy; |
|
-- create procedure to dequeue messages
TYPE DEQUEUE_OPTIONS_T IS RECORD (
consumer_name VARCHAR2(30) DEFAULT NULL,
dequeue_mode BINARY_INTEGER DEFAULT REMOVE,
navigation BINARY_INTEGER DEFAULT NEXT_MESSAGE,
visibility BINARY_INTEGER DEFAULT ON_COMMIT,
wait BINARY_INTEGER DEFAULT FOREVER,
msgid RAW(16) DEFAULT NULL,
correlation VARCHAR2(128) DEFAULT NULL,
deq_condition VARCHAR2(4000) DEFAULT NULL,
signature aq$_sig_prop DEFAULT NULL,
transformation VARCHAR2(60) DEFAULT NULL,
delivery_mode PLS_INTEGER DEFAULT PERSISTENT);
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority BINARY_INTEGER DEFAULT 1,
delay BINARY_INTEGER DEFAULT NO_DELAY,
expiration BINARY_INTEGER DEFAULT NEVER,
correlation VARCHAR2(128) DEFAULT NULL,
attempts BINARY_INTEGER,
recipient_list aq$_recipient_list_t,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time DATE,
state BINARY_INTEGER,
sender_id aq$_agent DEFAULT NULL,
original_msgid RAW(16) DEFAULT NULL);
TYPE SYS.AQ$_RECIPIENT_LIST_T IS TABLE OF sys.aq$_agent
INDEX BY BINARY_INTEGER;
dbms_aq.dequeue(
queue_name IN VARCHAR2,
dequeue_options IN dequeue_options_t,
message_properties OUT message_properties_t,
payload OUT <user_defined_data_type_name>
msgid OUT RAW); |
CREATE OR REPLACE PROCEDURE demo_dequeue(appuser IN VARCHAR2) AUTHID DEFINER IS
dq_msgid RAW(16);
dq_opt dbms_aq.dequeue_options_t;
msg_prop dbms_aq.message_properties_t;
payload_t message_t;
q_on_hand rx_inventory.rx_quantity%TYPE;
no_messages EXCEPTION;
pragma exception_init(no_messages, -25228);
pragma autonomous_transaction;
BEGIN
dq_opt.consumer_name := appuser;
dq_opt.dequeue_mode := dbms_aq.remove;
dq_opt.navigation := dbms_aq.first_message;
dq_opt.visibility := dbms_aq.immediate;
dq_opt.wait := 10;
dbms_aq.dequeue('AQADMIN.RX_QUEUE', dq_opt, msg_prop, payload_t, dq_msgid);
SELECT rx_quantity
INTO q_on_hand
FROM rx_inventory
WHERE rx_name = payload_t.rx;
IF q_on_hand > 0 THEN
UPDATE rx_inventory
SET rx_quantity = rx_quantity - 1
WHERE rx_name = payload_t.rx;
ELSE
NULL; -- you would never do
this in the real world ... it is just a placeholder
-- send back a failure
message using the queue as homework for the nurses to dequeue
END IF;
INSERT INTO rx_processed_data
(rx_id, rx_name, source, mprop_priority, mprop_delay,
mprop_attempts, msg_enqueue_time, mprop_state, processed_by, processed_ts)
VALUES
(payload_t.id, payload_t.rx, payload_t.source, msg_prop.priority, msg_prop.delay,
msg_prop.attempts, msg_prop.enqueue_time, msg_prop.state, APPUSER, SYSTIMESTAMP);
COMMIT;
EXCEPTION
WHEN no_messages THEN
COMMIT;
WHEN others THEN
RAISE;
END demo_dequeue;
/ |
GRANT execute ON demo_dequeue TO gen_pharmacy;
GRANT execute ON demo_dequeue TO icu_pharmacy;
conn gen_pharmacy/gen_pharm
CREATE SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;
conn icu_pharmacy/icu_pharm
CREATE SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;
conn aqadmin/aqadmin
Create Procedure to Enqueue Messages |
TYPE ENQUEUE_OPTIONS_T IS RECORD (
visibility BINARY_INTEGER DEFAULT ON_COMMIT,
relative_msgid RAW(16) DEFAULT NULL,
sequence_deviation BINARY_INTEGER DEFAULT NULL,
transformation VARCHAR2(60) DEFAULT NULL);
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority BINARY_INTEGER DEFAULT 1,
delay BINARY_INTEGER DEFAULT NO_DELAY,
expiration BINARY_INTEGER DEFAULT NEVER,
correlation VARCHAR2(128) DEFAULT NULL,
attempts BINARY_INTEGER,
recipient_list aq$_recipient_list_t,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time DATE,
state BINARY_INTEGER,
sender_id aq$_agent DEFAULT NULL,
original_msgid RAW(16) DEFAULT NULL);
dbms_aq.enqueue(
queue_name IN VARCHAR2,
enqueue_options IN enqueue_options_t,
message_properties IN message_properties_t,
payload IN <user_defined_data_type>,
msgid OUT RAW); |
CREATE OR REPLACE PROCEDURE demo_enqueue(usermsg IN MESSAGE_T, urgency IN NUMBER)
AUTHID DEFINER IS
eq_msgid RAW(16);
eq_opt dbms_aq.enqueue_options_t;
msg_prop dbms_aq.message_properties_t;
agt_prop sys.aq$_agent;
pragma autonomous_transaction;
BEGIN
agt_prop := sys.aq$_agent(USER, NULL, 0);
msg_prop.sender_id := agt_prop;
IF urgency < 10 THEN
msg_prop.priority := urgency;
eq_opt.visibility :=
dbms_aq.immediate;
msg_prop.delay := dbms_aq.no_delay;
msg_prop.expiration := 300; -- push to exception queue in 5 min.
ELSE
msg_prop.priority := urgency;
eq_opt.sequence_deviation := NULL;
msg_prop.delay :=
1; -- one second delay before sending
msg_prop.expiration := 1800; -- push to exception queue in 30 min.
END IF;
dbms_aq.enqueue('aqadmin.rx_queue', eq_opt, msg_prop, usermsg, eq_msgid);
COMMIT;
END demo_enqueue;
/ |
|
|
Enqueues Message Procedure |
conn aqadmin/aqadmin
CREATE OR REPLACE PROCEDURE order_rx AUTHID DEFINER IS
TYPE ixbb IS TABLE OF VARCHAR2(30)
INDEX BY BINARY_INTEGER;
TYPE ixbi IS TABLE OF NUMBER(2)
INDEX BY BINARY_INTEGER;
rxarray ixbb; -- rx name
rxurgnt ixbi; -- rx delivery priority
usermsg aqadmin.message_t;
x PLS_INTEGER;
endcnt PLS_INTEGER;
BEGIN
rxarray(0):='ccc';
rxarray(1):='Aspirin';
rxarray(2):='Prozac';
rxarray(3):='Lipitor';
rxarray(4):='Tetracycline';
rxarray(5):='Thorazine';
rxarray(6):='Oxycontin';
rxarray(7):='Compazine';
rxarray(8):='aaa';
rxarray(9):='bbb';
rxurgnt(0) := 1;
rxurgnt(1) := 3;
rxurgnt(2) := 5;
rxurgnt(3) := 11;
rxurgnt(4) := 19;
rxurgnt(5) := 20;
rxurgnt(6) := 32;
rxurgnt(7) := 89;
rxurgnt(8) := 42;
rxurgnt(9) := 66;
IF USER = 'ICU' THEN
endcnt := 50;
ELSIF USER = 'NURSES_STN' THEN
endcnt := 30;
END IF;
FOR i IN 1..endcnt
LOOP
x := TO_NUMBER(SUBSTR(dbms_crypto.randominteger,3,1));
usermsg := aqadmin.message_t(i, USER, rxarray(x));
demo_enqueue(usermsg, rxurgnt(x));
IF USER = 'ICU' THEN
dbms_lock.sleep(3);
ELSIF USER = 'NURSES_STN' THEN
dbms_lock.sleep(1);
END IF;
END LOOP;
END order_rx;
/
CREATE PUBLIC SYNONYM order_rx FOR aqadmin.order_rx;
GRANT execute ON order_rx TO icu;
GRANT execute ON order_rx TO nurses_stn; |
|
Dequeues Message Procedure |
conn aqadmin/aqadmin
TYPE dbms_aq.aq$_agent_list_t IS
TABLE OF aq$_agent
INDEXED BY BINARY_INTEGER;
dbms_aq.listen(
agent_list IN dbms_aq.aq$_agent_list_t,
wait IN BINARY_INTEGER DEFAULT dbms_aq.forever,
agent OUT sys.aq$_agent); |
CREATE OR REPLACE PROCEDURE get_rx_order AUTHID DEFINER IS
qlist dbms_aq.aq$_agent_list_t;
agent_w_msg sys.aq$_agent;
listen_timeout EXCEPTION;
pragma exception_init(listen_timeout, -25254);
BEGIN
qlist(0) := sys.aq$_agent(USER, 'AQADMIN.RX_QUEUE', NULL);
/* if retrieving message for multiple users simultaneously example
qlist(0) := sys.aq$_agent('GenPharm', 'AQADMIN.RX_QUEUE', NULL);
qlist(1) := sys.aq$_agent('ICUPharm', 'AQADMIN.RX_QUEUE', NULL);
*/
LOOP
BEGIN
dbms_aq.listen(agent_list => qlist, wait => 15, agent => agent_w_msg);
IF agent_w_msg.name = USER THEN
demo_dequeue(USER);
END IF;
EXCEPTION
WHEN listen_timeout THEN
EXIT;
END;
END LOOP;
END get_rx_order;
/
CREATE PUBLIC SYNONYM get_rx_order FOR aqadmin.get_rx_order;
GRANT execute ON get_rx_order TO gen_pharmacy;
GRANT execute ON get_rx_order TO icu_pharmacy; |
|
To Run Demo |
The following requires that you simultaneously open four SQL*Plus sessions.
In the first session log on as the ICU and type step 2 but do not execute it. In the second session do the same thing as the Nurses Station,
and in the third as the Pharmacy. Again not executing the stored procedure. Then log on as the aqadmin, set up the SQL*Plus environment and
execute one of the two queries then enter a slash (to repeat the query) but do not press the <Enter> key.
Step |
ICU Orders Meds |
Nurses Station Orders Meds |
Fill Orders |
Fill Orders |
1 |
conn icu/icu |
conn nurses_stn/nurses_stn |
conn gen_pharmacy/gen_pharm |
conn icu_pharmacy/icu_pharm |
2 |
exec order_rx |
exec order_rx |
exec get_rx_order |
exec get_rx_order |
conn aqadmin/aqadmin
SELECT * from rx_inventory
SELECT COUNT(*) FROM rx_queue_table;
col source format a10
col processed_by format a12
col processed_ts format a30
SELECT rx_id, rx_name, source, mprop_priority, mprop_delay, mprop_attempts
FROM rx_processed_data
ORDER BY rx_id;
SELECT rx_id, rx_name, source, mprop_priority, mprop_delay, mprop_attempts
FROM rx_processed_data
ORDER BY processed_ts;
SELECT rx_id, msg_enqueue_time, mprop_state, processed_by, processed_ts
FROM rx_processed_data
ORDER BY processed_ts;
SELECT rx_id, rx_name, source, processed_by
FROM rx_processed_data
ORDER BY processed_ts; |
Then, with everything set up ... go to the first window and hit the
<Enter> key, then the second, then the third and finally the same in the AQADMIN session.
Continue to monitor the AQADMIN session while the ICU and Nurses order medications and the pharmacies fill them. |
|
Cleanup As AQADMIN |
conn aqadmin/aqadmin
-- there is a simpler form of this below
CREATE OR REPLACE PROCEDURE purgeQtable(qtable IN VARCHAR2) AUTHID CURRENT_USER AS
po_t dbms_aqadm.aq$_purge_options_t;
qname VARCHAR2(30);
CURSOR qcur IS
SELECT name
FROM user_queues
WHERE queue_table = UPPER(qtable);
BEGIN
po_t.block := FALSE;
dbms_aqadm.purge_queue_table(USER || '.' || qtable, NULL, po_t);
execute immediate 'ALTER TABLE ' || qtable || ' ENABLE ROW MOVEMENT';
execute immediate 'ALTER TABLE ' || qtable || ' SHRINK SPACE CASCADE';
execute immediate 'ALTER TABLE ' || qtable || ' DISABLE ROW MOVEMENT';
FOR qrec IN qcur LOOP
qname := qrec.name;
IF INSTR(qname, '$') > 0 THEN
dbms_aqadm.start_queue(qname, enqueue=>FALSE);
ELSE
dbms_aqadm.start_queue(qname);
END IF;
END LOOP;
dbms_utility.compile_schema(USER,compile_all=>FALSE);
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line('PurgeQTable: Error Starting Queue: '||qname||': '||SQLERRM);
END purgeQtable;
/
set serveroutput on
exec purgeQtable('RX_QUEUE_TABLE');
dbms_aqadm.stop_queue (
queue_name IN VARCHAR2,
enqueue IN BOOLEAN DEFAULT TRUE,
dequeue IN BOOLEAN DEFAULT TRUE,
wait IN BOOLEAN DEFAULT TRUE);
dbms_aqadm.purge_queue_table(
queue_table IN VARCHAR2,
purge_condition IN VARCHAR2,
purge_options IN aq$_purge_options_t);
dbms_aqadm.drop_queue (
queue_name IN VARCHAR2,
auto_commit IN BOOLEAN DEFAULT TRUE);
dbms_aqadm.drop_queue_table (
queue_table IN VARCHAR2,
force IN BOOLEAN DEFAULT FALSE,
auto_commit IN BOOLEAN DEFAULT TRUE); |
SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
-- stop Queue rx_queue
exec dbms_aqadm.stop_queue(queue_name => 'rx_queue', wait => TRUE);
SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
SELECT COUNT(*)
FROM rx_queue_table;
-- purge the queue table rx_queue_table
DECLARE
po_t dbms_aqadm.aq$_purge_options_t;
BEGIN
po_t.block := FALSE;
dbms_aqadm.purge_queue_table('RX_QUEUE_TABLE', NULL, po_t);
END;
/
SELECT COUNT(*)
FROM rx_queue_table;
SELECT name, queue_table
FROM user_queues;
-- drop Queue rx_queue
exec dbms_aqadm.drop_queue('RX_QUEUE');
SELECT name, queue_table
FROM user_queues;
-- drop Queue Table
exec dbms_aqadm.drop_queue_table('rx_queue_table', TRUE);
SELECT name, queue_table
FROM user_queues;
-- verify queue infrastructure is dropped
SELECT object_name, object_type
FROM user_objects
ORDER BY 2,1;
conn / as sysdba
drop user gen_pharmacy cascade;
drop user icu_pharmacy cascade;
drop user icu cascade;
drop user nurses_stn cascade;
drop user aqadmin cascade; |
|