Edit D:\app\Administrator\product\11.2.0\dbhome_1\owb\wf\sql\wfqueb.pls
REM dbdrv: sql ~PROD ~PATH ~FILE none none none package &phase=plb \ REM dbdrv: checkfile:~PROD:~PATH:~FILE /*=======================================================================+ | Copyright (C) 1995 Oracle Corporation Redwood Shores, California, Usa| | All Rights Reserved. | +=======================================================================+ | FILENAME | wfqueb.pls | DESCRIPTION | Handles all processing for wf queues. | MODIFICATION LOG: | 06 JUN 2001 JWSMITH BUG 1819232 ADA ENHANCEMENT | - Added alt attr for IMG tag | - Added summary attr for table tags | - Added ID attr for TD tag | 02 JAN 2002 JWSMITH BUG 2001012 - Changed schema_name, username, | admin_role, l_schema_name to varchar2(320) | display_name to varchar2(360) | 21 FEB 2003 RRAHEJA BUG 2786474 - Tuned getCntMsgCt code for perf *=======================================================================*/ SET VERIFY OFF WHENEVER SQLERROR EXIT FAILURE ROLLBACK; WHENEVER OSERROR EXIT FAILURE ROLLBACK; set scan off; CREATE OR REPLACE PACKAGE BODY WF_QUEUE AS /* $Header: wfqueb.pls 26.47 2005/02/28 11:30:28 dmani ship $ */ -- -- Exceptions -- dequeue_timeout exception; pragma EXCEPTION_INIT(dequeue_timeout, -25228); dequeue_disabled exception; pragma EXCEPTION_INIT(dequeue_disabled, -25226); dequeue_outofseq exception; pragma EXCEPTION_INIT(dequeue_outofseq, -25237); no_queue exception; pragma EXCEPTION_INIT(no_queue, -24010); shutdown_pending exception; no_savepoint exception; pragma EXCEPTION_INIT(no_savepoint, -1086); msgid_notfound exception; pragma EXCEPTION_INIT(msgid_notfound, -25263); -- ==================================================================== -- -- Private Routine to check for shutdown -- -- ==================================================================== function check_instance return boolean as shutdown varchar2(3); begin select shutdown_pending into shutdown from v$instance; if shutdown = 'YES' then return(TRUE); else return(FALSE); end if; end; -- ==================================================================== -- -- Private Procedure which processes the payload dequeued off the -- Inbound queue . -- -- ==================================================================== -- Process_Inbound_Event (PRIVATE) -- Executes the payload dequeued off the inbound queue -- IN -- itemtype - itemtype,itemkey,actid to uniquely identify the -- itemkey - activity -- actid - -- message_handle - pointer to queue message -- p_payload - the message payload . Lets have it as in/out parameter -- so that if callback (for which it is in/out) changes something -- we can have it. procedure Process_Inbound_Event(itemtype in varchar2, itemkey in varchar2, actid in number, message_handle in raw, p_payload in out nocopy wf_payload_t) as colon number; status varchar2(30); plist varchar2(4000); attrpair varchar2(4000); delimiter number; aname varchar2(40); avalue varchar2(4000); lcorrelation varchar2(80); nvalue number; --required but not used by wf_engine.CB dvalue date; --required but not used by wf_engine.CB begin --process the parameter list. plist:= p_payload.param_list; if plist is not null then loop -- if plist is null then EXIT; end if; delimiter:=instr(plist,'^'); if delimiter = 0 then attrpair:=plist; else attrpair:=substr(plist,1,delimiter-1); end if; aname := upper(substr(attrpair,1,instr(attrpair,'=')-1)); avalue := substr(attrpair,instr(attrpair,'=')+1); begin --Set the value for the attribute wf_engine.SetItemAttrText(itemtype, itemkey, aname, avalue); exception when others then if ( wf_core.error_name = 'WFENG_ITEM_ATTR' ) then --If the attribute does not exist first create it --and then add the value Wf_Engine.AddItemAttr(itemtype, itemkey, aname); Wf_Engine.SetItemAttrText(itemtype, itemkey, aname, avalue); else raise; end if; end; exit when delimiter = 0; plist := substr(plist,delimiter+1); end loop; end if; --if payload contains a colon, then its ERROR else its COMPLETE status colon:= instr(p_payload.result,':'); if colon=0 or p_payload.result is null then -- check if activity is already complete wf_item_activity_status.status(itemtype,itemkey,actid,status); if (status is not null) and (status <> 'COMPLETE') then -- mark activity as Complete:<result> wf_engine.CB(command => 'COMPLETE', context =>itemtype||':'|| itemkey ||':'|| actid, text_value => p_payload.result, number_value => nvalue, date_value => dvalue); end if; else -- at the moment we only accept :ERROR:<error text> (may add other statuses later) if substr(p_payload.result,colon+1,5) = 'ERROR' then begin wf_core.clear; -- set the function name for courtesy. wf_core.token('FUNCTION_NAME', Wf_Activity.activity_function(itemtype, itemkey,actid)); wf_core.raise('WF_EXT_FUNCTION'); exception when others then null; end; --function name on payload is upto 200 char so use it to record error wf_core.error_stack := p_payload.function_name; wf_engine.CB(command => 'ERROR', context =>itemtype||':'|| itemkey ||':'|| actid, text_value => p_payload.result, number_value => nvalue, date_value => dvalue); end if; end if; --If we came successfully till here let us purge off the --data from the Q wf_queue.PurgeEvent(wf_queue.InboundQueue, message_handle, FALSE); exception when others then Wf_Core.Context('Wf_Queue', 'Process_Inbound_Event', itemtype,itemkey); raise; end Process_Inbound_Event; -- ==================================================================== -- Queue Setup Functions (PUBLIC) -- ==================================================================== function DeferredQueue return varchar2 as begin if (wf_queue.name_init = FALSE) then wf_queue.set_queue_names; end if; return (wf_queue.deferred_queue_name); exception when others then Wf_Core.Context('Wf_Queue', 'DeferredQueue'); raise; end; function OutboundQueue return varchar2 as begin if (wf_queue.name_init = FALSE) then wf_queue.set_queue_names; end if; return (wf_queue.outbound_queue_name); exception when others then Wf_Core.Context('Wf_Queue', 'OutboundQueue'); raise; end; function InboundQueue return varchar2 as begin if (wf_queue.name_init = FALSE) then wf_queue.set_queue_names; end if; return (wf_queue.inbound_queue_name); exception when others then Wf_Core.Context('Wf_Queue', 'InboundQueue'); raise; end; -- NAME: Set_queue_names (PRIVATE) -- called once at the beginning of a session to set up queue names -- when AQ supports db synonyms, remove this and use synonyms instead procedure set_queue_names as schema_name varchar2(320); begin --dont bother re-executing this if already initialized. if wf_queue.name_init then return; end if; schema_name := wf_core.translate('WF_SCHEMA'); -- Do not determine account name by STANDALONE vs. EMBEDDED any more -- Current_schema is the schema in effect. -- Sys_context is an 8i feature. Below allows us to tag on the -- intended schema whether the install is with invoker's right or -- definer's right (default). begin select sys_context('USERENV', 'CURRENT_SCHEMA') into wf_queue.account_name from sys.dual; exception when OTHERS then wf_queue.account_name := NULL; end; wf_queue.deferred_queue_name := schema_name||'.WF_DEFERRED_QUEUE_M'; wf_queue.outbound_queue_name := schema_name||'.WF_OUTBOUND_QUEUE'; wf_queue.inbound_queue_name := schema_name||'.WF_INBOUND_QUEUE'; wf_queue.name_init := TRUE; exception when others then Wf_Core.Context('Wf_Queue', 'Set_queue_names'); raise; end set_queue_names; -- ==================================================================== -- Public routines -- ==================================================================== -- NAME: PurgeEvent -- removes the event from the specified queue WITHOUT PROCESSING -- queuename - the queue to purge -- message_handle - the specific event to purge -- procedure PurgeEvent(queuename in varchar2, message_handle in raw, multiconsumer in boolean default FALSE) as event wf_payload_t; dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; msg_id raw(16); begin if message_handle is not null then dequeue_options.dequeue_mode := dbms_aq.REMOVE; dequeue_options.msgid := message_handle; dequeue_options.wait := dbms_aq.NO_WAIT; dequeue_options.navigation := dbms_aq.FIRST_MESSAGE; -- check if we need to have a consumer if (multiconsumer) then dequeue_options.consumer_name := wf_queue.account_name; end if; dbms_aq.dequeue ( queue_name => queuename, dequeue_options => dequeue_options, message_properties => message_properties, payload => event, msgid => msg_id ); end if; exception when dequeue_timeout then null; -- not found on queue so must already be removed. when msgid_notfound then null; -- Already purged from the queue. when others then Wf_Core.Context('Wf_Queue', 'PurgeEvent', queuename, rawtohex(message_handle)); raise; end PurgeEvent; -- NAME: PurgeItemtype -- removes all events belonging to an itemtype from the specified queue -- ** WARNING ** IT DOES NOT PROCESS THE EVENT -- queuename - the queue to purge -- itemtype - the itemtype to purge -- procedure PurgeItemtype(queuename in varchar2, itemtype in varchar2 default null, correlation in varchar2 default null ) as event wf_payload_t; dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; msg_id raw(16); begin dequeue_options.dequeue_mode := dbms_aq.REMOVE; dequeue_options.wait := dbms_aq.NO_WAIT; dequeue_options.navigation := dbms_aq.FIRST_MESSAGE; wf_queue.set_queue_names; if correlation is not null then dequeue_options.correlation := correlation; else dequeue_options.correlation := wf_queue.account_name||nvl(itemtype,'%'); end if; LOOP dbms_aq.dequeue ( queue_name => queuename, dequeue_options => dequeue_options, message_properties => message_properties, payload => event, msgid => msg_id ); END LOOP; exception when dequeue_timeout then null; -- nothing left on queue to remove when others then Wf_Core.Context('Wf_Queue', 'PurgeItemtype', queuename, itemtype); raise; end PurgeItemtype; -- ProcessDeferredEvent (PRIVATE) -- Executes the event payload dequeued off the deferred queue -- IN -- itemtype - itemtype,itemkey,actid to uniquely identify the -- itemkey - activity -- actid - -- message_handle - pointer to queue message -- minthreshold - threshold levels of the background engine -- maxthreshold -- procedure ProcessDeferredEvent(itemtype in varchar2, itemkey in varchar2, actid in number, message_handle in raw, minthreshold in number, maxthreshold in number) as begin Wf_Item_Activity_Status.Create_Status(itemtype, itemkey, actid, wf_engine.eng_active, null, null, null); -- Continue processing on activity begin begin begin savepoint wf_savepoint; Wf_Engine_Util.Process_Activity(itemtype, itemkey, actid, maxthreshold, TRUE); -- we successfully processed the activity so dequeue it. wf_queue.PurgeEvent(wf_queue.DeferredQueue, message_handle, TRUE); Exception when others then -- In the unlikely event this process thread raises an exception: -- 1. rollback any work in this process thread -- raise an error for the next excption handler to complete -- remaining steps. rollback to wf_savepoint; raise; end; exception when NO_SAVEPOINT then -- Catch any savepoint error in case of a commit happened. Wf_Core.Token('ACTIVITY', Wf_Engine.GetActivityLabel(actid)); Wf_Core.Raise('WFENG_COMMIT_IN_PROCESS'); end; exception when OTHERS then -- Remaining steps for proces thread raises an exception: -- 2. set this activity to error status -- 3. execute the error process (if any) -- 4. clear the error to continue with next activity -- **note the error stack will refer to the actid that has been -- rolled back! Wf_Core.Context('Wf_Queue', 'ProcessDeferredEvent', itemtype, to_char(minthreshold), to_char(maxthreshold)); Wf_Item_Activity_Status.Set_Error(itemtype, itemkey, actid, wf_engine.eng_exception, FALSE); Wf_Engine_Util.Execute_Error_Process(itemtype, itemkey, actid, wf_engine.eng_exception); Wf_Core.Clear; end; -- Commit work to insure this activity thread doesn't interfere -- with others. commit; Fnd_Concurrent.Set_Preferred_RBS; exception when others then Wf_Core.Context('Wf_Queue', 'ProcessDeferredEvent', itemtype, to_char(minthreshold), to_char(maxthreshold)); raise; end ProcessDeferredEvent; --Name: EnqueueInbound (PUBLIC) --Enqueues the result from an outbound event onto --the inbound queue. Wf will mark this as complete with the --given result when it processes the queue. procedure EnqueueInbound( itemtype in varchar2, itemkey in varchar2, actid in number, result in varchar2 default null, attrlist in varchar2 default null, correlation in varchar2 default null, error_stack in varchar2 default null) as handle raw(16); lcorrelation varchar2(80); lresult varchar2(30); begin if correlation is not null then lcorrelation := correlation; else wf_queue.set_queue_names; lcorrelation := wf_queue.account_name||itemtype; end if; -- if error stack is defined then set result to ERROR. if error_stack is null then lresult := result; else lresult := ':ERROR'; end if; wf_queue.Enqueue_Event(queuename =>wf_queue.InboundQueue, itemtype =>enqueueInbound.itemtype, itemkey =>enqueueInbound.itemkey, actid =>enqueueInbound.actid, funcname =>enqueueInbound.error_stack, correlation =>lcorrelation, paramlist =>enqueueInbound.attrlist, result =>lresult, message_handle =>handle); exception when others then Wf_Core.Context('Wf_Queue', 'EnqueueInbound', itemtype, itemkey, actid); raise; end EnqueueInbound; function Get_param_list (itemtype in varchar2, itemkey in varchar2, actid in number) return varchar2 as startdate date; paramlist varchar2(4000); lvalue varchar2(4000); cursor attr_list is select aa.name, aa.value_type, -- CONSTANT or ITEMATTR aa.type, -- NUMBER/TEXT/DATE etc aa.format, av.TEXT_VALUE, av.NUMBER_VALUE, av.DATE_VALUE from wf_activity_attr_values av, wf_activity_attributes aa, wf_activities a, wf_process_activities pa where pa.activity_item_type = a.item_type and pa.activity_name = a.name and pa.instance_id=actid and a.begin_date< startdate and nvl(a.end_date,startdate) >= startdate and a.item_type = aa.activity_item_type and a.name = aa.activity_name and a.version = aa.activity_version and av.process_activity_id = actid and av.name=aa.name order by aa.sequence; begin paramlist:=null; startdate:=wf_item.active_date(itemtype,itemkey); for attr_row in attr_list loop if (attr_row.value_type = 'ITEMATTR' and attr_row.text_value is not null) then -- null itemattr text_value means null value, not an error lvalue := wf_engine.GetItemAttrText(itemtype,itemkey, attr_row.text_value); else --must be CONSTANT if (attr_row.type = 'NUMBER') then if (attr_row.format is null) then lvalue := to_char(attr_row.NUMBER_VALUE); else lvalue := to_char(attr_row.NUMBER_VALUE, attr_row.format); end if; elsif (attr_row.type = 'DATE') then if (attr_row.format is null) then lvalue := to_char(attr_row.DATE_VALUE); else lvalue := to_char(attr_row.DATE_VALUE, attr_row.format); end if; else lvalue := attr_row.text_value; end if; end if; if paramlist is not null then -- Overflow, cannot hold anymore attributes. if (lengthb(paramlist||'^') > 4000) then exit; else paramlist := paramlist||'^'; end if; end if; if (lengthb(paramlist||attr_row.name||'='||lvalue) > 4000) then -- Overflow, cannot hold anymore attributes. paramlist:=substrb(paramlist||attr_row.name||'='||lvalue, 1, 4000); exit; else paramlist := paramlist||attr_row.name||'='||lvalue; end if; end loop; return(paramlist); exception when others then Wf_Core.Context('Wf_Queue', 'Get_param_list', itemtype, itemkey, actid); raise; end Get_param_list; --Name: DequeueOutbound (PUBLIC) procedure DequeueOutbound( dequeuemode in number, navigation in number default 1, correlation in varchar2 default null, itemtype in varchar2 default null, payload out NOCOPY wf_payload_t, message_handle in out NOCOPY raw, timeout out NOCOPY boolean) as lcorrelation varchar2(80); begin wf_queue.set_queue_names; if correlation is not null then lcorrelation := correlation; else lcorrelation := wf_queue.account_name||nvl(itemtype,'%'); end if; wf_queue.Dequeue_Event(queuename =>wf_queue.OutboundQueue, dequeuemode =>DequeueOutbound.dequeuemode, navigation =>DequeueOutbound.navigation, correlation =>lcorrelation, payload =>DequeueOutbound.payload, message_handle =>DequeueOutbound.message_handle, timeout =>DequeueOutbound.timeout); exception when others then Wf_Core.Context('Wf_Queue', 'DequeueOutbound', payload.itemtype, payload.itemkey, payload.actid); raise; end DequeueOutbound; --Name: DequeueEventDetail (PUBLIC) -- --Wrapper to Dequeue_Event in which the payload is EXPanded out to avoid --use of type itemtypes. procedure DequeueEventDetail( dequeuemode in number, navigation in number default 1, correlation in varchar2 default null, itemtype in out NOCOPY varchar2, itemkey out NOCOPY varchar2, actid out NOCOPY number, function_name out NOCOPY varchar2, param_list out NOCOPY varchar2, message_handle in out NOCOPY raw, timeout out NOCOPY boolean) as event wf_payload_t; lcorrelation varchar2(80); begin wf_queue.set_queue_names; --use the correlation or default it if null if DequeueEventDetail.correlation is not null then lcorrelation := DequeueEventDetail.correlation; else lcorrelation := wf_queue.account_name||nvl(itemtype,'%'); end if; -- call dequeue to retrieve the event wf_queue.Dequeue_Event(queuename=>wf_queue.OutboundQueue, dequeuemode=>DequeueEventDetail.dequeuemode, navigation =>DequeueEventDetail.navigation, correlation=>lcorrelation, payload=>event, message_handle=>DequeueEventDetail.message_handle, timeout =>DequeueEventDetail.timeout); --expand the payload structure DequeueEventDetail.itemtype:=event.itemtype; DequeueEventDetail.itemkey:=event.itemkey; DequeueEventDetail.actid:=event.actid; DequeueEventDetail.function_name:=event.function_name; DequeueEventDetail.param_list:=event.param_list; exception when others then Wf_Core.Context('Wf_Queue', 'DequeueEventDetail', itemtype||':'||itemkey,to_char(actid)); raise; end DequeueEventDetail; --Dequeue_Event (PRIVATE) -- --Dequeues an event (message) from any queue --IN -- QueueName - the queue name, may contain owner or database -- DeQueueMode - either 1 (Browse), 2 (Locked) or 3 (Remove) -- Navigation - either First or Next -- Correlation - helps restrict the queue -- Payload - the event actually dequeued -- message_handle - id for the event -- timeout - determines if anything was found or if the q timedout. procedure Dequeue_Event(queuename in varchar2, dequeuemode in number, navigation in number default 1, correlation in varchar2 default null, payload out NOCOPY wf_payload_t, message_handle in out NOCOPY raw, timeout out NOCOPY boolean, multiconsumer in boolean default FALSE) as dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; snap_too_old exception; pragma exception_init(snap_too_old, -1555); begin -- find out the schema name wf_queue.set_queue_names; dequeue_options.dequeue_mode := dequeuemode; dequeue_options.wait := dbms_aq.NO_WAIT; dequeue_options.navigation := navigation; -- if message_handle is set then use it instead of correlation -- NOTE: if message_handle is set FIRST/NEXT_MESSAGE dont have effect if message_handle is not null then dequeue_options.msgid := message_handle; dequeue_options.correlation := null; message_handle := null; else -- set correlation to item_type or % if its null if correlation is null then dequeue_options.correlation := '%'; else dequeue_options.correlation := correlation; end if; end if; -- check if we need to have a consumer if (multiconsumer) then dequeue_options.consumer_name := wf_queue.account_name; end if; begin dbms_aq.dequeue( queue_name => Dequeue_Event.queuename, dequeue_options => dequeue_options, message_properties => message_properties, payload => Dequeue_Event.payload, msgid => message_handle ); exception when snap_too_old then --Workaround for AQ when receiving ORA-01555 using NEXT_MESSAGE as --navigation. We will try to set to FIRST_MESSAGE and dequeue to --silently handle this exception. if (dequeue_options.navigation = dbms_aq.FIRST_MESSAGE) then raise; else dequeue_options.navigation := dbms_aq.FIRST_MESSAGE; dbms_aq.dequeue( queue_name => Dequeue_Event.queuename, dequeue_options => dequeue_options, message_properties => message_properties, payload => Dequeue_Event.payload, msgid => message_handle ); end if; when OTHERS then raise; end; timeout:= FALSE; exception when dequeue_timeout then timeout := TRUE; when others then if correlation is null then Wf_Core.Context('WF_QUEUE', 'Dequeue_Event', queuename, '%'); else Wf_Core.Context('WF_QUEUE', 'Dequeue_Event', queuename, correlation); end if; timeout := FALSE; raise; end Dequeue_Event; -- Activity_Valid (PRIVATE) -- checks the deferred activity is valid for processing -- -- IN -- event - the event to check -- message_handle of event in the deferred queue -- maxthreshold - the threshold level -- minthreshold -- function activity_valid (event in wf_payload_t, message_handle in raw, maxthreshold in number default null, minthreshold in number default null) return BOOLEAN is cost pls_integer; litemtype varchar2(8); l_begdate date; -- <dlam:3070112> resource_busy exception; pragma exception_init(resource_busy, -00054); begin -- Activity must be valid if -- 1) in given cost range -- 2) parent is not suspended -- note: suspendprocess/resumeprocess will remove/add deferred jobs -- <dlam:3070112> check begin date as well -- move the BEGIN_DATE, SYSDATE comparion to a separate clause SELECT CWA.COST, CWIAS.BEGIN_DATE into cost, l_begdate FROM WF_ITEM_ACTIVITY_STATUSES CWIAS, WF_PROCESS_ACTIVITIES CWPA, WF_ITEMS WI, WF_ACTIVITIES CWA where CWIAS.ACTIVITY_STATUS = 'DEFERRED' and CWIAS.PROCESS_ACTIVITY = CWPA.INSTANCE_ID and CWPA.ACTIVITY_ITEM_TYPE = CWA.ITEM_TYPE and CWPA.ACTIVITY_NAME = CWA.NAME and CWIAS.ITEM_TYPE = WI.ITEM_TYPE and CWIAS.ITEM_KEY = WI.ITEM_KEY and WI.BEGIN_DATE >= CWA.BEGIN_DATE and WI.BEGIN_DATE < nvl(CWA.END_DATE, WI.BEGIN_DATE+1) and CWIAS.ITEM_TYPE = event.itemtype and CWIAS.ITEM_KEY = event.itemkey and CWIAS.PROCESS_ACTIVITY = event.actid; -- dont bother locking: the original msg has been locked on the queue -- for update of CWIAS.ACTIVITY_STATUS NOWAIT; -- dont bother checking if parent is suspended. -- the suspend process should remove any jobs from the queue, -- but if any get through, process_activity will manage it. -- <dlam:3070112> -- begin date has not reached yet, leave the message alone. -- this is to work around a problem where the aq delay seems to -- to be shorter than expected if (l_begdate > sysdate) then return(FALSE); end if; if cost < nvl(minthreshold,cost) or cost > nvl(maxthreshold,cost) then return(FALSE); else return(TRUE); end if; exception when no_data_found then -- this event is no longer valid so remove it from the queue -- happens when a rewind moved activity to history table -- or the activity status is no longer defered wf_queue.PurgeEvent(wf_queue.DeferredQueue,message_handle,TRUE); return(FALSE); when resource_busy then return(FALSE); when others then Wf_Core.Context('Wf_Queue', 'Activity_valid', 'Invalid', event.itemtype||':'||event.itemkey, to_char(event.actid)); return(FALSE); end activity_valid; -- -- ==================================================================== -- -- Enqueue_Event (PRIVATE) -- Enqueues a message onto any WF queue (because all queues have same payload) -- procedure Enqueue_Event(queuename in varchar2, itemtype in varchar2, itemkey in varchar2, actid in number, correlation in varchar2 default null, delay in number default 0, funcname in varchar2 default null, paramlist in varchar2 default null, result in varchar2 default null, message_handle in out NOCOPY raw, priority in number default null) as event wf_payload_t; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; begin wf_queue.set_queue_names; -- construct the event object event:=wf_payload_t(itemtype,itemkey,actid,funcname,paramlist,result); -- dont make the data visible on the queue until a commit is issued -- this way queue data and normal table data (wf statuses) are in synch. enqueue_options.visibility := DBMS_AQ.ON_COMMIT; -- Set the delay if any if delay <0 then message_properties.delay := 0; else -- message_properties.delay is BINARY_INTEGER, so check if delay is -- too big, and set the max delay to be (2**31)-1. if (delay >= power(2,31)) then message_properties.delay := power(2,31)-1; else message_properties.delay := delay; end if; end if; if correlation is not null then message_properties.correlation := enqueue_event.correlation; else message_properties.correlation := wf_queue.account_name||itemtype; end if; -- check the correlation is always set to something -- else it wil never be dequeued because we always default the dequeue -- corellation to '%' if message_properties.correlation is null then -- this shouldnt happen. message_properties.correlation := '%'; end if; -- Set the priority so that we can dequeue by priority if priority is not null then message_properties.priority := priority; end if; dbms_aq.enqueue ( queue_name => Enqueue_Event.queuename, enqueue_options => enqueue_options, message_properties => message_properties, payload => event, msgid => message_handle ); exception when others then Wf_Core.Context('Wf_Queue', 'Enqueue_event', itemtype, itemkey, to_char(actid), to_char(delay)); raise; end; -- ProcessInboundQueue (PUBLIC) -- reads everythig off the Inbound queue and records it as complete -- with the given result and updates item attributes as specified in -- the paramlist procedure ProcessInboundQueue (itemtype in varchar2 default null, correlation in varchar2 default null) as payload wf_payload_t; navigation varchar2(10); timeout boolean:= FALSE; cursor_name number; row_processed integer; message_handle raw(16); -- first_time boolean := TRUE; plist varchar2(4000); lcorrelation varchar2(80); nothing_processed boolean := TRUE; begin commit; Fnd_Concurrent.Set_Preferred_RBS; wf_queue.set_queue_names; if correlation is not null then lcorrelation := correlation; else lcorrelation := wf_queue.account_name||nvl(itemtype,'%'); end if; -- loop through the inbound queue. loop --Process until nothing left on the queue navigation := dbms_aq.FIRST_MESSAGE; nothing_processed :=TRUE; loop -- Process till timeout message_handle:=null; --Lets set a savepoint here --We would use this savepoint to rollback if we found that a --lock is not possible in this session for the reterived itemytype key wf_queue.Dequeue_Event(wf_queue.InboundQueue, dbms_aq.LOCKED, navigation, lcorrelation, payload, message_handle, timeout); -- if no message is found, the message may be enqueued with the -- old correlation format, so reset the correlation id and retry. if (navigation = dbms_aq.FIRST_MESSAGE and message_handle is null and correlation is null and lcorrelation <> nvl(itemtype,'%')) then lcorrelation := nvl(itemtype,'%'); goto nextmesg; end if; --else check timeout if (timeout) then EXIT; end if; --Bug 2607770 --Ensure that we have got a message --Now try to acquire the lock --Check the parameterlist null/not within Process_Inbound_Event if wf_item.acquire_lock(payload.itemtype, payload.itemkey) then --Process the payload wf_queue.Process_Inbound_Event(itemtype=>payload.itemtype, itemkey=>payload.itemkey, actid=>payload.actid, message_handle=>ProcessInboundQueue.message_handle, p_payload => payload); nothing_processed:=FALSE; end if; -- commit any processing or any clean up commit; Fnd_Concurrent.Set_Preferred_RBS; navigation := dbms_aq.NEXT_MESSAGE; <<nextmesg>> -- This is for the case when we reset the corrid and verify null; end loop; -- process till timeout exit when nothing_processed; end loop; exception when others then Wf_Core.Context('Wf_Queue', 'ProcessInboundQueue'); raise; end ProcessInboundQueue; procedure ProcessDeferredQueue (itemtype in varchar2 default null, minthreshold in number default null, maxthreshold in number default null, correlation in varchar2 default null) as payload wf_payload_t; timeout boolean:= FALSE; navigation varchar2(10); row_processed integer; message_handle raw(16); -- first_time boolean := TRUE; nothing_processed boolean:=TRUE; lcorrelation varchar2(80); begin wf_queue.set_queue_names; if correlation is not null then lcorrelation := correlation; -- for standalone, we first try the old correlation id format. elsif (wf_core.translate('WF_INSTALL') = 'STANDALONE' and itemtype is not null) then lcorrelation := itemtype; -- for embedded, there was never the old format, so we are fine. -- or it is standalone with null item type, we cannot support the -- old correlation id format; otherwise, it will pick up everything. else lcorrelation := wf_queue.account_name||nvl(itemtype,'%'); end if; loop -- keep processing the queue until there is nothing left navigation := dbms_aq.FIRST_MESSAGE; nothing_processed :=TRUE; loop -- keep processing until a timeout. message_handle:=null; wf_queue.Dequeue_Event( wf_queue.DeferredQueue, dbms_aq.LOCKED, navigation, lcorrelation, payload, message_handle, timeout, TRUE); -- if no message is found, the message may be enqueued with the -- new correlation format, so reset the correlation id and retry. if (navigation = dbms_aq.FIRST_MESSAGE and message_handle is null and correlation is null and lcorrelation = itemtype) then lcorrelation := wf_queue.account_name||nvl(itemtype,'%'); -- otherwise, process the message else if (timeout) then EXIT; end if; -- -- Execute the PL/SQL call stored in the payload if this is valid -- if activity_valid (payload, message_handle, maxthreshold, minthreshold ) AND wf_item.acquire_lock(payload.itemtype,payload.itemkey) then wf_queue.ProcessDeferredEvent(itemtype=>payload.itemtype, itemkey=>payload.itemkey, actid=>payload.actid, message_handle=>ProcessDeferredQueue.message_handle, minthreshold=>ProcessDeferredQueue.minthreshold, maxthreshold=>ProcessDeferredQueue.maxthreshold); nothing_processed:=FALSE; end if; -- commit any processing or any clean up from activity_valid commit; Fnd_Concurrent.Set_Preferred_RBS; -- -- Test for Instance Shutdown -- if wf_queue.check_instance then raise shutdown_pending; end if; navigation := dbms_aq.NEXT_MESSAGE; end if; end loop; -- process till time out exit when nothing_processed; end loop; exception when dequeue_disabled then Wf_Core.Context('Wf_Queue', 'ProcessDeferredQueue', 'Queue shutdown'); raise; when shutdown_pending then Wf_Core.Context('Wf_Queue', 'ProcessDeferredQueue', 'DB shutting down'); raise; when others then Wf_Core.Context('Wf_Queue', 'ProcessDeferredQueue'); raise; end ProcessDeferredQueue; --============================================================ -- Support utilities. not sure if we want to release these --============================================================ -- GetMessageHandle -- does a sequential search through the queue for the message handle function GetMessageHandle(queuename in varchar2, itemtype in varchar2, itemkey in varchar2, actid in number, correlation in varchar2 default null, multiconsumer in boolean default FALSE) return raw is event wf_payload_t; dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; msg_id raw(16); begin dequeue_options.dequeue_mode := dbms_aq.BROWSE; dequeue_options.wait := dbms_aq.NO_WAIT; wf_queue.set_queue_names; if correlation is not null then dequeue_options.correlation := correlation; else dequeue_options.correlation := wf_queue.account_name||nvl(itemtype,'%'); end if; if (multiconsumer) then dequeue_options.consumer_name := wf_queue.account_name; end if; --execute first read dequeue_options.navigation := dbms_aq.FIRST_MESSAGE; dbms_aq.dequeue ( queue_name => queuename, dequeue_options => dequeue_options, message_properties => message_properties, payload => event, msgid => msg_id ); if event.itemtype = itemtype and event.itemkey = itemkey and event.actid = nvl(actid,event.actid) then return (msg_id); end if; -- loop with next message LOOP dequeue_options.navigation := dbms_aq.NEXT_MESSAGE; dbms_aq.dequeue ( queue_name => queuename, dequeue_options => dequeue_options, message_properties => message_properties, payload => event, msgid => msg_id ); if event.itemtype = itemtype and event.itemkey = itemkey and event.actid = actid then return (msg_id); end if; END LOOP; return(null); exception -- timeout will fall to here when others then return(null); end GetMessageHandle; --============================================================= -- PUBLIC API to dequeue from exception queue to wf_error -- queue --============================================================= procedure DequeueException (queuename in varchar2) is l_event wf_event_t; x_dequeue_options dbms_aq.dequeue_options_t; x_message_properties dbms_aq.message_properties_t; x_msgid RAW(16); erragt wf_agent_t; lsysname varchar2(30); cmd varchar2(1000); no_messages exception; pragma exception_init (no_messages, -25228); begin -- To Dequeue from Exception Queue, consumer name must be null x_dequeue_options.consumer_name := null; x_dequeue_options.wait := 1; loop begin dbms_aq.dequeue(queue_name => queuename, dequeue_options => x_dequeue_options, message_properties => x_message_properties, /* OUT */ payload => l_event, /* OUT */ msgid => x_msgid); /* OUT */ /* ** Update the event to let everyone know it expired */ l_event.SetErrorMessage(wf_core.translate('WFE_MESSAGE_EXPIRED')); l_event.addParameterToList('ERROR_NAME', wf_core.translate('WFE_MESSAGE_EXPIRED') ); l_event.addParameterToList('ERROR_TYPE', 'ERROR'); /* ** As we can't use the private API SaveErrorToQueue ** we copy a little bit of code to do it */ select name into lsysname from wf_systems where guid = wf_core.translate('WF_SYSTEM_GUID'); erragt := wf_agent_t('WF_ERROR', lsysname); cmd := 'begin WF_ERROR_QH.enqueue(:v1, :v2); end;'; execute immediate cmd using in l_event, in erragt; commit; exception when no_messages then if (wf_log_pkg.level_event >= fnd_log.g_current_runtime_level) then wf_log_pkg.string(wf_log_pkg.level_event, 'wf.plsql.WF_QUEUE.DequeueException.queue_empty', 'No more messages in ExceptionDequeue.'); end if; exit; end; end loop; exception when others then Wf_Core.Context('Wf_Queue', 'DequeueException',queuename); raise; end DequeueException; --============================================================= -- Declare all developer APIs for Inbound queue manipulation -- --============================================================= -- -- ClearMsgStack -- Clears runtime cache procedure ClearMsgStack is begin wf_queue.stck_itemtype(1) := ''; wf_queue.stck_itemkey(1) := ''; wf_queue.stck_actid(1) := 0; wf_queue.stck_ctr := 0; exception when others then Wf_Core.Context('Wf_Queue', 'ClearMsgStack'); raise; end ClearMsgStack; --Name: WriteMsg --writes a message from stack to the queue procedure WriteMsg ( itemtype in varchar2, itemkey in varchar2, actid in number) is i pls_integer; begin i := wf_queue.SearchMsgStack(itemtype,itemkey,actid); wf_queue.EnqueueInbound( itemtype=>wf_queue.stck_itemtype(i), itemkey =>wf_queue.stck_itemkey(i), actid =>wf_queue.stck_actid(i), result =>wf_queue.stck_result(i), attrlist=>wf_queue.stck_attrlist(i)); exception when others then Wf_Core.Context('Wf_Queue', 'WriteMsg'); raise; end WriteMsg; --Name: CreateMsg --creates a message on the stack -- procedure CreateMsg ( itemtype in varchar2, itemkey in varchar2, actid in number) is i pls_integer; begin i := wf_queue.SearchMsgStack(itemtype,itemkey,actid); exception when others then Wf_Core.Context('Wf_Queue', 'CreateMsg'); raise; end CreateMsg; --Name: SetMsgAttr (PUBLIC) --Appends message attributes. -- procedure SetMsgAttr( itemtype in varchar2, itemkey in varchar2, actid in number, attrName in varchar2, attrValue in varchar2) is i pls_integer; begin i := SearchMsgStack (itemtype, itemkey, actid); if wf_queue.stck_attrlist(i) is null then wf_queue.stck_attrlist(i) := upper(attrName)||'='||AttrValue; else wf_queue.stck_attrlist(i) := wf_queue.stck_attrlist(i) ||'^'||attrName||'='||AttrValue; end if; exception when others then Wf_Core.Context('Wf_Queue', 'SetMsgAttr', itemtype, itemkey, actid, to_char(stck_ctr)); raise; end SetMsgAttr; --Name: SetMsgResult (PUBLIC) --Sets the result value for this message. -- procedure SetMsgResult( itemtype in varchar2, itemkey in varchar2, actid in number, result in varchar2) is i pls_integer; begin i := SearchMsgStack (itemtype, itemkey, actid); wf_queue.stck_result(i) :=result; exception when others then Wf_Core.Context('Wf_Queue', 'AddResult', itemtype, itemkey, actid, to_char(stck_ctr)); raise; end SetMsgResult; -- -- AddNewMsg (PRIVATE) -- Add a new message to the stack -- IN -- itemtype - item itemtype -- itemkey - item itemkey -- actid - instance id of process -- procedure AddNewMsg( itemtype in varchar2, itemkey in varchar2, actid in number) is begin -- Add the process to the stack wf_queue.stck_ctr := wf_queue.stck_ctr + 1; wf_queue.stck_itemtype(wf_queue.stck_ctr) := itemtype; wf_queue.stck_itemkey(wf_queue.stck_ctr) := itemkey; wf_queue.stck_actid(wf_queue.stck_ctr) := actid; wf_queue.stck_result(wf_queue.stck_ctr) := null; wf_queue.stck_AttrList(wf_queue.stck_ctr) := null; exception when others then Wf_Core.Context('Wf_Queue', 'AddNewMsg', itemtype, itemkey, actid, to_char(stck_ctr)); raise; end AddNewMsg; --Name: SearchMsgStack (PRIVATE) --Desc: sequential search of the message stack -- starting from the top -- function SearchMsgStack ( itemtype in varchar2, itemkey in varchar2, actid in number) RETURN number is i pls_integer; begin if ( nvl(wf_queue.stck_ctr, 0) > 0) then for i in reverse 1 .. wf_queue.stck_ctr loop if ((itemtype = wf_queue.stck_itemtype(i)) and (itemkey = wf_queue.stck_itemkey(i)) and (actid = wf_queue.stck_actid(i))) then -- Found a match. return(i); end if; end loop; end if; -- not in the Stack so add it. AddNewMsg(itemtype,itemkey,actid); return (stck_ctr); end SearchMsgStack; -- -- Generic_Queue_Display -- Produce list of generic_queues -- -- MODIFICATION LOG: -- 06-JUN-2001 JWSMITH BUG 1819232 - added alt attrib for IMG tag for ADA -- - Added summary attr for table tags for ADA -- - Added ID attr for TD tags for ADA -- procedure Generic_Queue_Display is username varchar2(320); -- Username to query admin_role varchar2(320); -- Role for admin mode admin_mode varchar2(1) := 'N'; realname varchar2(360); -- Display name of username s0 varchar2(2000); -- Dummy l_error_msg varchar2(240); l_url varchar2(240); l_media varchar2(240) := wfa_html.image_loc; l_icon varchar2(40); l_text varchar2(240); l_onmouseover varchar2(240); cursor queues_cursor is select wfq.protocol, wfq.inbound_outbound, wfq.description, wfq.queue_count from wf_queues wfq where NVL(wfq.disable_flag, 'N') = 'N' order by wfq.protocol, wfq.inbound_outbound; rowcount number; begin -- Check current user has admin authority wfa_sec.GetSession(username); username := upper(username); wf_directory.GetRoleInfo(username, realname, s0, s0, s0, s0); admin_role := wf_core.translate('WF_ADMIN_ROLE'); if (admin_role = '*' or Wf_Directory.IsPerformer(username, admin_role)) then admin_mode := 'Y'; else l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN'); end if; -- Set page title htp.htmlOpen; htp.headOpen; htp.p('<BASE TARGET="_top">'); htp.title(wf_core.translate('WFGENERIC_QUEUE_TITLE')); wfa_html.create_help_function('wf/links/dmr.htm?DMREP'); htp.headClose; wfa_sec.Header(FALSE, '',wf_core.translate('WFGENERIC_QUEUE_TITLE'), FALSE); htp.br; IF (admin_mode = 'N') THEN htp.center(htf.bold(l_error_msg)); return; END IF; -- Column headers htp.tableOpen(cattributes=>'border=1 cellpadding=3 bgcolor=white width="100%" summary=""'); htp.tableRowOpen(cattributes=>'bgcolor=#006699'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('PROTOCOL')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('PROTOCOL') || '"'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('QUEUE_DESCRIPTION')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('QUEUE_DESCRIPTION') || '"'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('INBOUND_PROMPT')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('INBOUND_PROMPT') || '"'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('QUEUE_COUNT')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('QUEUE_COUNT') || '"'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('VIEW_DETAIL')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('VIEW_DETAIL') || '"'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('DELETE')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('DELETE') || '"'); htp.tableRowClose; htp.tableRowOpen; htp.tableRowClose; -- Show all nodes for queues in queues_cursor loop htp.tableRowOpen(null, 'TOP'); htp.tableData(htf.anchor2( curl=>wfa_html.base_url|| '/wf_queue.generic_queue_edit?p_protocol='|| queues.protocol||'&p_inbound_outbound='|| queues.inbound_outbound, ctext=>queues.protocol, ctarget=>'_top'), 'Left', cattributes=>'headers="' || wf_core.translate('PROTOCOL') || '"'); htp.tableData(queues.description, 'left', cattributes=>'headers="' || wf_core.translate('QUEUE_DESCRIPTION') || '"'); htp.tableData(queues.inbound_outbound, 'left', cattributes=>'headers="' || wf_core.translate('INBOUND_PROMPT') || '"'); htp.tableData(queues.queue_count, 'left', cattributes=>'headers="' || wf_core.translate('QUEUE_COUNT') || '"'); htp.tableData(htf.anchor2(curl=>wfa_html.base_url|| '/wf_queue.Generic_Queue_View_Detail?p_protocol='|| queues.protocol||'&p_inbound_outbound='|| queues.inbound_outbound, ctext=>'<IMG SRC="'||wfa_html.image_loc||'affind.gif" alt="'||wf_core.translate('FIND') || '"BORDER=0>'), 'center', cattributes=>'valign="MIDDLE" headers="' || wf_core.translate('VIEW_DETAIL') || '"'); htp.tableData(htf.anchor2(curl=>wfa_html.base_url|| '/wf_queue.generic_queue_confirm_delete?p_protocol='|| queues.protocol||'&p_inbound_outbound='|| queues.inbound_outbound, ctext=>'<IMG SRC="'||wfa_html.image_loc||'FNDIDELR.gif" alt="' || wf_core.translate('WFRTG_DELETE') || '" BORDER=0>'), 'center', cattributes=>'valign="MIDDLE" headers="' || wf_core.translate('DELETE') || '"'); end loop; htp.tableclose; htp.br; htp.tableopen(calign=>'CENTER',cattributes=>'summary=""'); --Add new node Button htp.tableRowOpen; l_url := wfa_html.base_url||'/wf_queue.generic_queue_edit'; l_icon := 'FNDJLFOK.gif'; l_text := wf_core.translate ('WFQUEUE_CREATE'); l_onmouseover := wf_core.translate ('WFQUEUE_CREATE'); htp.p('<TD ID="">'); wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text); htp.p('</TD>'); htp.tableRowClose; htp.tableclose; wfa_sec.Footer; htp.htmlClose; exception when others then wf_core.context('FND_DOCUMENT_MANAGEMENT', 'Generic_Queue_Display'); raise; end Generic_Queue_Display; -- -- Generic_Queue_View_Detail -- Produce list of generic_queues -- -- MODIFICATION LOG: -- 06-JUN-2001 JWSMITH BUG 1819232 - added alt attrib for IMG tag for ADA -- - Added summary attribute for table tags for ADA -- procedure Generic_Queue_View_Detail ( p_protocol IN VARCHAR2 DEFAULT NULL, p_inbound_outbound IN VARCHAR2 DEFAULT NULL ) IS l_count number := 0; username varchar2(320); -- Username to query admin_role varchar2(320); -- Role for admin mode admin_mode varchar2(1) := 'N'; realname varchar2(360); -- Display name of username s0 varchar2(2000); -- Dummy l_error_msg varchar2(240); l_url varchar2(240); l_media varchar2(240) := wfa_html.image_loc; l_icon varchar2(40); l_text varchar2(240); l_onmouseover varchar2(240); l_sql varchar2(1000); begin -- Check current user has admin authority wfa_sec.GetSession(username); username := upper(username); wf_directory.GetRoleInfo(username, realname, s0, s0, s0, s0); admin_role := wf_core.translate('WF_ADMIN_ROLE'); if (admin_role = '*' or Wf_Directory.IsPerformer(username, admin_role)) then admin_mode := 'Y'; else l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN'); end if; -- Set page title htp.htmlOpen; htp.headOpen; htp.p('<BASE TARGET="_top">'); htp.title(wf_core.translate('WFGENERIC_QUEUE_TITLE')); wfa_html.create_help_function('wf/links/dmr.htm?DMREP'); htp.headClose; wfa_sec.Header(FALSE, '',wf_core.translate('WFGENERIC_QUEUE_TITLE'), FALSE); htp.br; IF (admin_mode = 'N') THEN htp.center(htf.bold(l_error_msg)); return; END IF; SELECT queue_count INTO l_count FROM wf_queues WHERE UPPER(p_protocol) = protocol AND p_inbound_outbound = inbound_outbound; -- Column headers htp.tableOpen(cattributes=>'border=1 cellpadding=3 bgcolor=white width="100%" summary=""'); htp.tableRowOpen(cattributes=>'bgcolor=#006699'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('PROTOCOL')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('PROTOCOL') || '"'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('QUEUE_NUMBER')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('QUEUE_NUMBER') || '"'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('QUEUE_NAME')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('QUEUE_NAME') || '"'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('QUEUE_COUNT')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('QUEUE_COUNT') || '"'); htp.tableHeader(cvalue=>'<font color=#FFFFFF>'|| wf_core.translate('VIEW_DETAIL')||'</font>', calign=>'Center', cattributes=>'id="' || wf_core.translate('VIEW_DETAIL') || '"'); htp.tableRowClose; htp.tableRowOpen; htp.tableRowClose; -- Show all queues for the given protocol for ii in 1..l_count loop htp.tableRowOpen(null, 'TOP'); htp.tableData(p_protocol, 'left', cattributes=>'headers="' || wf_core.translate('PROTOCOL') || '"'); htp.tableData(to_char(ii), 'left', cattributes=>'headers="' || wf_core.translate('QUEUE_NUMBER') || '"'); -- p_protocol and p_inbound_outbound were verified above -- ii must be a number -- BINDVAR_SCAN_IGNORE htp.tableData(wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_QUEUE', 'left', cattributes=>'headers="' || wf_core.translate('QUEUE_NAME') || '"'); /* ** Check to see if there are any messages in the specified queue */ l_sql := 'SELECT COUNT(1) FROM WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE'; execute immediate l_sql INTO l_count; htp.tableData(to_char(l_count), 'left', cattributes=>'headers="' || wf_core.translate('QUEUE_COUNT') || '"'); htp.tableData(htf.anchor2(curl=>wfa_html.base_url|| '/wf_queue.generic_queue_display_contents?p_protocol='|| p_protocol||'&p_inbound_outbound='|| p_inbound_outbound||'&p_queue_number='|| to_char(ii)||'&p_message_number=1', ctext=>'<IMG SRC="'||wfa_html.image_loc||'affind.gif" alt="' || wf_core.translate('FIND') || '" BORDER=0>'), 'center', cattributes=>'valign="MIDDLE" headers="' || wf_core.translate('VIEW_DETAIL') || '"'); end loop; htp.tableclose; htp.br; wfa_sec.Footer; htp.htmlClose; exception when others then wf_core.context('FND_DOCUMENT_MANAGEMENT', 'Generic_Queue_View_Detail'); raise; end Generic_Queue_View_Detail; -- MODIFICATION LOG: -- 06-JUN-2001 JWSMITH BUG 1819232 - Added summary attr for table tag for ADA -- - Added ID attr for TD tags for ADA procedure generic_queue_display_contents (p_protocol IN VARCHAR2 DEFAULT NULL, p_inbound_outbound IN VARCHAR2 DEFAULT NULL, p_queue_number IN NUMBER DEFAULT NULL, p_message_number IN NUMBER DEFAULT 1) IS username varchar2(320); -- Username to query admin_role varchar2(320); -- Role for admin mode admin_mode varchar2(1) := 'N'; l_media varchar2(240) := wfa_html.image_loc; l_icon varchar2(40) := 'FNDILOV.gif'; l_text varchar2(240) := ''; l_onmouseover varchar2(240) := wf_core.translate ('WFPREF_LOV'); l_url varchar2(4000); l_error_msg varchar2(240); l_more_data BOOLEAN := TRUE; dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; ii number := 0; l_loc number := 1; l_queue_name varchar2(30); l_msg_id RAW(16); begin /* wf_message_payload_t is obsolete from 2.6.4 onwards */ null; exception when others then Wf_Core.Context('Wf_Queue', 'generic_queue_display_contents', p_protocol, p_inbound_outbound); raise; end generic_queue_display_contents; -- MODIFICATION LOG: -- 06-JUN-2001 JWSMITH BUG 1819232 - Added summary attr for table tag for ADA -- - Added ID attr for TD tags procedure Generic_Queue_Edit ( p_protocol IN VARCHAR2 DEFAULT NULL, p_inbound_outbound IN VARCHAR2 DEFAULT NULL ) IS username varchar2(320); -- Username to query admin_role varchar2(320); -- Role for admin mode admin_mode varchar2(1) := 'N'; l_inbound_selected varchar2(1) := 'N'; l_outbound_selected varchar2(1) := 'N'; l_description VARCHAR2(240); l_queue_count NUMBER; l_media varchar2(240) := wfa_html.image_loc; l_icon varchar2(40) := 'FNDILOV.gif'; l_text varchar2(240) := ''; l_onmouseover varchar2(240) := wf_core.translate ('WFPREF_LOV'); l_url varchar2(4000); l_error_msg varchar2(240); BEGIN -- Check current user has admin authority wfa_sec.GetSession(username); username := upper(username); admin_role := wf_core.translate('WF_ADMIN_ROLE'); if (admin_role = '*' or Wf_Directory.IsPerformer(username, admin_role)) then admin_mode := 'Y'; else l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN'); end if; /* ** If this protocol already exists then go fetch the definition */ IF (p_protocol IS NOT NULL) THEN SELECT description, queue_count INTO l_description, l_queue_count FROM wf_queues WHERE protocol = p_protocol AND inbound_outbound = p_inbound_outbound; END IF; -- Set page title htp.htmlOpen; htp.headOpen; htp.title(wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE')); wfa_html.create_help_function('wf/links/dmr.htm?DMREP'); wf_lov.OpenLovWinHtml; htp.headClose; -- Page header wfa_sec.Header(FALSE, '', wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE'), TRUE); IF (admin_mode = 'N') THEN htp.center(htf.bold(l_error_msg)); return; END IF; htp.tableopen(calign=>'CENTER',cattributes=>'summary="' || wf_core.translate('WFQUEUE_EDIT_QUEUE_TITLE') || '"'); htp.p('<FORM NAME="FND_GENERIC_QUEUE" ACTION="wf_queue.generic_queue_update" METHOD="POST">'); -- Protocol Name htp.tableRowOpen; htp.tableData(cvalue=>'<LABEL FOR="i_protocol">' || wf_core.translate('PROTOCOL') || '</LABEL>', calign=>'right', cattributes=>'id=""'); htp.tableData(htf.formText(cname=>'p_protocol', csize=>'30', cvalue=>p_protocol, cmaxlength=>'30', cattributes=>'id="i_protocol"'), cattributes=>'id=""'); htp.tableRowClose; -- Inbound/outbound htp.tableRowOpen; htp.tableData(cvalue=>'<LABEL FOR="i_inbound_outbound">' || wf_core.translate('INBOUND_OUTBOUND') || '</LABEL>', calign=>'right', cattributes=>'id=""'); if (NVL(p_inbound_outbound, 'OUTBOUND') = 'INBOUND') then l_inbound_selected := 'Y'; l_outbound_selected := NULL; else l_inbound_selected := NULL; l_outbound_selected := 'Y'; end if; htp.p('<TD ID="">'); htp.formSelectOpen(cname=>'p_inbound_outbound',cattributes=>'id="i_inbound_outbound"'); htp.formSelectOption(cvalue=>wf_core.translate('INBOUND'), cattributes=>'value=INBOUND', cselected=>l_inbound_selected); htp.formSelectOption(cvalue=>wf_core.translate('OUTBOUND'), cattributes=>'value=OUTBOUND', cselected=>l_outbound_selected); htp.formSelectClose; htp.p('</TD>'); htp.tableRowClose; -- Description htp.tableRowOpen; htp.tableData(cvalue=>'<LABEL FOR="i_description">' || wf_core.translate('DESCRIPTION') || '"', calign=>'right', cattributes=>'id=""'); htp.tableData(htf.formText(cname=>'p_description', csize=>'30', cvalue=>l_description, cmaxlength=>'240', cattributes=>'id="i_description"'), cattributes=>'id=""'); htp.tableRowClose; -- Count htp.tableRowOpen; htp.tableData(cvalue=>'<LABEL FOR="i_count">' || wf_core.translate('COUNT') || '"', calign=>'right', cattributes=>'id=""'); htp.tableData(htf.formText(cname=>'p_queue_count', csize=>'10', cvalue=>l_queue_count, cmaxlength=>'20', cattributes=>'id="i_count"'), cattributes=>'id=""'); htp.tableRowClose; -- keep track of the original protocol and the inbound/outbound -- value in case the name changes htp.formHidden(cname=>'p_original_protocol', cvalue=>p_protocol); htp.formHidden(cname=>'p_original_inbound', cvalue=>p_inbound_outbound); htp.tableclose; htp.br; htp.tableopen(calign=>'CENTER',cattributes=>'summary=""'); --Submit Button htp.tableRowOpen; l_url := 'javascript:document.FND_GENERIC_QUEUE.submit()'; l_icon := 'FNDJLFOK.gif'; l_text := wf_core.translate ('WFMON_OK'); l_onmouseover := wf_core.translate ('WFMON_OK'); htp.p('<TD ID="">'); wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text); htp.p('</TD>'); l_url := wfa_html.base_url||'/fnd_document_management.Generic_Queue_Display'; l_icon := 'FNDJLFCN.gif'; l_text := wf_core.translate ('CANCEL'); l_onmouseover := wf_core.translate ('CANCEL'); htp.p('<TD ID="">'); wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text); htp.p('</TD>'); htp.tableRowClose; htp.tableclose; htp.formClose; wfa_sec.Footer; htp.htmlClose; exception when others then wf_core.context('FND_DOCUMENT_MANAGEMENT', 'Generic_Queue_edit'); raise; END Generic_Queue_Edit; procedure generic_queue_delete_check (p_protocol in varchar2, p_inbound_outbound in varchar2, p_queue_start_range in number, p_queue_end_range in number) IS ii NUMBER := 0; l_count NUMBER := 0; l_sql varchar2(1000); BEGIN /* ** Check to make sure there are no messages in the queue before ** you delete it. */ for ii in p_queue_start_range..p_queue_end_range loop /* ** Check to see if there are any messages in the specified queue */ -- p_protocol and p_inbound was verified before coming here. -- BINDVAR_SCAN_IGNORE l_sql := 'SELECT COUNT(1) INTO :a FROM WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE'; execute immediate l_sql using l_count; /* ** If you find a row then error this call */ if (l_count > 0) then wf_core.token('PROTOCOL', p_protocol); wf_core.token('INBOUND_OUTBOUD', p_inbound_outbound); wf_core.token('QUEUE_NUMBER', to_char(ii)); wf_core.raise('WFQUEUE_QUEUE_CONTENT'); end if; end loop; exception when others then Wf_Core.Context('Wf_Queue', 'generic_queue_delete_check', p_protocol, p_inbound_outbound); raise; end generic_queue_delete_check; procedure generic_queue_delete_queues (p_protocol in varchar2, p_inbound_outbound in varchar2, p_queue_start_range in number, p_queue_end_range in number) IS ii NUMBER := 0; l_count NUMBER := 0; BEGIN /* ** Delete the queues and queue tables */ for ii in p_queue_start_range..p_queue_end_range loop /* ** Stop the queue */ dbms_aqadm.stop_queue(queue_name => wf_core.translate('WF_SCHEMA')||'.'||'WF_'|| p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'|| to_char(ii)||'_QUEUE'); /* ** Delete the Queues */ dbms_aqadm.drop_queue( queue_name => wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'|| substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_QUEUE'); /* ** Delete the Queue Table */ dbms_aqadm.drop_queue_table ( queue_table => wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'||substr(p_inbound_outbound, 1, 1)||'_'||to_char(ii)||'_TABLE'); end loop; exception when others then Wf_Core.Context('Wf_Queue', 'generic_queue_delete_queues', p_protocol, p_inbound_outbound); raise; end generic_queue_delete_queues; -- MODIFICATION LOG: -- 06-JUN-2001 JWSMITH BUG 1819232 - Added summary attr for table tag for ADA -- - Added ID attr for TD tags procedure Generic_Queue_Update ( p_protocol IN VARCHAR2 DEFAULT NULL, p_inbound_outbound IN VARCHAR2 DEFAULT NULL, p_description IN VARCHAR2 DEFAULT NULL, p_queue_count IN VARCHAR2 DEFAULT NULL, p_original_protocol IN VARCHAR2 DEFAULT NULL, p_original_inbound IN VARCHAR2 DEFAULT NULL ) IS username varchar2(320); -- Username to query admin_role varchar2(320); -- Role for admin mode admin_mode varchar2(1) := 'N'; l_count number := 0; l_media varchar2(240) := wfa_html.image_loc; l_icon varchar2(30) := 'FNDILOV.gif'; l_text varchar2(240) := ''; l_onmouseover varchar2(240) := wf_core.translate ('WFPREF_LOV'); l_url varchar2(4000); l_error_msg varchar2(240); BEGIN -- Check current user has admin authority wfa_sec.GetSession(username); username := upper(username); admin_role := wf_core.translate('WF_ADMIN_ROLE'); if (admin_role = '*' or Wf_Directory.IsPerformer(username, admin_role)) then admin_mode := 'Y'; else l_error_msg := wf_core.translate('WFPREF_INVALID_ADMIN'); end if; IF (admin_mode = 'N') THEN htp.center(htf.bold(l_error_msg)); return; END IF; -- Check to make sure the protocol does not already exist IF (p_original_protocol IS NULL) THEN SELECT count(1) INTO l_count FROM wf_queues WHERE UPPER(p_protocol) = protocol AND p_inbound_outbound = inbound_outbound; if (l_count > 0) then htp.p('<BODY bgcolor=#cccccc>'); htp.center(htf.bold(wf_core.translate('WFQUEUE_ALREADY_EXISTS'))); htp.br; htp.tableopen(calign=>'CENTER',cattributes=>'summary=""'); --Submit Button htp.tableRowOpen; l_url := wfa_html.base_url|| '/wf_queue.generic_queue_edit'; l_icon := 'FNDJLFOK.gif'; l_text := wf_core.translate ('WFMON_OK'); l_onmouseover := wf_core.translate ('WFMON_OK'); htp.p('<TD ID="">'); wf_pref.create_reg_button (l_url, l_onmouseover, l_media, l_icon, l_text); htp.p('</TD>'); htp.tablerowclose; htp.tableclose; htp.p('</BODY>'); return; else wf_queue.create_generic_queue (p_protocol=>p_protocol, p_inbound_outbound => p_inbound_outbound, p_description => p_description, p_queue_count => to_number(p_queue_count)); end if; else null; /* wf_queue.update_generic_queue (p_protocol=>p_protocol, p_inbound_outbound => p_inbound_outbound, p_description => p_description, p_queue_count => to_number(p_queue_count), p_original_protocol=> p_original_protocol, p_original_inbound=> p_original_inbound); */ end if; -- use owa_util.redirect_url to redirect the URL to the home page owa_util.redirect_url(curl=>wfa_html.base_url || '/wf_queue.Generic_Queue_Display', bclose_header=>TRUE); exception when others then wf_core.context('FND_DOCUMENT_MANAGEMENT', 'Generic_Queue_update'); raise; END Generic_Queue_Update; /* ** Create a generic queue with the object type of WF_MESSAGE_PAYLOAD_T which ** is basically just a clob */ procedure create_generic_queue (p_protocol IN VARCHAR2, p_inbound_outbound IN VARCHAR2, p_description IN VARCHAR2, p_queue_count IN NUMBER) IS l_count NUMBER := 0; begin /* wf_message_payload_t is obsolete from 2.6.4 onwards */ null; exception when others then Wf_Core.Context('Wf_Queue', 'create_generic_queue', p_protocol, p_inbound_outbound); raise; end create_generic_queue; /* ** delete a generic queue with the object type of WF_MESSAGE_PAYLOAD_T which ** is basically just a clob */ procedure delete_generic_queue (p_protocol IN VARCHAR2, p_inbound_outbound IN VARCHAR2) IS l_queue_count NUMBER := 0; begin /* wf_message_payload_t is obsolete from 2.6.4 onwards */ null; exception when others then Wf_Core.Context('Wf_Queue', 'delele_generic_queue', p_protocol, p_inbound_outbound); raise; end delete_generic_queue; /* ** Procedure: get_hash_queue_name ** ** Description: Load all queue definitions into memory. The use a hashing algorithm ** to return a queue name */ procedure get_hash_queue_name (p_protocol in varchar2, p_inbound_outbound in varchar2, p_queue_name out NOCOPY varchar2) IS qii number := 1; ii number := 1; l_index number := 0; l_queue_name varchar2(30) := null; cursor get_queues is select protocol, inbound_outbound, queue_count from wf_queues order by protocol, inbound_outbound; begin /* ** Check to see if queues loaded into memory. If they are not ** already loaded */ if (wf_queue.queue_names_index.count < 1) then -- Show all nodes for wf_queues_list in get_queues loop wf_queue.queue_names_index(ii).protocol := wf_queues_list.protocol; wf_queue.queue_names_index(ii).inbound_outbound := wf_queues_list.inbound_outbound; wf_queue.queue_names_index(ii).queue_count := wf_queues_list.queue_count; ii := ii + 1; end loop; end if; -- Go find the locator in the queue list that matches the request for ii in 1..wf_queue.queue_names_index.count loop if (wf_queue.queue_names_index(ii).protocol = p_protocol AND wf_queue.queue_names_index(ii).inbound_outbound = p_inbound_outbound) THEN -- If there is more than 1 queue then choose the queue based on a random -- number generator if (wf_queue.queue_names_index(ii).queue_count > 1) then l_index := mod(to_number(wf_core.random), wf_queue.queue_names_index(ii).queue_count) + 1; else l_index := 1; end if; end if; end loop; if (l_index > 0) then p_queue_name := wf_core.translate('WF_SCHEMA')||'.'||'WF_'||p_protocol||'_'|| SUBSTR(p_inbound_outbound, 1, 1)|| '_'||to_char(l_index)||'_QUEUE'; end if; exception when others then Wf_Core.Context('Wf_Queue', 'get_hash_generic_queue', p_protocol, p_inbound_outbound); raise; end get_hash_queue_name; -- -- Function: enable_exception_queue -- -- Enable the exception queue for the queue table for dequing -- Returns the name of the exception queue for the given queue name -- function enable_Exception_Queue(p_queue_name in varchar2) return varchar2 is l_schema_name varchar(320); l_queue_name varchar2(30); l_pos integer := 0; l_queue_table varchar2(30); l_dequeue_enabled varchar2(7) := ''; l_exception_queue varchar2(100) := ''; begin -- Check to see if the name has a schema. Rove it for the check. l_pos := instrb(p_queue_name,'.'); if l_pos > 0 then l_schema_name := substrb(p_queue_name, 1, l_pos-1); l_queue_name := substrb(p_queue_name, l_pos+1); else l_schema_name := wf_core.translate('WF_SCHEMA'); l_queue_name := p_queue_name; end if; begin select queue_table, dequeue_enabled into l_queue_table, l_dequeue_enabled from all_queues where owner = l_schema_name and name = l_queue_name; l_exception_queue := l_schema_name||'.'||'AQ$_'|| l_queue_table||'_E'; exception when no_data_found then l_exception_queue := ''; l_dequeue_enabled := ''; when others then raise; end; if l_exception_queue <> '' and l_dequeue_enabled = 'NO' then dbms_aqadm.start_queue(queue_name => l_exception_queue, enqueue => FALSE, dequeue => TRUE); end if; return l_exception_queue; exception when others then WF_CORE.Context('WF_QUEUE','Enable_Exception_Queue',p_queue_name); raise; end enable_Exception_Queue; -- ==================================================================== -- Add Subscriber to Queue (PUBLIC) -- ==================================================================== procedure AddSubscriber(queuename in varchar2, name in varchar2) as lagent sys.aq$_agent; begin lagent := sys.aq$_agent(name,'',0); DBMS_AQADM.Add_Subscriber( queue_name=>queuename, subscriber=>lagent, rule=>'CORRID like '''||name||'%''' ); exception when OTHERS then Wf_Core.Context('WF_QUEUE','AddSubscriber',queuename, name); raise; end AddSubscriber; -- Bug 2307428 -- ==================================================================== -- Enable Inbound and defrerred queues for Background Engine. -- ==================================================================== procedure EnableBackgroundQueues as schema varchar2(320); queue_name varchar2(80); l_qname varchar2(80); CURSOR q_disabled (schema varchar2, queue_name varchar2) is SELECT name FROM all_queues WHERE name = queue_name AND owner = schema AND ((trim(enqueue_enabled) = 'NO') OR (trim(dequeue_enabled) = 'NO')); begin --If the queue names haven't been set,initialise them if (wf_queue.name_init = FALSE) then wf_queue.set_queue_names; end if; --Obtain the schema schema := wf_core.translate('WF_SCHEMA'); --Enable deferred queue queue_name := substr(wf_queue.deferred_queue_name,length(schema)+2); OPEN q_disabled (schema, queue_name); LOOP FETCH q_disabled into l_qname; EXIT WHEN q_disabled%NOTFOUND; DBMS_AQADM.START_QUEUE(wf_queue.deferred_queue_name); END LOOP; CLOSE q_disabled; --Enable inbound queue queue_name := substr(wf_queue.inbound_queue_name,length(schema)+2); OPEN q_disabled (schema, queue_name); LOOP FETCH q_disabled into l_qname; EXIT WHEN q_disabled%NOTFOUND; DBMS_AQADM.START_QUEUE(wf_queue.inbound_queue_name); END LOOP; CLOSE q_disabled; exception when others then Wf_core.Context('WF_QUEUE','EnableBackgroundQueues'); raise; end EnableBackgroundQueues; -- ==================================================================== -- get Count Message States (PUBLIC) -- ==================================================================== procedure getCntMsgSt (p_agent IN VARCHAR2 DEFAULT '%', p_ready OUT NOCOPY NUMBER, p_wait OUT NOCOPY NUMBER, p_processed OUT NOCOPY NUMBER, p_expired OUT NOCOPY NUMBER, p_undeliverable OUT NOCOPY NUMBER, p_error OUT NOCOPY NUMBER) is TYPE cntmsgst_t IS REF CURSOR; l_cntmsgst cntmsgst_t; l_sqlstmt varchar2(4000); l_count number := 0; l_msgstate varchar2(50); l_pos number := 0; l_qt varchar2(100); l_owner varchar2(100); -- <rraheja:2786474> Gather schema and queue name once rather than in every call for perf. l_schema varchar2(100); l_qname varchar2(100); -- <rraheja:2786474> Changed upper(name) to name as queue_name should be recorded in upper case. cursor c_localagents(p_agent varchar2) is select queue_name from wf_agents where system_guid = wf_core.translate('WF_SYSTEM_GUID') and name like upper(p_agent); /* cursor c_qt is select owner from all_queue_tables where queue_table = l_qt; */ -- <rraheja:2786474> Changed non-cursor single row query to cursor based for improved perf. cursor c_qtable is select queue_table from all_queues where owner = l_schema and name = l_qname; --and queue_type = 'NORMAL_QUEUE'; TABLE_NOTFOUND exception; pragma EXCEPTION_INIT(TABLE_NOTFOUND,-942); INVALID_TABLE exception; pragma EXCEPTION_INIT(INVALID_TABLE,-903); begin -- Initialize Out Parameters p_ready := 0; p_wait := 0; p_processed := 0; p_expired := 0; p_undeliverable := 0; p_error := 0; for i in c_localagents(p_agent) loop -- Get the Queue Table plus owner l_pos := nvl(instr(i.queue_name,'.',1,1),0); -- <rraheja:2786474> Changed non-cursor single row query to cursor and used vars for freq used data l_schema := substr(i.queue_name,1,l_pos-1); l_qname := substr(i.queue_name,l_pos+1); open c_qtable; fetch c_qtable into l_qt; close c_qtable; -- Get the Owner of the Queue Table -- <rraheja:2786474> queue owner should be = queue table owner, so commenting out the code /* open c_qt; fetch c_qt into l_owner; exit when c_qt%notfound; close c_qt; */ l_owner := l_schema; -- l_owner and l_qt are selected/derived from our own cursor -- BINDVAR_SCAN_IGNORE[2] l_sqlstmt := 'select msg_state, count(*) from '||l_owner||'.'||'aq$'||l_qt ||' where (queue = :q or queue = :r) group by msg_state'; begin --If the queue tables are not found then the --select should throw ORA 942. --Put the begin catch block of exception at the end --so that u don't have to use goto's to get out of loop open l_cntmsgst for l_sqlstmt using l_qname,'AQ$_'|| l_qname ||'_E'; loop fetch l_cntmsgst into l_msgstate, l_count; if l_msgstate = 'READY'then --Bug 2382594 --If the agent is WF_ERROR do not count p_error. if l_qname = 'WF_ERROR' and p_agent = '%' then p_error := p_error + l_count; else p_ready := p_ready + l_count; end if; elsif l_msgstate = 'WAIT' then p_wait := p_wait + l_count; elsif l_msgstate = 'PROCESSED' then p_processed := p_processed + l_count; elsif l_msgstate = 'EXPIRED' then p_expired := p_expired + l_count; elsif l_msgstate = 'UNDELIVERABLE' then p_undeliverable := p_undeliverable + l_count; end if; l_count := 0; exit when l_cntmsgst%notfound; end loop; close l_cntmsgst; exception when table_notfound then --return 0 count instead of throwing error to UI --all the returns are at their initialized value of 0 --just ensure that the cursor is closed if (l_cntmsgst%ISOPEN) then close l_cntmsgst; end if; when invalid_table then --return 0 count instead of throwing error to UI --all the returns are at their initialized value of 0 --just ensure that the cursor is closed if (l_cntmsgst%ISOPEN) then close l_cntmsgst; end if; end; end loop; -- end loop for c_localagents exception when OTHERS then if (l_cntmsgst%ISOPEN) then close l_cntmsgst; end if; Wf_Core.Context('WF_QUEUE','getCntMsgSt',p_agent); raise; end getCntMsgSt; -- -- move_msgs_excep2normal (CONCURRENT PROGRAM API) -- API to move messages from the exception queue to the normal queue -- of the given agent. Handles wf_event_t and JMS_TEXT_MESSAGE payloads. -- -- OUT -- errbuf - CP error message -- retcode - CP return code (0 = success, 1 = warning, 2 = error) -- IN -- p_agent_name - Agent name -- procedure move_msgs_excep2normal(errbuf out nocopy varchar2, retcode out nocopy varchar2, p_agent_name in varchar2) as l_queue_name varchar2(100); l_queue_handler varchar2(100); l_schema varchar2(100); l_qname varchar2(100); l_excp_qname varchar2(100); l_object_type varchar2(100); l_obj_type varchar2(100); l_pos number := 0; l_timeout integer; l_dequeue_options dbms_aq.dequeue_options_t; l_enqueue_options dbms_aq.enqueue_options_t; l_message_properties dbms_aq.message_properties_t; l_payload_evt wf_event_t; l_payload_jms sys.aq$_JMS_TEXT_MESSAGE; l_msg_id raw(16); invalid_agent exception; invalid_type exception; pragma EXCEPTION_INIT(invalid_agent, -20201); pragma EXCEPTION_INIT(invalid_type, -20202); begin begin SELECT TRIM(queue_name), TRIM(queue_handler) INTO l_queue_name, l_queue_handler FROM wf_agents WHERE name = upper(p_agent_name) AND SYSTEM_GUID = wf_event.local_system_guid; exception when no_data_found then raise_application_error(-20201, 'Agent not found'); when others then raise; end; l_pos := instr(l_queue_name, '.', 1, 1); l_schema := substr(l_queue_name, 1, l_pos-1); l_qname := substr(l_queue_name, l_pos+1); l_excp_qname := 'AQ$_' || l_qname || '_E'; SELECT TRIM(object_type) INTO l_object_type FROM all_queue_tables WHERE queue_table in ( SELECT queue_table FROM all_queues WHERE name = l_qname AND owner = l_schema ) AND owner=l_schema; l_pos := instr(l_object_type, '.', 1, 1); l_obj_type := substr(l_object_type, l_pos+1); l_timeout := 0; l_dequeue_options.dequeue_mode := dbms_aq.REMOVE; l_dequeue_options.wait := dbms_aq.NO_WAIT; l_dequeue_options.consumer_name := null; l_enqueue_options.visibility := dbms_aq.ON_COMMIT; if l_obj_type = 'WF_EVENT_T' then wf_event_t.Initialize(l_payload_evt); while (l_timeout = 0) loop begin --Dequeue the message from the exception queue dbms_aq.Dequeue(queue_name => l_schema || '.' || l_excp_qname, dequeue_options => l_dequeue_options, message_properties => l_message_properties, payload => l_payload_evt, msgid => l_msg_id); l_timeout := 0; --Enqueue the message in the normal queue l_message_properties.expiration := dbms_aq.never; if (upper(p_agent_name) = 'WF_ERROR' OR upper(p_agent_name) = 'WF_IN' OR upper(p_agent_name) = 'WF_OUT') then l_message_properties.recipient_list(1) := sys.aq$_agent(upper(p_agent_name), null, 0); end if; dbms_aq.enqueue(queue_name => l_queue_name, enqueue_options => l_enqueue_options, message_properties => l_message_properties, payload => l_payload_evt, msgid => l_msg_id); commit; exception when dequeue_timeout then l_timeout := 1; when others then raise; end; end loop; --End of while loop that handles wf_event_t payload elsif l_obj_type = 'AQ$_JMS_TEXT_MESSAGE' then while (l_timeout = 0) loop begin --Dequeue the message from the exception queue dbms_aq.Dequeue(queue_name => l_schema || '.' || l_excp_qname, dequeue_options => l_dequeue_options, message_properties => l_message_properties, payload => l_payload_jms, msgid => l_msg_id); l_timeout := 0; --Enqueue the message in the normal queue of the given agent l_message_properties.expiration := dbms_aq.never; dbms_aq.enqueue(queue_name => l_queue_name, enqueue_options => l_enqueue_options, message_properties => l_message_properties, payload => l_payload_jms, msgid => l_msg_id); commit; exception when dequeue_timeout then l_timeout := 1; when others then raise; end; end loop; --End of while loop that handles AQ$_JMS_TEXT_MESSAGE payload else -- Payload not supported by this API, raise application error raise_application_error(-20202, 'Invalid payload type'); end if; errbuf := ''; retcode := '0'; exception when invalid_agent then errbuf := 'The agent ' || p_agent_name || ' is not found '; retcode := '2'; when invalid_type then errbuf := 'This API does not support payload of type ' || l_obj_type || ' for agent ' || p_agent_name; retcode := '2'; when others then errbuf := sqlerrm; retcode := '2'; end move_msgs_excep2normal; -- -- Overloaded Procedure 1 : Definition without the AGE parameter -- -- clean_evt -- Procedure to purge the messages in the READY state of a Queue -- of WF_EVENT_T payload type. Supports correlation id based purge. -- -- IN -- p_agent_name - Agent Name -- p_correlation - Correlation ID (Default Value : NULL) -- p_commit_frequency - Commit Level (Default Value : 500) -- -- OUT -- p_msg_count - Count of the number of purged messages -- procedure clean_evt(p_agent_name in varchar2, p_correlation in varchar2 default NULL, p_commit_frequency in number default 500, p_msg_count out nocopy number) as l_xcount integer; l_timeout integer; l_queue_name varchar2(80); l_account_name varchar2(30); l_payload wf_event_t; l_msgid raw(16); l_message_handle raw(16) := NULL; l_dequeue_options dbms_aq.dequeue_options_t; l_message_properties dbms_aq.message_properties_t; --Define the snapshot too old error snap_too_old exception; pragma exception_init(snap_too_old, -1555); begin p_msg_count := 0; l_timeout := 0; l_xcount := 0; SELECT queue_name INTO l_queue_name FROM wf_agents WHERE name = upper(p_agent_name) AND SYSTEM_GUID = wf_event.local_system_guid; --No processing is done on the payload data --So dequeue is done in the REMOVE_NODATA mode l_dequeue_options.navigation := dbms_aq.FIRST_MESSAGE; l_dequeue_options.dequeue_mode := dbms_aq.REMOVE_NODATA; l_dequeue_options.wait := dbms_aq.NO_WAIT; l_dequeue_options.consumer_name := upper(p_agent_name); --Set the Correlation ID for dequeue only if available --If the given agent is a Workflow Agent then append the --Account Name before the Correlation ID if ((p_correlation is not null) or (p_correlation <> '')) then -- Seeded WF agents if (upper(p_agent_name) like 'WF_%') then if (wf_event.account_name is null) then wf_event.SetAccountName; end if; l_dequeue_options.correlation := wf_event.account_name || ':' || p_correlation; else l_dequeue_options.correlation := p_correlation; end if; end if; -- All the messages with the given correlation id are to be purged -- In this case, the $fnd/sql/wfevqcln.sql script logic is followed -- The dequeue is based on the given correlation id while (l_timeout = 0) loop begin dbms_aq.Dequeue(queue_name => l_queue_name, dequeue_options => l_dequeue_options, message_properties => l_message_properties, /* OUT */ payload => l_payload, /* OUT */ msgid => l_message_handle); /* OUT */ l_xcount := l_xcount + 1; l_timeout := 0; exception when dequeue_disabled then --Incase dequeue has been disabled on the queue --Enable the same and re-try the operation. dbms_aqadm.start_queue( queue_name =>l_queue_name, enqueue =>FALSE, dequeue =>TRUE); dbms_aq.Dequeue(queue_name => l_queue_name, dequeue_options => l_dequeue_options, message_properties => l_message_properties, /* OUT */ payload => l_payload, /* OUT */ msgid => l_message_handle); /* OUT */ l_xcount := l_xcount + 1; l_timeout := 0; when dequeue_timeout then l_timeout := 1; --Capture the snapshot too old error when snap_too_old then --Workaround for AQ when receiving ORA-01555 using NEXT_MESSAGE as --navigation. We will try to set to FIRST_MESSAGE and dequeue to --silently handle this exception. if (l_dequeue_options.navigation = dbms_aq.FIRST_MESSAGE) then raise; else l_dequeue_options.navigation := dbms_aq.FIRST_MESSAGE; dbms_aq.Dequeue(queue_name => l_queue_name, dequeue_options => l_dequeue_options, message_properties => l_message_properties, /* OUT */ payload => l_payload, /* OUT */ msgid => l_message_handle); /* OUT */ l_xcount := l_xcount + 1; l_timeout := 0; end if; when others then raise; end; l_dequeue_options.navigation := dbms_aq.NEXT_MESSAGE; --Commit if commit frequency if l_xcount >= p_commit_frequency then commit; p_msg_count := p_msg_count + l_xcount; l_xcount := 0; end if; end loop; commit; p_msg_count := p_msg_count + l_xcount; exception when others then Wf_core.Context('WF_QUEUE', 'Clean_evt', p_agent_name, p_correlation, to_char(p_commit_frequency)); raise; end clean_evt; -- -- Overloaded Procedure 2 : Definition with the AGE parameter -- -- clean_evt -- Procedure to purge the messages in the READY state of a Queue -- of WF_EVENT_T payload type. Supports time-based selective -- purge with correlation id. -- -- IN -- p_agent_name - Agent Name -- p_correlation - Correlation ID (Default Value : NULL) -- p_commit_frequency - Commit Level (Default Value : 500) -- p_age - Age of the Messages (No default value -- as this is a overloaded procedure) -- -- OUT -- p_msg_count - Count of the number of purged messages -- procedure clean_evt(p_agent_name in varchar2, p_correlation in varchar2 default NULL, p_commit_frequency in number default 500, p_msg_count out nocopy number, p_age in number) as l_xcount integer; l_pos integer; l_schema varchar2(80); l_qname varchar2(80); l_corrid varchar2(128); l_queue_name varchar2(80); l_account_name varchar2(30); l_payload wf_event_t; l_msgid raw(16); l_message_handle raw(16) := NULL; l_dequeue_options dbms_aq.dequeue_options_t; l_message_properties dbms_aq.message_properties_t; -- Cursor to get all messages from the queue that were enqueued before -- a given date. TYPE c_msgs_typ IS REF CURSOR; c_msgs c_msgs_typ; begin p_msg_count := 0; l_xcount := 0; SELECT queue_name INTO l_queue_name FROM wf_agents WHERE name = upper(p_agent_name) AND SYSTEM_GUID = wf_event.local_system_guid; l_pos := instr(l_queue_name, '.', 1, 1); l_schema := substr(l_queue_name, 1, l_pos-1); l_qname := substr(l_queue_name, l_pos+1); -- Query from the AQ view table l_qname := l_schema || '.AQ$' || l_qname; --No processing is done on the payload data --So dequeue is done in the REMOVE_NODATA mode l_dequeue_options.navigation := dbms_aq.FIRST_MESSAGE; l_dequeue_options.dequeue_mode := dbms_aq.REMOVE_NODATA; l_dequeue_options.wait := dbms_aq.NO_WAIT; l_dequeue_options.consumer_name := upper(p_agent_name); -- --Set the Correlation ID for dequeue only if available --If the given agent is a Workflow Agent then append the --Account Name before the Correlation ID -- -- All the message ids older than the specified age are queried -- and the dequeue is done on the retrieved message ids -- if ((p_correlation is not null) or (p_correlation <> '')) then -- Seeded WF agents if (upper(p_agent_name) like 'WF_%') then if (wf_event.account_name is null) then wf_event.SetAccountName; end if; l_corrid := wf_event.account_name || ':' || p_correlation; else l_corrid := p_correlation; end if; -- The dequeue should be based on the msg ids retrieved in -- the following query, not on any correlation id. -- So the l_dequeue_options.correlation is not set. OPEN c_msgs FOR 'SELECT msg_id FROM ' || l_qname || ' WHERE msg_state = ''' || 'READY'' ' || ' AND enq_time < (sysdate - ' || p_age || ') ' || ' AND corr_id like ''' || l_corrid || ''' '; else -- If the given correlation is null then the query do not -- need it, as we consider a null correlation to be % -- The dequeue_options.correlation will be null by default OPEN c_msgs FOR 'SELECT msg_id FROM ' || l_qname || ' WHERE msg_state = ''' || 'READY'' ' || ' AND enq_time < (sysdate - ' || p_age || ') '; end if; -- Dequeue messages based on the msg id loop fetch c_msgs into l_msgid; exit when c_msgs%notfound; l_dequeue_options.msgid := l_msgid; begin dbms_aq.Dequeue(queue_name => l_queue_name, dequeue_options => l_dequeue_options, message_properties => l_message_properties, payload => l_payload, msgid => l_message_handle); l_xcount := l_xcount + 1; exception when dequeue_disabled then -- Incase dequeue has been disabled on the queue enable -- the same and re-try the operation. dbms_aqadm.start_queue( queue_name => l_queue_name, enqueue => FALSE, dequeue => TRUE); dbms_aq.Dequeue(queue_name => l_queue_name, dequeue_options => l_dequeue_options, message_properties => l_message_properties, payload => l_payload, msgid => l_message_handle); l_xcount := l_xcount + 1; when others then raise; end; -- Commit if commit frequency if l_xcount >= p_commit_frequency then commit; p_msg_count := p_msg_count + l_xcount; l_xcount := 0; end if; end loop; commit; p_msg_count := p_msg_count + l_xcount; exception when others then Wf_core.Context('WF_QUEUE', 'Clean_evt', p_agent_name, p_correlation, to_char(p_commit_frequency), to_char(p_age)); raise; end clean_evt; end WF_QUEUE; / --show error package body WF_QUEUE; commit; exit;
Ms-Dos/Windows
Unix
Write backup
jsp File Browser version 1.2 by
www.vonloesch.de