REM $Header: wfq_up.sql 26.0 2004/12/08 02:04:47 kma noship $
REM /*======================================================================+
REM  |  Copyright @ 2004, Oracle.  All rights reserved.
REM  +======================================================================+
REM  | FILENAME
REM  |   wfq_up.sql
REM  | DESCRIPTION
REM  |   Create and populate Workflow Inbound, Outbound, and
REM  |   Deferred queue.
REM  | USAGE
REM  |   sqlplus wfusr/wfpwd @wfq_up.sql wfusr wfpwd recordnum
REM  |     wfusr     - workflow schema owner (also used as subscriber name
REM  |                 and as the correlation prefix of deferred messages)
REM  |     wfpwd     - password of wfusr
REM  |     recordnum - bulk fetch size used when seeding the deferred queue
REM  | EXAMPLE
REM  |   sqlplus owf_mgr/owf_mgr @wfq_up.sql owf_mgr owf_mgr 5000
REM  | NOTES
REM  |   Reads the staging table wf_queue_tmp and drops it at the end.
REM  |   The table is not created here.
REM  *======================================================================*/

set verify off
set arraysize 1
WHENEVER SQLERROR EXIT FAILURE ROLLBACK;

connect &1/&2

REM The blocks below print one debug line per message, so use the largest
REM buffer that older SQL*Plus releases accept to avoid ORU-10027.
set serveroutput on size 1000000;

-- create queue tables and queues
declare
  queue_exists        exception;
  queue_table_exists  exception;
  pragma EXCEPTION_INIT(queue_exists, -24006);
  pragma EXCEPTION_INIT(queue_table_exists, -24001);

  -- Create one queue table and its queue.
  -- The two steps are guarded separately so that a rerun after a partial
  -- failure (table present, queue missing) still creates the queue.
  --   p_label              text used in the progress message
  --   p_table / p_queue    queue table name / queue name
  --   p_table_comment      comment stored on the queue table
  --   p_queue_comment      comment stored on the queue
  --   p_multiple_consumers TRUE for a multi-consumer queue table
  --   p_marker             tag embedded in the re-raised error text
  procedure create_wf_queue (
    p_label              in varchar2,
    p_table              in varchar2,
    p_queue              in varchar2,
    p_table_comment      in varchar2,
    p_queue_comment      in varchar2,
    p_multiple_consumers in boolean,
    p_marker             in varchar2
  ) is
  begin
    dbms_output.put_line('Creating '||p_label||' queue table and queue');

    begin
      dbms_aqadm.create_queue_table
      (
        queue_table        => p_table,
        queue_payload_type => '&&1..WF_PAYLOAD_T',
        sort_list          => 'PRIORITY,ENQ_TIME',
        multiple_consumers => p_multiple_consumers,
        comment            => p_table_comment,
        compatible         => '8.1'
      );
    exception
      when queue_table_exists then
        dbms_output.put_line('queue_table_exists');
    end;

    begin
      dbms_aqadm.create_queue
      (
        queue_name  => p_queue,
        queue_table => p_table,
        comment     => p_queue_comment
      );
    exception
      when queue_exists then
        dbms_output.put_line('queue_exists');
    end;
  exception
    when others then
      raise_application_error(-20000, 'Oracle Error '||p_marker||'= '
                              ||to_char(sqlcode)||' - '||sqlerrm);
  end create_wf_queue;
begin
  create_wf_queue('WF_DEFERRED', 'WF_DEFERRED_TABLE_M', 'WF_DEFERRED_QUEUE_M',
                  'Workflow Deferred Queue Table', 'Workflow Deferred Queue',
                  TRUE, 'Mkr1');
  -- FALSE is the DBMS_AQADM default for multiple_consumers
  create_wf_queue('WF_INBOUND', 'WF_INBOUND_TABLE', 'WF_INBOUND_QUEUE',
                  'Workflow Inbound Queue Table', 'Workflow Inbound Queue',
                  FALSE, 'Mkr2');
  create_wf_queue('WF_OUTBOUND', 'WF_OUTBOUND_TABLE', 'WF_OUTBOUND_QUEUE',
                  'Workflow Outbound Queue Table', 'Workflow Outbound Queue',
                  FALSE, 'Mkr3');
end;
/

-- fixed "ORA-24033: no recipients for message" error
-- The deferred queue is multi-consumer, so register the schema owner as a
-- subscriber. Failures are reported but deliberately not fatal (best effort).
declare
  app1 sys.aq$_agent;
begin
  app1 := sys.aq$_agent('&&1', NULL, NULL);
  dbms_aqadm.add_subscriber('WF_DEFERRED_QUEUE_M', app1);
exception
  when others then
    dbms_output.put_line('error found - '||sqlerrm);
end;
/

begin
  dbms_output.put_line('Starting WF_DEFERRED_QUEUE_M');
  dbms_aqadm.start_queue(queue_name => 'WF_DEFERRED_QUEUE_M');
exception
  when others then
    raise_application_error(-20000, 'Oracle Error Mkr4= '
                            ||to_char(sqlcode)||' - '||sqlerrm);
end;
/

begin
  dbms_output.put_line('Starting WF_INBOUND_QUEUE');
  dbms_aqadm.start_queue(queue_name => 'WF_INBOUND_QUEUE');
exception
  when others then
    raise_application_error(-20000, 'Oracle Error Mkr5= '
                            ||to_char(sqlcode)||' - '||sqlerrm);
end;
/

begin
  dbms_output.put_line('Starting WF_OUTBOUND_QUEUE');
  dbms_aqadm.start_queue(queue_name => 'WF_OUTBOUND_QUEUE');
exception
  when others then
    raise_application_error(-20000, 'Oracle Error Mkr6= '
                            ||to_char(sqlcode)||' - '||sqlerrm);
end;
/

-- populating inbound and outbound queues from the staging table
declare
  -- named differently from the wf_queue_tmp.queue_name column to avoid
  -- confusion inside the cursor
  target_queue  varchar2(30);
  ctr           pls_integer := 0;
  mesg_payload  wf_payload_t;
  mesg_handle   raw(16) := null;
  mesg_prop     dbms_aq.message_properties_t;
  enq_options   dbms_aq.enqueue_options_t;

  -- every staged message that is not destined for the deferred queue
  cursor c_get_in_out_payload is
    select queue_name, msgid, itemtype, itemkey, actid,
           function_name, param_list, result
    from   wf_queue_tmp
    where  queue_name <> 'WF_DEFERRED_QUEUE_M';
begin
  dbms_output.put_line('Populating inbound and outbound queues');
  open c_get_in_out_payload;

  mesg_payload := wf_payload_t(null,            -- itemtype
                               null,            -- item key
                               to_number(null), -- act id
                               null,            -- function name
                               null,            -- param list
                               null             -- result
                              );

  -- populate queue
  <<queue_loop>>
  loop
    fetch c_get_in_out_payload
     into target_queue,
          mesg_handle,
          mesg_payload.itemtype,
          mesg_payload.itemkey,
          mesg_payload.actid,
          mesg_payload.function_name,
          mesg_payload.param_list,
          mesg_payload.result;
    exit queue_loop when c_get_in_out_payload%notfound;

    ctr := ctr + 1;

    -- mesg_handle is overwritten with the message id assigned by the enqueue
    dbms_aq.enqueue
    (
      queue_name         => target_queue,
      enqueue_options    => enq_options,
      message_properties => mesg_prop,
      payload            => mesg_payload,
      msgid              => mesg_handle
    );
    dbms_output.put_line('in-out enqueued ['||mesg_handle||']');
  end loop queue_loop;

  close c_get_in_out_payload;
  dbms_output.put_line('WF_INBOUND_QUEUE and WF_OUTBOUND_QUEUE enqueued, '||to_char(ctr)||
                       ' messages found.');
end;
/

-- populating defer queue
declare
  event               wf_payload_t;
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;

  itt_tbl      wf_q.ItemTypeTable;
  itk_tbl      wf_q.ItemKeyTable;
  pac_tbl      wf_q.ProcessActivityTable;
  begdate_tbl  wf_q.BeginDateTable;

  -- Seconds until the activity is due. Held in a NUMBER so that a value of
  -- 2**31 or more can be clamped instead of overflowing on assignment.
  delay_secs   number;
  -- message_properties.delay is BINARY_INTEGER, so (2**31)-1 is the ceiling
  max_delay    constant number := power(2,31) - 1;

  record_limit pls_integer := to_number(&3);
  -- less than 2147483647, limited by MESSAGE_PROPERTIES_ARRAY_T Type
  enq_count       pls_integer;
  total_fetched   pls_integer := 0;
  total_enqueued  pls_integer := 0;

  mesg_payload_arr  wf_payload_arr := wf_payload_arr();
  mesg_prop_arr     dbms_aq.message_properties_array_t;
  mesg_handle_arr   dbms_aq.msgid_array_t;

  -- DEFERRED activity statuses whose parent process status is not SUSPEND,
  -- joined to the activity version in effect when the item began
  cursor curs_deferred is
    select CWIAS.ITEM_TYPE, CWIAS.ITEM_KEY, CWIAS.PROCESS_ACTIVITY,
           CWIAS.BEGIN_DATE
    from   WF_ITEM_ACTIVITY_STATUSES CWIAS,
           WF_PROCESS_ACTIVITIES CWPA,
           WF_ACTIVITIES CWA,
           WF_ITEMS WI,
           WF_PROCESS_ACTIVITIES PWPA,
           WF_ITEM_ACTIVITY_STATUSES PWIAS
    where  CWIAS.ACTIVITY_STATUS = 'DEFERRED'
    and    CWIAS.PROCESS_ACTIVITY = CWPA.INSTANCE_ID
    and    CWPA.ACTIVITY_ITEM_TYPE = CWA.ITEM_TYPE
    and    CWPA.ACTIVITY_NAME = CWA.NAME
    and    CWIAS.ITEM_TYPE = WI.ITEM_TYPE
    and    CWIAS.ITEM_KEY = WI.ITEM_KEY
    and    WI.BEGIN_DATE >= CWA.BEGIN_DATE
    and    WI.BEGIN_DATE < nvl(CWA.END_DATE, WI.BEGIN_DATE+1)
    and    CWPA.PROCESS_NAME = PWPA.ACTIVITY_NAME
    and    CWPA.PROCESS_ITEM_TYPE = PWPA.ACTIVITY_ITEM_TYPE
    and    PWPA.INSTANCE_ID = PWIAS.PROCESS_ACTIVITY
    and    PWIAS.ITEM_TYPE = CWIAS.ITEM_TYPE
    and    PWIAS.ITEM_KEY = CWIAS.ITEM_KEY
    and    PWIAS.ACTIVITY_STATUS <> 'SUSPEND'
    -- NOTE(review): as written the second arm is redundant (PLSQL is already
    -- not null). Possibly "is null" was intended; confirm before changing.
    and    (CWA.FUNCTION_TYPE is not null or CWA.FUNCTION_TYPE = 'PLSQL')
    order by CWIAS.BEGIN_DATE, CWIAS.EXECUTION_TIME;
begin
  -- dont make the data visible on the queue until a commit is issued
  -- this way queue data and normal table data (wf statuses) are in synch.
  enqueue_options.visibility := DBMS_AQ.ON_COMMIT;

  mesg_prop_arr := dbms_aq.message_properties_array_t();

  -- re-seed everything to the queue
  open curs_deferred;

  -- populate queue, one bulk fetch of at most record_limit rows per pass
  <<queue_loop>>
  loop
    fetch curs_deferred bulk collect
     into itt_tbl, itk_tbl, pac_tbl, begdate_tbl
    limit record_limit;

    exit queue_loop when itt_tbl.count = 0;
    total_fetched := total_fetched + itt_tbl.count;

    for arIdx in itt_tbl.first..itt_tbl.last loop
      dbms_output.put_line('itk_tbl('||arIdx||') '||itk_tbl(arIdx));

      event := wf_payload_t(itt_tbl(arIdx), itk_tbl(arIdx), pac_tbl(arIdx),
                            null, null, null);

      -- Set the delay if any: seconds from now until begin date, never
      -- negative and never above what BINARY_INTEGER can hold.
      delay_secs := greatest(round((begdate_tbl(arIdx) - sysdate)*86400 + 0.5), 0);
      message_properties.delay := least(delay_secs, max_delay);

      message_properties.correlation := '&&1'||itt_tbl(arIdx);

      -- check the correlation is always set to something
      -- else it will never be dequeued because we always default the dequeue
      -- correlation to '%'
      if message_properties.correlation is null then
        -- this shouldnt happen.
        message_properties.correlation := '%';
      end if;

      -- both arrays are emptied at the end of every pass, so arIdx is also
      -- the position of the element just added
      mesg_payload_arr.extend;
      mesg_payload_arr(arIdx) := event;
      mesg_prop_arr.extend;
      mesg_prop_arr(arIdx) := message_properties;
    end loop;

    dbms_output.put_line('def enqing reclim['||to_char(record_limit)||'] payloadarr #['||mesg_payload_arr.count||'] proparr ['||mesg_prop_arr.count||']');

    enq_count := dbms_aq.enqueue_array
    (
      queue_name               => 'WF_DEFERRED_QUEUE_M',
      enqueue_options          => enqueue_options,
      array_size               => mesg_payload_arr.count,
      message_properties_array => mesg_prop_arr,
      payload_array            => mesg_payload_arr,
      msgid_array              => mesg_handle_arr
    );
    total_enqueued := total_enqueued + nvl(enq_count, 0);

    -- might want to turn these message to debug log messages
    -- dbms_output.put_line('final msgid arr size is '||mesg_handle_arr.count);
    -- dbms_output.put_line('final payload size is '||mesg_payload_arr.count);
    -- dbms_output.put_line('final msgprop size is '||mesg_prop_arr.count);

    -- only record the handles when one came back for every message
    if (mesg_handle_arr.count = mesg_payload_arr.count and
        mesg_handle_arr.count = mesg_prop_arr.count) then
      -- update the message handle
      forall j in itt_tbl.FIRST..itt_tbl.LAST
        update wf_item_activity_statuses
        set    outbound_queue_id = mesg_handle_arr(j)
        where  item_type = itt_tbl(j)
        and    item_key = itk_tbl(j)
        and    process_activity = pac_tbl(j);
    end if;

    -- initialize varrays for next fetch
    mesg_handle_arr.delete;
    mesg_payload_arr.delete;
    mesg_prop_arr.delete;
  end loop queue_loop;

  close curs_deferred;

  if (total_fetched = 0) then
    dbms_output.put_line('Nothing to enqueue to the deferred queue.');
  else
    dbms_output.put_line('WF_DEFERRED_QUEUE_M enqueued, '||to_char(total_enqueued)||
                         ' of '||to_char(total_fetched)||' messages found.');
  end if;
end;
/

-- The staging table is no longer needed. DROP TABLE is DDL, so it also
-- commits the work done by the blocks above.
begin
  dbms_output.put_line('Dropping wf_queue_tmp');
  execute immediate 'drop table wf_queue_tmp';
end;
/

REM Now update the Queue Version
REM This is needed to prevent any old wf260dat.sql from running.
REM The script only operates on LANGUAGE 'US'.
begin
  update WF_RESOURCES
  set    TEXT = '26.18'
  where  NAME = 'WF_QUEUE_VERSION';

  -- no existing row: seed the US token
  if (sql%rowcount = 0) then
    insert into WF_RESOURCES (
      TYPE,
      NAME,
      ID,
      TEXT,
      PROTECT_LEVEL,
      CUSTOM_LEVEL,
      LANGUAGE,
      SOURCE_LANG
    ) values (
      'WFTKN',
      'WF_QUEUE_VERSION',
      0,
      '26.18',
      0,
      0,
      'US',
      'US'
    );
  end if;
end;
/

exit;