Edit D:\app\Administrator\product\11.2.0\dbhome_1\owb\wf\sql\wfjms92b.pls
REM dbdrv: none
--=========================================================================--
-- Copyright (c)2002 Oracle Corporation, Redwood Shores, California, USA   --
-- All rights reserved.                                                    --
--=========================================================================--
-- FILENAME
--   WFJMS92B.pls
--
-- DESCRIPTION
--   Business Event System OJMS Queue Handler.
--
-- DEPENDENCIES
--   Requires RDBMS 9.2.
--
-- HISTORY
--   DD-MON-RR  userid    comment
--   28-JAN-02  gashford  Initial version.
--   26-APR-02  gashford  Added support for deserializing messages containing
--                        all properties of all types (boolean, byte, short,
--                        int, long, float, double, string).
--=========================================================================--

set verify off
whenever sqlerror exit failure rollback;
whenever oserror exit failure rollback;

create or replace package body wf_event_ojmstext_qh as
/* $Header: wfjms92b.pls 26.5 2003/12/26 20:10:10 yohuang ship $ */

  -- Format mask used when dates are written to / read from string properties.
  DATE_MASK          constant varchar2(21) := 'YYYY/MM/DD HH24:MI:SS';

  -- Names of the JMS properties that carry the fixed wf_event_t attributes.
  -- deserialize() excludes these names when rebuilding the event's
  -- user-defined parameter list.
  PRIORITY           constant varchar2(8)  := 'PRIORITY';
  SEND_DATE          constant varchar2(9)  := 'SEND_DATE';
  RECEIVE_DATE       constant varchar2(12) := 'RECEIVE_DATE';
  CORRELATION_ID     constant varchar2(14) := 'CORRELATION_ID';
  EVENT_NAME         constant varchar2(10) := 'EVENT_NAME';
  EVENT_KEY          constant varchar2(9)  := 'EVENT_KEY';
  FROM_AGENT         constant varchar2(10) := 'FROM_AGENT';
  TO_AGENT           constant varchar2(8)  := 'TO_AGENT';
  ERROR_SUBSCRIPTION constant varchar2(18) := 'ERROR_SUBSCRIPTION';
  ERROR_MESSAGE      constant varchar2(13) := 'ERROR_MESSAGE';
  ERROR_STACK        constant varchar2(11) := 'ERROR_STACK';

  -- Package-level dequeue navigation mode. dequeue() resets it to
  -- first_message after a "no messages" condition and to next_message after
  -- every successful dequeue.
  navigation binary_integer := dbms_aq.next_message;

  --------------------------------------------------------------------------------
  -- Transforms a business event into a JMS Text Message.
  --
  -- p_event            - the business event to transform
  -- p_jms_text_message - the JMS Text Message
  --
  -- Any error is recorded with wf_core.context and then re-raised.
  --------------------------------------------------------------------------------
  procedure serialize(p_event            in wf_event_t,
                      p_jms_text_message out nocopy sys.aq$_jms_text_message)
  is
    l_replyto        varchar2(2000);
    i1               integer;
    i2               integer;
    l_agent_name     varchar2(30);
    l_address        varchar2(1024);
    l_protocol       number;
    l_aq_agent       sys.aq$_agent;
    l_wf_agent       wf_agent_t;
    l_parameter_list wf_parameter_list_t;
  begin
    p_jms_text_message := sys.aq$_jms_text_message.construct();

    -- set the JMS properties
    p_jms_text_message.set_type(p_event.getValueForParameter(JMS_TYPE));
    p_jms_text_message.set_userid(p_event.getValueForParameter(JMS_USERID));
    p_jms_text_message.set_appid(p_event.getValueForParameter(JMS_APPID));
    p_jms_text_message.set_groupid(p_event.getValueForParameter(JMS_GROUPID));
    p_jms_text_message.set_groupseq(p_event.getValueForParameter(JMS_GROUPSEQ));

    -- parse the replyto attribute which must be in the form
    -- "name:address:protocol"
    l_replyto := p_event.getValueforParameter(JMS_REPLYTO);

    if(l_replyto is not null) then
      i1 := instr(l_replyto, ':');
      i2 := instr(l_replyto, ':', 1, 2);

      l_agent_name := substr(l_replyto, 1, i1 - 1);
      l_address    := substr(l_replyto, i1 + 1, i2 - i1 - 1);
      -- implicit varchar2 -> number conversion of the protocol segment
      l_protocol   := substr(l_replyto, i2 + 1);

      l_aq_agent := sys.aq$_agent(l_agent_name, l_address, l_protocol);

      p_jms_text_message.set_replyto(l_aq_agent);
    end if;

    -- set the wf_event properties
    p_jms_text_message.set_int_property(PRIORITY, p_event.getPriority());
    p_jms_text_message.set_string_property(SEND_DATE,
                                           to_char(p_event.getSendDate(), DATE_MASK));
    p_jms_text_message.set_string_property(RECEIVE_DATE,
                                           to_char(p_event.getReceiveDate(), DATE_MASK));
    p_jms_text_message.set_string_property(CORRELATION_ID, p_event.getCorrelationId());
    p_jms_text_message.set_string_property(EVENT_NAME, p_event.getEventName());
    p_jms_text_message.set_string_property(EVENT_KEY, p_event.getEventKey());

    -- agents are written as "name@system"
    l_wf_agent := p_event.getFromAgent();

    if(l_wf_agent is not null) then
      p_jms_text_message.set_string_property(FROM_AGENT,
                                             l_wf_agent.getName() || '@' || l_wf_agent.getSystem());
    end if;

    l_wf_agent := p_event.getToAgent();

    if(l_wf_agent is not null) then
      p_jms_text_message.set_string_property(TO_AGENT,
                                             l_wf_agent.getName() || '@' || l_wf_agent.getSystem());
    end if;

    p_jms_text_message.set_string_property(ERROR_SUBSCRIPTION, p_event.getErrorSubscription());
    p_jms_text_message.set_string_property(ERROR_MESSAGE, p_event.getErrorMessage());
    p_jms_text_message.set_string_property(ERROR_STACK, p_event.getErrorStack());

    -- set the wf_event_t user-defined properties
    l_parameter_list := p_event.getParameterList();

    -- FIRST and LAST are NULL for an initialized-but-empty collection, and
    -- NULL loop bounds raise ORA-06502, so only iterate when there is at
    -- least one element (same guard that deserialize uses).
    if(l_parameter_list is not null and l_parameter_list.count > 0) then
      for i in l_parameter_list.first .. l_parameter_list.last loop
        p_jms_text_message.set_string_property(l_parameter_list(i).getName(),
                                               l_parameter_list(i).getValue());
      end loop;
    end if;

    -- set the text payload
    p_jms_text_message.set_text(p_event.getEventData());
  exception
    when others then
      wf_core.context('WF_EVENT_OJMSTEXT_QH', 'serialize',
                      'SQL error is ' || substr(sqlerrm, 1, 200));
      raise;
  end serialize;

  --------------------------------------------------------------------------------
  -- Transforms a JMS Text Message into a business event.
  --
  -- p_jms_text_message - the JMS Text Message
  -- p_event            - the business event
  --
  -- Properties whose names match the fixed wf_event property constants are
  -- mapped onto the corresponding event attributes; every other non-null
  -- property is added to the event's parameter list as a string.
  --------------------------------------------------------------------------------
  procedure deserialize(p_jms_text_message in out nocopy sys.aq$_jms_text_message,
                        p_event            out nocopy wf_event_t)
  is
    i1                    integer;
    l_jms_agent           varchar2(2000);
    l_agent_name          varchar2(30);
    l_system_name         varchar2(30);
    l_from_agent          wf_agent_t;
    l_to_agent            wf_agent_t;
    l_jms_user_properties sys.aq$_jms_userproparray;
    l_jms_property_name   varchar2(100);
    l_jms_property_value  varchar2(2000);
    l_boolean_value       boolean;
    l_clob                clob;
  begin
    p_event := wf_event_t(0, null, null, null, null, null, null, null, null, null,
                          null, null, null);

    -- COUNT on an uninitialized collection raises COLLECTION_IS_NULL, so
    -- check for null first (PL/SQL short-circuits AND). A message without
    -- any properties still has its text payload copied below.
    if(p_jms_text_message.header.properties is not null and
       p_jms_text_message.header.properties.count > 0) then
      -- set the wf_event properties
      p_event.setPriority(p_jms_text_message.get_int_property(PRIORITY));
      p_event.setSendDate(to_date(p_jms_text_message.get_string_property(SEND_DATE),
                                  DATE_MASK));
      p_event.setReceiveDate(to_date(p_jms_text_message.get_string_property(RECEIVE_DATE),
                                     DATE_MASK));
      p_event.setCorrelationId(p_jms_text_message.get_string_property(CORRELATION_ID));
      p_event.setEventName(p_jms_text_message.get_string_property(EVENT_NAME));
      p_event.setEventKey(p_jms_text_message.get_string_property(EVENT_KEY));

      -- parse the from agent which must be in the form "name@system"
      l_jms_agent := p_jms_text_message.get_string_property(FROM_AGENT);

      if(l_jms_agent is not null) then
        i1 := instr(l_jms_agent, '@');

        l_agent_name  := substr(l_jms_agent, 1, i1 - 1);
        l_system_name := substr(l_jms_agent, i1 + 1);

        l_from_agent := wf_agent_t(l_agent_name, l_system_name);

        p_event.setFromAgent(l_from_agent);
      end if;

      -- parse the to agent which must be in the form "name@system"
      l_jms_agent := p_jms_text_message.get_string_property(TO_AGENT);

      if(l_jms_agent is not null) then
        i1 := instr(l_jms_agent, '@');

        l_agent_name  := substr(l_jms_agent, 1, i1 - 1);
        l_system_name := substr(l_jms_agent, i1 + 1);

        l_to_agent := wf_agent_t(l_agent_name, l_system_name);

        p_event.setToAgent(l_to_agent);
      end if;

      p_event.setErrorSubscription(p_jms_text_message.get_string_property(ERROR_SUBSCRIPTION));
      p_event.setErrorMessage(p_jms_text_message.get_string_property(ERROR_MESSAGE));
      p_event.setErrorStack(p_jms_text_message.get_string_property(ERROR_STACK));

      -- set the wf_event user-defined properties
      l_jms_user_properties := p_jms_text_message.header.properties;

      if(l_jms_user_properties.count > 0) then
        for i in l_jms_user_properties.first .. l_jms_user_properties.last loop
          l_jms_property_name := l_jms_user_properties(i).name;

          if(l_jms_property_name not in (PRIORITY, SEND_DATE, RECEIVE_DATE,
                                         CORRELATION_ID, EVENT_NAME, EVENT_KEY,
                                         FROM_AGENT, TO_AGENT, ERROR_SUBSCRIPTION,
                                         ERROR_MESSAGE, ERROR_STACK)) then
            -- since we don't know the property type, try retrieving the value
            -- as each possible type until we find it (get a non-null value)

            -- get the property as a string
            l_jms_property_value :=
              p_jms_text_message.get_string_property(l_jms_property_name);

            if(l_jms_property_value is not null) then
              goto found;
            end if;

            -- get the property as an int
            l_jms_property_value :=
              p_jms_text_message.get_int_property(l_jms_property_name);

            if(l_jms_property_value is not null) then
              goto found;
            end if;

            -- get the property as a boolean
            l_boolean_value :=
              p_jms_text_message.get_boolean_property(l_jms_property_name);

            if(l_boolean_value is not null) then
              if(l_boolean_value) then
                l_jms_property_value := 'true';
              else
                l_jms_property_value := 'false';
              end if;

              goto found;
            end if;

            -- get the property as a byte
            l_jms_property_value :=
              p_jms_text_message.get_byte_property(l_jms_property_name);

            if(l_jms_property_value is not null) then
              goto found;
            end if;

            -- get the property as a short
            l_jms_property_value :=
              p_jms_text_message.get_short_property(l_jms_property_name);

            if(l_jms_property_value is not null) then
              goto found;
            end if;

            -- get the property as a long
            l_jms_property_value :=
              p_jms_text_message.get_long_property(l_jms_property_name);

            if(l_jms_property_value is not null) then
              goto found;
            end if;

            -- get the property as a float
            l_jms_property_value :=
              p_jms_text_message.get_float_property(l_jms_property_name);

            if(l_jms_property_value is not null) then
              goto found;
            end if;

            -- get the property as a double
            l_jms_property_value :=
              p_jms_text_message.get_double_property(l_jms_property_name);

            if(l_jms_property_value is not null) then
              goto found;
            end if;

            <<found>>
            null;

            -- At this point, if l_jms_property_value is null, that means that
            -- the property value really is null. In that case, do not add the
            -- property to the parameter list.
            if(l_jms_property_value is not null) then
              p_event.addParameterToList(l_jms_property_name, l_jms_property_value);
            end if;
          end if;
        end loop;
      end if;
    end if;

    -- set the event data
    p_jms_text_message.get_text(l_clob);
    p_event.setEventData(l_clob);
  end deserialize;

  --------------------------------------------------------------------------------
  -- Enqueues a business event into a JMS queue.
  --
  -- p_event              - the business event to enqueue
  -- p_out_agent_override - the out agent override
  --
  -- The out agent is p_out_agent_override when supplied, otherwise the
  -- event's from agent. Any error is recorded with wf_core.context and then
  -- re-raised.
  --------------------------------------------------------------------------------
  procedure enqueue(p_event              in wf_event_t,
                    p_out_agent_override in wf_agent_t)
  is
    l_jms_text_message   sys.aq$_jms_text_message;
    l_out_agent_name     varchar2(30);
    l_out_system_name    varchar2(30);
    l_out_queue_name     varchar2(80);
    l_to_agent_name      varchar2(30);
    l_to_system_name     varchar2(30);
    l_to_queue_name      varchar2(80);
    l_to_address         varchar2(1024);
    l_to_protocol        varchar2(30);
    l_to_protocol_number number;
    l_delay              number;
    l_enqueue_options    dbms_aq.enqueue_options_t;
    l_message_properties dbms_aq.message_properties_t;
    l_msgid              raw(16);

    --Bug 2676549
    --Cursor to select the to_agents for the recipient list
    CURSOR recipients(agent_name varchar2, system_name varchar2) is
      select agt2.name, agt2.address, agt2.protocol, agt2.queue_name
      from   wf_agent_groups agp,
             wf_agents agt1,
             wf_agents agt2,
             wf_systems sys
      where  agt1.name      = agent_name
      and    agp.group_guid = agt1.guid
      and    agt1.type      = 'GROUP'
      and    agt1.status    = 'ENABLED'
      and    agt2.guid      = agp.member_guid
      and    sys.name       = system_name
      and    sys.guid       = agt2.system_guid;

    i      number := 1;   -- next index into the recipient list (group case)
    l_type varchar2(8);
  begin
    serialize(p_event, l_jms_text_message);

    -- determine the out queue
    if(p_out_agent_override is not null) then
      l_out_agent_name  := p_out_agent_override.getName();
      l_out_system_name := p_out_agent_override.getSystem();
    else
      l_out_agent_name  := p_event.getFromAgent().getName();
      l_out_system_name := p_event.getFromAgent().getSystem();
    end if;

    -- get the out queue name
    select wfa.queue_name
    into   l_out_queue_name
    from   wf_agents wfa,
           wf_systems wfs
    where  wfa.name = l_out_agent_name
    and    wfs.name = l_out_system_name
    and    wfs.guid = wfa.system_guid;

    -- if there is a to queue, we need to set the recipient list address
    if((p_event.getToAgent() is not null) and
       (l_out_agent_name <> 'WF_DEFERRED')) then
      l_to_agent_name  := p_event.getToAgent().getName();
      l_to_system_name := p_event.getToAgent().getSystem();

      if (wf_log_pkg.level_statement >= fnd_log.g_current_runtime_level) then
        wf_log_pkg.string(wf_log_pkg.level_statement,
                          'wf.plsql.WF_EVENT_OJMSTEXT_QH.enqueue.',
                          'Enqueuing on queue: ' || l_out_queue_name);
      end if;

      --Bug 2676549
      --Supporting agent group. So lets get the type of
      --agent to determine if we would have to get the member
      --agent details or not.
      select wfa.address, wfa.protocol, wfa.queue_name, wfa.type
      into   l_to_address, l_to_protocol, l_to_queue_name, l_type
      from   wf_agents wfa,
             wf_systems wfs
      where  wfa.name = l_to_agent_name
      and    wfs.name = l_to_system_name
      and    wfs.guid = wfa.system_guid;

      --If type is 'GROUP' then get member agents
      if (l_type = 'GROUP') then
        for to_agent in recipients(l_to_agent_name, l_to_system_name) loop
          -- MJC: We need to make sure the recipient address is in the correct
          -- format otherwise dequeue will not work.
          --
          -- Rule 1: Local consumer dequeues from same queue as enqueued
          -- --> Address must be null
          -- Rule 2: Propagating to local queue
          -- --> Address must be <schema>.<queue_name>
          -- Rule 3: Propagating to local database
          -- --> Address must be <schema>.<queue_name>@dblink
          --
          -- NOTE(review): the comparisons below use the group agent's
          -- name/system (l_to_agent_name / l_to_system_name), not the member
          -- row, and the default branch keeps whatever l_to_address already
          -- holds; the cursor's address column is never read. Confirm this
          -- is intended.
          if((l_to_agent_name = l_out_agent_name) and
             (l_to_system_name = l_out_system_name)) then
            l_to_address := null;
          elsif((l_to_agent_name <> l_out_agent_name) and
                (l_to_system_name = l_out_system_name)) then
            l_to_address := to_agent.queue_name;
          else
            null; -- the default
          end if;

          if (wf_log_pkg.level_statement >= fnd_log.g_current_runtime_level) then
            wf_log_pkg.string(wf_log_pkg.level_statement,
                              'wf.plsql.WF_EVENT_OJMSTEXT_QH.enqueue.group',
                              'Recipient is name: ' || to_agent.name ||
                              ' address: ' || l_to_address||
                              ' protocol: ' || to_agent.protocol);
          end if;

          -- Here is where we will add additional protocol mappings as AQ
          -- supports new protocols. This bit will form a hardcoded mapping
          -- from protocol names as used in the event manager and the
          -- protocol numbers used by AQ.
          --
          -- NOTE(review): only the error context is set for an unsupported
          -- protocol; no raise follows here, so the recipient is still added
          -- with protocol number 0 unless wf_core.context itself raises.
          if((to_agent.protocol is null) or
             (to_agent.protocol not in ('SQLNET'))) then
            wf_core.context('WF_EVENT_OJMSTEXT_QH', 'Enqueue', 'Bad Protocol',
                            to_agent.protocol, to_agent.queue_name);
          end if;

          l_to_protocol_number := 0;

          l_message_properties.recipient_list(i) :=
            sys.aq$_agent(to_agent.name, l_to_address, l_to_protocol_number);

          i := i + 1;
        end loop;
      else
        -- MJC: We need to make sure the recipient address is in the correct
        -- format otherwise dequeue will not work.
        --
        -- Rule 1: Local consumer dequeues from same queue as enqueued
        -- --> Address must be null
        -- Rule 2: Propagating to local queue
        -- --> Address must be <schema>.<queue_name>
        -- Rule 3: Propagating to local database
        -- --> Address must be <schema>.<queue_name>@dblink
        if((l_to_agent_name = l_out_agent_name) and
           (l_to_system_name = l_out_system_name)) then
          l_to_address := null;
        elsif((l_to_agent_name <> l_out_agent_name) and
              (l_to_system_name = l_out_system_name)) then
          l_to_address := l_to_queue_name;
        else
          null; -- the default
        end if;

        if (wf_log_pkg.level_statement >= fnd_log.g_current_runtime_level) then
          wf_log_pkg.string(wf_log_pkg.level_statement,
                            'wf.plsql.WF_EVENT_OJMSTEXT_QH.enqueue.no_group',
                            'Recipient is name: ' || l_to_agent_name ||
                            ' address: ' || l_to_address ||
                            ' protocol: ' || l_to_protocol);
        end if;

        -- Here is where we will add additional protocol mappings as AQ
        -- supports new protocols. This bit will form a hardcoded mapping
        -- from protocol names as used in the event manager and the
        -- protocol numbers used by AQ.
        --
        -- NOTE(review): as in the group branch, only the error context is
        -- set here; no raise follows in this block.
        if((l_to_protocol is null) or
           (l_to_protocol not in ('SQLNET'))) then
          wf_core.context('WF_EVENT_OJMSTEXT_QH', 'Enqueue', 'Bad Protocol',
                          l_to_protocol, l_out_queue_name);
        end if;

        l_to_protocol_number := 0;

        l_message_properties.recipient_list(1) :=
          sys.aq$_agent(l_to_agent_name, l_to_address, l_to_protocol_number);
      end if; --For AGENT GROUP
    end if;

    -- set the priority
    l_message_properties.priority := nvl(p_event.priority, 100);

    -- set the delay if required; also used for deferred agent
    if(p_event.getSendDate() > sysdate) then
      if (wf_log_pkg.level_statement >= fnd_log.g_current_runtime_level) then
        wf_log_pkg.string(wf_log_pkg.level_statement,
                          'wf.plsql.WF_EVENT_OJMSTEXT_QH.enqueue.delay',
                          'Delay Detected');
      end if;

      -- date difference is in days; convert to seconds
      l_delay := (p_event.getSendDate() - sysdate)*24*60*60;

      if (wf_log_pkg.level_statement >= fnd_log.g_current_runtime_level) then
        wf_log_pkg.string(wf_log_pkg.level_statement,
                          'wf.plsql.WF_EVENT_OJMSTEXT_QH.enqueue',
                          'delay = ' || to_char(l_delay));
      end if;

      if(l_delay > 1) then
        -- message_properties.delay is BINARY_INTEGER so check if delay is
        -- too big, and set the max delay to be 2**31 - 1
        if(l_delay >= power(2, 31)) then
          l_message_properties.delay := power(2, 31) - 1;
        else
          l_message_properties.delay := l_delay;
        end if;
      end if;
    end if;

    -- if we are enqueuing for the deferred agent, must set the account name
    -- into the correlation id
    if((l_out_agent_name = 'WF_DEFERRED') or
       (l_to_agent_name = 'WF_DEFERRED')) then
      if(wf_event.account_name is null) then
        wf_event.setAccountName();
      end if;

      l_message_properties.correlation := wf_event.account_name;
    end if;

    if (wf_log_pkg.level_statement >= fnd_log.g_current_runtime_level) then
      wf_log_pkg.string(wf_log_pkg.level_statement,
                        'wf.plsql.WF_EVENT_OJMSTEXT_QH.enqueue.dbms_aq',
                        'calling dbms_aq.enqueue');
    end if;

    dbms_aq.enqueue(queue_name         => l_out_queue_name,
                    enqueue_options    => l_enqueue_options,
                    message_properties => l_message_properties,
                    payload            => l_jms_text_message,
                    msgid              => l_msgid);

    if (wf_log_pkg.level_procedure >= fnd_log.g_current_runtime_level) then
      wf_log_pkg.string(wf_log_pkg.level_procedure,
                        'wf.plsql.WF_EVENT_OJMSTEXT_QH.enqueue.End',
                        'Finished calling dbms_aq.enqueue');
    end if;
  exception
    when others then
      wf_core.context('WF_EVENT_OJMSTEXT_QH', 'enqueue', l_out_queue_name,
                      'SQL error is ' || substr(sqlerrm, 1, 200));
      raise;
  end enqueue;

  --------------------------------------------------------------------------------
  -- Dequeues a business event from a JMS queue.
  --
  -- p_agent_guid - the agent GUID
  -- p_event      - the business event
  -- p_wait       - the number of seconds to wait to dequeue the event
  --
  -- When the dequeue raises ORA-25228 (mapped to no_messages below), p_event
  -- is returned as null and the package-level navigation mode is reset to
  -- first_message. Any other error is recorded with wf_core.context and then
  -- re-raised.
  --------------------------------------------------------------------------------
  procedure dequeue(p_agent_guid in raw,
                    p_event      out nocopy wf_event_t,
                    p_wait       in binary_integer)
  is
    l_queue_name         varchar2(80);
    l_agent_name         varchar2(30);
    l_dequeue_options    dbms_aq.dequeue_options_t;
    l_message_properties dbms_aq.message_properties_t;
    l_jms_text_message   sys.aq$_jms_text_message;
    l_msgid              raw(16);

    no_messages exception;
    pragma exception_init(no_messages, -25228);
  begin
    -- get the agent name
    select upper(queue_name), upper(name)
    into   l_queue_name, l_agent_name
    from   wf_agents
    where  guid = p_agent_guid;

    if (wf_log_pkg.level_procedure >= fnd_log.g_current_runtime_level) then
      wf_log_pkg.string(wf_log_pkg.level_procedure,
                        'wf.plsql.WF_EVENT_OJMSTEXT_QH.dequeue.Begin',
                        'Dequeuing '||l_queue_name||' on '||l_agent_name);
    end if;

    -- the deferred agent dequeues by correlation (the account name), which
    -- mirrors what enqueue stores for WF_DEFERRED
    if(l_agent_name = 'WF_DEFERRED') then
      if(wf_event.account_name is null) then
        wf_event.setAccountName();
      end if;

      if (wf_log_pkg.level_statement >= fnd_log.g_current_runtime_level) then
        wf_log_pkg.string(wf_log_pkg.level_statement,
                          'wf.plsql.WF_EVENT_OJMSTEXT_QH.dequeue.corr',
                          'Setting correlation: ' || wf_event.account_name);
      end if;

      l_dequeue_options.correlation := wf_event.account_name;
    end if;

    -- set the dequeue options
    l_dequeue_options.consumer_name := l_agent_name;
    l_dequeue_options.wait          := p_wait;
    l_dequeue_options.navigation    := navigation;
    -- l_dequeue_options.navigation := dbms_aq.FIRST_MESSAGE;

    begin
      dbms_aq.dequeue(queue_name         => l_queue_name,
                      dequeue_options    => l_dequeue_options,
                      message_properties => l_message_properties, -- out
                      payload            => l_jms_text_message,   -- out
                      msgid              => l_msgid);             -- out

      navigation := dbms_aq.next_message;
    exception
      when no_messages then
        if (wf_log_pkg.level_event >= fnd_log.g_current_runtime_level) then
          wf_log_pkg.string(wf_log_pkg.level_event,
                            'wf.plsql.WF_EVENT_OJMSTEXT_QH.dequeue.queue_empty',
                            'No more messages in dequeue.');
        end if;

        -- the next call starts again from the first message
        navigation := dbms_aq.first_message;
        p_event := null;

        return;
    end;

    deserialize(l_jms_text_message, p_event);

    -- set the receive date
    p_event.setReceiveDate(sysdate);

    if (wf_log_pkg.level_procedure >= fnd_log.g_current_runtime_level) then
      wf_log_pkg.string(wf_log_pkg.level_procedure,
                        'wf.plsql.WF_EVENT_OJMSTEXT_QH.dequeue.End',
                        'Finished');
    end if;
  exception
    when others then
      wf_core.context('WF_EVENT_OJMSTEXT_QH', 'Dequeue', l_queue_name,
                      'SQL error is ' || substr(sqlerrm, 1, 200));
      raise;
  end dequeue;

end wf_event_ojmstext_qh;
/

-- show errors package body wf_event_ojmstext_qh;

commit;
exit;
Ms-Dos/Windows
Unix
Write backup
jsp File Browser version 1.2 by
www.vonloesch.de