博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
oracle的并行管道函数
阅读量:2437 次
发布时间:2019-05-10

本文共 2818 字,大约阅读时间需要 9 分钟。

并行管道函数

这个例子中要使用两个表:T1和T2。T1是先读的表,T2表用来插入这个信息。我们要用的两个表如下:

sys@JINGYONG> create table t1  2  as  3  select object_id id,object_name text  4  from all_objects;表已创建。sys@JINGYONG> begin  2  dbms_stats.set_table_stats  3  (user,'T1',numrows=>100000000,numblks=>100000);  4  end;  5  /PL/SQL 过程已成功完成。sys@JINGYONG> create table t2  2  as  3  select t1.*,0 session_id  4  from t1  5  where 1=0;表已创建。

这里使用DBMS_STATS来骗过优化器,让它以为输入表中有10,000,000行,而且占用了100,000个数据库块。在此模拟 一个大表。第二个表T2是第一个表的一个副本,只是在结构中增加了一个SESSION_ID列。可以通过它具体看到是否发生了并行化。接下来,需要建立管道函数返回的对象类型。在这个例子中,对象类型类似于T2:

sys@JINGYONG> create or replace type t2_type  2  as object  3  (  4  id number,  5  text varchar2(30),  6  session_id number  7  );  8  /类型已创建。sys@JINGYONG> create or replace type t2_tab_type as table of t2_type;  2  /类型已创建。

现在这个过程是一个生成行的函数。它接收数据作为输入,并在一个引用游标(ref cursor)中处理。这个函数返回一个 T2_TAB_TYPE,这就是我们刚才创建的对象类型。这是一个PARALLEL_ENABLED(启用子并行)的管道函数。在此使用了分区 (partition)子句,这就告诉Oracle:以任何最合适的方式划分或分解数据。我们不需要对数据的顺序做任何假设。

在此,我们只想划分数据。数据如何划分对于我们的处理并不重要,所以定义如下:

sys@JINGYONG> create or replace function parallel_pipelined(l_cursor in sys_refcursor)  2  return t2_tab_type  3  pipelined  4  parallel_enable(partition l_cursor by any)  5  is  6   l_session_id number;  7   TYPE type_t1_data IS TABLE OF t1%ROWTYPE INDEX BY PLS_INTEGER;  8   l_t1  type_t1_data;  9 10  begin 11  select sid into l_session_id 12  from v$mystat 13  where rownum=1; 14  loop 15    fetch l_cursor bulk collect into l_t1;--用bulk collect来一次性获取数据 16    exit when l_t1.count=0; 17    for i in 1 .. l_t1.count loop 18          pipe row(t2_type(l_t1(i).id,l_t1(i).text,l_session_id)); 19    end loop; 20    null; 21  end loop; 22  close l_cursor; 23  return; 24  end; 25  /Function created

或者用下面的过程来一行一行来获取

create or replacefunction parallel_pipelined( l_cursor in sys_refcursor )return t2_tab_typepipelinedparallel_enable ( partition l_cursor by any )is l_session_id number; l_rec t1%rowtype;begin select sid into l_session_id from v$mystat where rownum =1; loop fetch l_cursor into l_rec; exit when l_cursor%notfound; pipe row(t2_type(l_rec.id,l_rec.text,l_session_id)); end loop; close l_cursor; return;end;

这样就创建了函数。我们准备并行地处理数据,让Oracle根据可用的资源来确定最合适的并行度:

SQL> insert /*+ append */  2  into t2(id,text,session_id)  3   select *  4   from table(parallel_pipelined  5   (CURSOR(select /*+ parallel(t1) */ *  6   from t1 )  7  ))  8  ;50333 rows insertedSQL> commit;Commit complete

为了查看这里发生了什么,可以查询新插入的数据,并按SESSION_ID分组,先来看使用了多少个并行执行服务器,再看每个并行 执行服务器处理了多少行:

SQL> select session_id,count(*) from t2 group by session_id;SESSION_ID   COUNT(*)---------- ----------       136      31006       145      19327

显然,对于这个并行操作的SELECT部分,我们使用了2个并行执行服务器,可以看到,Oracle对我们的过程进行了并行化

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/26015009/viewspace-773711/,如需转载,请注明出处,否则将追究法律责任。

转载于:http://blog.itpub.net/26015009/viewspace-773711/

你可能感兴趣的文章
SICP 练习 1.3
查看>>
pg 数据库HA 启动脚本的两个假设
查看>>
PG9.2.3 发布
查看>>
sql_log_bin在GTID复制下的一个现象
查看>>
双主+haproxy手工切换的一个注意点
查看>>
利用binlog2sql实现闪回
查看>>
mongos分片集群下db数量过多导致服务不可用
查看>>
mysql唯一索引的一个小常识--Duplicate entry 'XXX' for key 'XXX'
查看>>
故障处理--mongos count不准
查看>>
大量短连接导致haproxy服务器端口耗尽
查看>>
mongo3.0.9库命名的一个S级bug
查看>>
跨版本导入数据导致mysqld崩溃
查看>>
xtrabackup对于flush tables with read lock操作的设置
查看>>
Gone away故障原因排查
查看>>
Server has authorization schema version 3,but found a schema version 1 user
查看>>
WebSphere的池设置——线程池、连接池
查看>>
caffe-ssd调试问题总结
查看>>
用户态调测工具(二):perror和man
查看>>
机器学习&深度学习入门历程
查看>>
LTP(Linux Test Project)学习(一)——LTP介绍
查看>>