Advanced Queuing

Oracle Database 12c:
Advanced Queuing

Tested on:
Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production

References:
http://www.oracle.com/technetwork/database/oracleadvancedqueuingwp-2524215.pdf
https://docs.oracle.com/database/121/ADQUE/jm_exmpl.htm#ADQUE1600

1.) Configure AQ Administrator account
The AQ Administrator user (‘aq_admin’) creates and owns the queuing infrastructure. The role 
AQ_ADMINISTATOR_ROLE that allows for the creation and administration of the queuing infrastructure needs 
to be granted to the ‘aq_admin’ user:

CREATE USER aq_admin IDENTIFIED BY aq_admin
DEFAULT TABLESPACE users 
TEMPORARY TABLESPACE temp; 

ALTER USER aq_admin QUOTA UNLIMITED ON users;

grant connect, resource to aq_admin;
GRANT aq_administrator_role TO aq_admin; 
GRANT connect TO aq_admin

GRANT create type TO aq_admin;

2.) Set up a simple message 
The following steps must be executed as the aq_admin user.

conn aq_admin/aq_admin
 
CREATE TYPE tst_message_type AS OBJECT 
(
id number,
name varchar2(40),
value number,
obs varchar2(4000)
);

3.) Create Queue Table and Queue 
Queues are implemented using a queue table that can hold multiple queues with the same payload type. 
The following creates a queue table ‘TST_QUEUE_TB’ and a queue ‘TST_QUEUE’. 

EXEC DBMS_AQADM.CREATE_QUEUE_TABLE 
(queue_table => 'aq_admin.tst_queue_tb',
queue_payload_type =>'aq_admin.tst_message_type'); 

exec DBMS_AQADM.CREATE_QUEUE 
(queue_name => 'TST_QUEUE', 
queue_table => 'aq_admin.tst_queue_tb',
queue_type => DBMS_AQADM.NORMAL_QUEUE,
max_retries => 0, 
retry_delay => 0,
retention_time => 1209600, 
dependency_tracking => FALSE, 
comment => 'Test AQ',
auto_commit => FALSE);

4.) Start Queue

exec DBMS_AQADM.START_QUEUE('TST_QUEUE');

5.) Configure AQ user account 

conn / as sysdba

CREATE USER aq_user IDENTIFIED BY aq_user
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp;

GRANT aq_user_role TO aq_user; 
grant connect, resource to aq_user;
grant select on aq_admin.tst_queue_tb to aq_user;
GRANT EXECUTE ON aq_admin.tst_message_type TO aq_user;
exec DBMS_AQADM.GRANT_QUEUE_PRIVILEGE( privilege =>'ALL', queue_name => 'aq_admin.tst_queue',grantee => 'aq_user', grant_option => FALSE);

6.) ENQUEUE MESSAGE
 
CONNECT aq_user/aq_user

DECLARE
  l_enqueue_options     DBMS_AQ.enqueue_options_t;
  l_message_properties  DBMS_AQ.message_properties_t;
  l_message_handle      RAW(16);
  l_event_msg           AQ_ADMIN.tst_message_type;
BEGIN
  l_event_msg := AQ_ADMIN.tst_message_type(1,'TEST 1',1000,'OBS 1');

  DBMS_AQ.enqueue(queue_name          => 'aq_admin.tst_queue',        
                  enqueue_options     => l_enqueue_options,     
                  message_properties  => l_message_properties,   
                  payload             => l_event_msg,             
                  msgid               => l_message_handle);

  COMMIT;
END;
/


7.) DEQUEUE MESSAGE

CONNECT aq_user/aq_user

SET SERVEROUTPUT ON

DECLARE
  l_dequeue_options     DBMS_AQ.dequeue_options_t;
  l_message_properties  DBMS_AQ.message_properties_t;
  l_message_handle      RAW(16);
  l_event_msg           AQ_ADMIN.tst_message_type;
BEGIN
  DBMS_AQ.dequeue(queue_name          => 'aq_admin.tst_queue',
                  dequeue_options     => l_dequeue_options,
                  message_properties  => l_message_properties,
                  payload             => l_event_msg,
                  msgid               => l_message_handle);

  DBMS_OUTPUT.put_line ('id          : ' || l_event_msg.id);
  DBMS_OUTPUT.put_line ('Name: ' || l_event_msg.name);
  DBMS_OUTPUT.put_line ('Value   : ' || l_event_msg.value);
  DBMS_OUTPUT.put_line ('Obs   : ' || l_event_msg.obs);
  
  COMMIT;
END;
/

id          : 1
Name: TEST 1
Value   : 1000
Obs   : OBS 1

PL/SQL procedure successfully completed.

8.) Test Procedure:

DECLARE
  l_enqueue_options     DBMS_AQ.enqueue_options_t;
  l_message_properties  DBMS_AQ.message_properties_t;
  l_message_handle      RAW(16);
  l_event_msg           AQ_ADMIN.tst_message_type;
  l_dequeue_options     DBMS_AQ.dequeue_options_t;
  
  i number;
  qtd_msgs number;
  
BEGIN
   --- enqueue
  
  for i in 1..10 loop
  
      l_event_msg := AQ_ADMIN.tst_message_type(i,'TEST'||to_char(i),i*100,'OBS'||to_char(i));

      DBMS_AQ.enqueue(queue_name          => 'aq_admin.tst_queue',        
                      enqueue_options     => l_enqueue_options,     
                      message_properties  => l_message_properties,   
                      payload             => l_event_msg,             
                      msgid               => l_message_handle);

    
  end loop;
  commit;
  --- dequeue
   select count(*) into qtd_msgs from aq_admin.tst_queue_tb where state=0;
  
  for i in 1..qtd_msgs loop
  
       DBMS_AQ.dequeue(queue_name          => 'aq_admin.tst_queue',
                      dequeue_options     => l_dequeue_options,
                      message_properties  => l_message_properties,
                      payload             => l_event_msg,
                      msgid               => l_message_handle);

      DBMS_OUTPUT.put_line ('id          : ' || l_event_msg.id);
      DBMS_OUTPUT.put_line ('Name: ' || l_event_msg.name);
      DBMS_OUTPUT.put_line ('Value   : ' || l_event_msg.value);
      DBMS_OUTPUT.put_line ('Obs   : ' || l_event_msg.obs);
  
    end loop;  
  commit;
END;
/
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s