#!/usr/local/bin/perl # # $Header: emdb/sysman/admin/scripts/streams/pstats.pl /st_emdbsa_11.2/4 2009/03/22 07:52:28 sresrini Exp $ # # pstats.pl # # Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved. # # NAME # pstats.pl - # # DESCRIPTION # # # NOTES # # # MODIFIED (MM/DD/YY) # sadattaw 09/23/08 - # ashomish 09/10/08 - # sadattaw 08/21/08 - # sshastry 06/10/08 - # anosriva 05/27/08 - # sresrini 02/18/08 - streams persistent queue stats # sresrini 02/18/08 - Creation # use strict; use DBI; print("starting...0 \n"); require "emd_common.pl"; require "semd_common.pl"; #require "jdbc_oci_connector.pl"; print("starting 1...\n"); # For TESTING: comment out stdin args my %stdinArgs = get_stdinvars(); my $username = $stdinArgs{"EM_TARGET_USERNAME"}; my $password = $stdinArgs{"EM_TARGET_PASSWORD"}; my $address = $ENV{EM_TARGET_ADDRESS}; my $streamstype = $ENV{STREAMS_TYPE}; my $role = $ENV{EM_TARGET_ROLE}; my $mode = 0; my $stmt; print("starting 2...\n"); if($role =~ /SYSDBA/i) { $mode = 2; } my $verStr = $ENV{VersionCategory}; if (!defined($verStr)) { $verStr = $ENV{VERSIONCATEGORY}; } my $db_version = $verStr; if (index($db_version, '9') == 0) { $db_version = 9; } else { $db_version = substr($db_version, 0, 2); } print("got dbversion.."); print("started...\n dbversion is $db_version"); if($db_version != 9) { print("dbversion is not 9"); } # TESTING: Un-comment for manual testing #my $username = "SYSTEM"; #my $password = "manager"; #my $address = "(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=)(PORT=15043))(CONNECT_DATA=(SID=t102gc)))"; #my $streamstype = "APPLY"; # -------------------------------------------------------------------- # +++ Establish Target DB Connection # -------------------------------------------------------------------- my %connection; $connection{"db_conninfo"} = $address; $connection{"db_username"} = $username; $connection{"db_password"} = $password; $connection{"db_mode"} = $mode; #jdbc_oci_connect(\%connection) # or die (filterOraError("em_error=Could not connect to $username/$address: ".jdbc_oci_returnErrStr(\%connection), jdbc_oci_returnErr(\%connection))); #jdbc_oci_register_metric_call(\%connection); # # -------------------------------------------------------------------- # +++ Establish Target DB Connection # -------------------------------------------------------------------- my $dbconn = DBI->connect('dbi:Oracle:', "$username@".$address, "$password", {ora_session_mode => $mode, PrintError => 0, RaiseError => 0, AutoCommit => 0}) or die "em_error=Could not connect to $username/$address: $DBI::errstr\n"; print("Connected...\n"); #Collect statistics for buffered queue if($db_version != 9) { my $bufsql = "select propagation_name, 'BUFFERED', num_msgs ready, 0 from "; $bufsql .="gv\$buffered_subscribers b,dba_propagation p, dba_queues q, dba_queue_tables t where "; $bufsql .="b.subscriber_name = p.propagation_name and b.subscriber_address = p.destination_dblink and "; $bufsql .="b.queue_schema = p.source_queue_owner and b.queue_name = p.source_queue_name and "; $bufsql .="p.source_queue_name = q.name and p.source_queue_owner = q.owner and "; $bufsql .="q.queue_table = t.queue_table and b.inst_id=t.owner_instance"; # print "\n".$bufsql."\n"; #my $bufcur = jdbc_oci_prepare(\%connection, $bufsql) # or die (filterOraError("em_error=prepare($bufsql): ".jdbc_oci_returnErrStr(\%connection), jdbc_oci_returnErr(\%connection))); #jdbc_oci_execute(\%connection, $bufcur) # or die (filterOraError("em_error=cur->execute(): ".jdbc_oci_returnErrStr(\%connection,$bufcur), jdbc_oci_returnErr(\%connection,$bufcur))); $stmt = $dbconn->prepare( $bufsql ); $stmt->execute(); my( $col1, $col2, $col3, $col4); $stmt->bind_columns( undef, \$col1, \$col2, \$col3, \$col4); my @fetch_bufqrow; ##while ( @fetch_bufqrow = jdbc_oci_fetchrow_array(\%connection, $bufcur) ) while ( $stmt->fetch() ) { # print "em_result=".$fetch_bufqrow[0]."|".$fetch_bufqrow[1]."|".$fetch_bufqrow[3]."|".$fetch_bufqrow[4]."\n"; print "em_result=".$col1."|".$col2."|".$col3."|".$col4."\n"; } } #Collect persistent queue statistics my $sql; if($streamstype eq "APPLY") { $sql = "select apply_name streams_name,'APPLY' streams_type,'' address,queue_table,queue_owner,queue_name from dba_queues, dba_apply where owner=queue_owner and queue_name=name and apply_captured='NO'"; } else { $sql .="select propagation_name streams_name,'PROPAGATION' streams_type,'\"'||destination_queue_owner||'\".\"'||destination_queue_name||'\""."\@'||destination_dblink address,queue_table,owner,source_queue_name from dba_queues, dba_propagation where owner=SOURCE_QUEUE_OWNER and SOURCE_QUEUE_NAME=name"; } # print "\n".$sql."\n"; #my $cur = jdbc_oci_prepare(\%connection, $sql) # or die (filterOraError("em_error=prepare($sql): ".jdbc_oci_returnErrStr(\%connection), jdbc_oci_returnErr(\%connection))); #jdbc_oci_execute(\%connection, $cur) # or die (filterOraError("em_error=cur->execute(): ".jdbc_oci_returnErrStr(\%connection,$cur), jdbc_oci_returnErr(\%connection,$cur))); my $stmt2 = $dbconn->prepare( $sql ); $stmt2->execute(); my $streams_name; my $streams_type; my $queue_table; my $queue_owner; my $queue_name; my $prop_address; $stmt2->bind_columns( undef, \$streams_name, \$streams_type, \$prop_address, \$queue_table, \$queue_owner, \$queue_name); my @all_streams_names; my @all_streams_types; my @all_queue_tables; my @all_queue_owners; my @all_queue_names; my @all_prop_addresses; my $all_streams_count = 0; my $all_msgdata_count = 0; my $numqts = 0; my @all_msg_states; my @all_queues; my @all_counts; my @all_queue_ow; my @qts_unique; my @all_addresses; #my @fetch_row; #@fetch_row = jdbc_oci_fetchrow_array(\%connection, $ro_cur); ##while ( @fetch_row = jdbc_oci_fetchrow_array(\%connection, $cur) ) { while ( $stmt2->fetch() ) { $all_streams_count++; push (@all_streams_names, $streams_name); push (@all_streams_types,$streams_type ); print "\npushing queue table...".$queue_table; push (@all_queue_tables, $queue_table); push (@all_queue_owners, $queue_owner); push (@all_queue_names, $queue_name); push(@all_prop_addresses,$prop_address); # print "$streams_name, $streams_type, $queue_table, $queue_owner, $queue_name, $prop_address \n"; #print "em_result=$streams_name|APPLY|123\n"; } QTLOOP: for ( my $i = 0; $i < $all_streams_count; $i++ ) { my $qtable = "\"".$all_queue_owners[$i]."\".\"AQ\$".$all_queue_tables[$i]."\""; print "\n".$qtable."\n"; for(my $j = 0; $j < $numqts ;$j ++) { my $qt = $qts_unique[$j]; if ($qt eq $qtable) { next QTLOOP; } } push(@qts_unique,$qtable ); $numqts++; } my $qclause = ""; for(my $q = 0; $q < $all_streams_count; $q++) { if ($q == 0) { $qclause = "queue in (" ; } $qclause .= "'".$all_queue_names[$q]."'"; if ($q == $all_streams_count - 1) { $qclause .= ") and " ; } else { $qclause .= ","; } } for(my $k = 0; $k < $numqts ;$k ++) { my $qtable = $qts_unique[$k]; my $qtsql; $qtsql = "select queue,address,msg_state,nvl(count(msg_state),0) from ".$qtable." where ".$qclause." msg_state in ('READY','WAITING') group by queue,address,msg_state"; print "\n".$qtsql."\n"; # my $qtcur = jdbc_oci_prepare(\%connection, $qtsql) # or die (filterOraError("em_error=prepare($sql): ".jdbc_oci_returnErrStr(\%connection), jdbc_oci_returnErr(\%connection))); # jdbc_oci_execute(\%connection, $qtcur) # or die (filterOraError("em_error=cur->execute(): ".jdbc_oci_returnErrStr(\%connection,$qtcur), jdbc_oci_returnErr(\%connection,$qtcur))); my $stmt3 = $dbconn->prepare( $qtsql ); $stmt3->execute(); my( $queue, $address, $msg_state, $count); $stmt3->bind_columns( undef, \$queue, \$address, \$msg_state, \$count); # my @fetch_row_qt; # while ( @fetch_row_qt = jdbc_oci_fetchrow_array(\%connection, $qtcur)) while( $stmt3->fetch() ) { push(@all_queues,$queue); push(@all_msg_states,$msg_state); push(@all_counts,$count); push(@all_addresses,$address); $all_msgdata_count++; } $stmt3->finish(); } print "\nall streams count :".$all_streams_count."\n"; if ($all_streams_count == 0) { print "em_result=\n"; } else { for (my $l = 0; $l < $all_streams_count; $l++) { my $streams_name = $all_streams_names[$l]; my $streams_type = $all_streams_types[$l]; my $queue_table = $all_queue_tables[$l]; my $prop_address = $all_prop_addresses[$l]; my $queue_owner = $all_queue_owners[$l]; my $queue_name = $all_queue_names[$l]; my $result="em_result=$streams_name|PERSISTENT|"; my $c = 0; MSG_LOOP: for (my $m = 0;$m<$all_msgdata_count ;$m++) { my $queue = $all_queues[$m] ; my $address = $all_addresses[$m] ; my $msg_state = $all_msg_states[$m] ; my $count = $all_counts[$m] ; if(($queue eq $queue_name) && ($address eq $prop_address)) { if($msg_state eq 'READY') { $result.=$count."|"; $c++; for (my $n = 0;$n<$all_msgdata_count ;$n++) { if(($queue eq $all_queues[$n]) && ($all_msg_states[$n] eq 'WAITING')) { $result.=$all_counts[$n]."\n"; $c++; next MSG_LOOP; } if($n == $all_msgdata_count -1) { $result.="0\n"; } } } else { if($msg_state eq 'WAITING') { for (my $n = 0;$n<$all_msgdata_count ;$n++) { if(($queue eq $all_queues[$n]) && ($all_msg_states[$n] eq 'READY')) { $result.=$all_counts[$n]."|"; $c++; next MSG_LOOP; } if($n == $all_msgdata_count -1) { $result.="0|"; } } $result.=$count."\n"; $c++; } } } } if($c > 0) { print $result; } } } $stmt->finish(); $stmt2->finish(); $dbconn->disconnect(); exit 0;