pipelinedb--流、滑动窗口测试

https://blog.csdn.net/liuxiangke0210/article/details/74010951

https://yq.aliyun.com/articles/166

一、pipelineDB默认的用户不是postgres而是pipeline。

pipeline=# c
You are now connected to database "pipeline" as user "steven".

  

进入数据库 命令:pipeline  pipeline

[steven@steven1 ~]$ pipeline pipeline
pipeline (9.5.3)
Type "help" for help.

pipeline=#

  

创建一个流 stream,一个stream就是一个FDW,其实不存储任何数据。

pipeline=# create stream stream_test(x integer, y integer, z text);
CREATE FOREIGN TABLE

查看流结构

pipeline=# d stream_test;
                   Foreign table "public.stream_test"
      Column       |           Type           | Modifiers | FDW Options
-------------------+--------------------------+-----------+-------------
 x                 | integer                  |           |
 y                 | integer                  |           |
 z                 | text                     |           |
 arrival_timestamp | timestamp with time zone |           |
Server: pipelinedb

  

  

创建一个CONTINUOUS 连续视图

pipeline=# create continuous view v_sum as select sum (x + y) from stream_test;
CREATE VIEW
pipeline=# create continuous view v_group as select count(*) as coun,x,y,z from stream_test group by x,y,z;
CREATE VIEW

pipeline=# create continuous view v_single as select x,z from stream_test;
CREATE VIEW

  

  

stream 只能被continuous查询,如果直接查询会报错,被告知只能被continous view读取。

查看continues  views结构

pipeline=# d v_group
    View "public.v_group"
 Column |  Type   | Modifiers
--------+---------+-----------
 coun   | bigint  |
 x      | integer |
 y      | integer |
 z      | text    |
pipeline=# d v_single
    View "public.v_single"
 Column |  Type   | Modifiers
--------+---------+-----------
 x      | integer |
 z      | text    |

  

创建好continuous,会附带创建一些别的东西。

pipeline=# d
                 List of relations
 Schema |       Name       |     Type      | Owner
--------+------------------+---------------+--------
 public | v                | view          | steven
 public | v_group          | view          | steven
 public | v_group_mrel     | table         | steven
 public | v_group_osrel    | foreign table | steven
 public | v_group_seq      | sequence      | steven
 public | v_mrel           | table         | steven
 public | v_osrel          | foreign table | steven
 public | v_seq            | sequence      | steven
 public | v_single         | view          | steven
 public | v_single_mrel    | table         | steven
 public | v_single_osrel   | foreign table | steven
 public | v_single_seq     | sequence      | steven
 public | v_sum            | view          | steven
 public | v_sum_mrel       | table         | steven
 public | v_sum_osrel      | foreign table | steven
 public | v_sum_seq        | sequence      | steven
(34 rows)

v_group  这个跟数据库中普通的View很类似,不存储任何东西,可以把他理解成一个materialized view,并且是非常高吞吐量,realtime的物化视图。

*_mrel,这个就是存储具体数据的,跟pg中的物理表是一样一样的。上面的cv就是这个物理表的一个壳子,不过这个物理表存储的内容可能是HLL格式。

*_seq,这个是给物理表创建的一个PK,看看cv_mrel发现默认会有个$pk字段。

*cv_osrel  这个是internal relation representing an output stream

插入数据到stream

pipeline=# insert into stream_test (x,y,z) values(1,2,'a'),(3,4,'b'),(5,6,'c'),(7,8,'d'),(1,2,'a');
INSERT 0 5

  

查询

pipeline=# select * from v_sum;
 sum
-----
  39
(1 row)


pipeline=# select * from v_group;
 coun | x | y | z
------+---+---+---
    1 | 7 | 8 | d
    1 | 5 | 6 | c
    2 | 1 | 2 | a
    1 | 3 | 4 | b
(4 rows)
pipeline=# select * from v_group_mrel;
 coun | x | y | z | $pk
------+---+---+---+-----
    1 | 7 | 8 | d |   1
    1 | 5 | 6 | c |   2
    2 | 1 | 2 | a |   3
    1 | 3 | 4 | b |   4
(4 rows)

cvcv_mrel只是多了个$pk,这是在普通情况下,数据是这样的,如果做agg可能数据存储为HLL格式.

滑动窗口

我们来看看滑动窗口,在流计算中,窗口是个很重要的东西,例如最近5分钟,最近1小时,最近1天的汇总。  

1、创建一个流,列名time,数据类型timestamp;

pipeline=# create stream sliding (time timestamp);

  

2、创建一个滑动窗口(流动视图)

pipeline=# create continuous view cv_sliding with(sw='1 minute') as select time from sliding;
CREATE VIEW

  

3、插入一条当前时间数据

pipeline=# insert into sliding(time) values(now());
INSERT 0 1

  

4、查询

pipeline=# select * from cv_sliding;
            time
----------------------------
 2018-05-18 08:46:58.771057     
(1 row)

  

5、过一会再插入两条时间数据,再次查询

pipeline=# insert into sliding(time) values(now());
INSERT 0 1
pipeline=# insert into sliding(time) values(now());
INSERT 0 1

  

pipeline=# select * from cv_sliding;
            time
----------------------------
 2018-05-18 08:46:58.771057
 2018-05-18 08:47:22.253052
 2018-05-18 08:47:29.265144
(3 rows)

  可以看到三条数据

6、过一会查询,少了一条,再过一会全部消失

pipeline=# select * from cv_sliding;
            time
----------------------------
 2018-05-18 08:47:22.253052
 2018-05-18 08:47:29.265144
(2 rows)

  

pipeline=# select * from cv_sliding;
 time
------
(0 rows)

  

ttl功能

pipeline=# create continuous view v_ttl with (ttl = '10 minute',ttl_column= 'minute') as select minute(arrival_timestamp), count(*) from sliding group by minute;
CREATE VIEW

  

pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# insert into sliding values(now());
INSERT 0 1

pipeline=# select * from v_ttl;
         minute         | count
------------------------+-------
 2018-05-18 09:04:00+00 |     4

  

pipeline=# insert into sliding values(now());
INSERT 0 1
pipeline=# select * from v_ttl;
         minute         | count
------------------------+-------
 2018-05-18 09:04:00+00 |     4
 2018-05-18 09:06:00+00 |     1
(2 rows)

  

transform

1、创建流和相对应的流动视图

pipeline=# create stream str1(x bigint,y text,z timestamp);
CREATE FOREIGN TABLE
pipeline=# create stream str2(x bigint,y text,z timestamp);
CREATE FOREIGN TABLE
pipeline=# create continuous view cv_1 as select x,y,z from str1;
CREATE VIEW
pipeline=# create continuous view cv_2 as select x,y,z from str2;
CREATE VIEW
pipeline=#

  

2、创建transform

pipeline=# create continuous transform tran_1 as select x,y,z from str1 then execute procedure pipeline_stream_insert('str2');
CREATE VIEW
pipeline=# insert into str1(x,y,z) values(1,'hi,i from str1',now());
INSERT 0 1
pipeline=# select * from cv_1;
 x |       y        |             z
---+----------------+---------------------------
 1 | hi,i from str1 | 2018-05-18 09:21:01.11329
(1 row)

pipeline=# select * from cv_2;
 x |       y        |             z
---+----------------+---------------------------
 1 | hi,i from str1 | 2018-05-18 09:21:01.11329
(1 row)

  

在创建Transform用到的pipeline_stream_insertPipelineDB自己提供的一个函数,这个我们可以自己定义一个函数。

pipeline=# create table t(x bigint,y text,z timestamp);

CREATE TABLE

 

pipeline=# CREATE OR REPLACE FUNCTION insert_into_t()

pipeline-#   RETURNS trigger AS

pipeline-#   $$

pipeline$#   BEGIN

pipeline$#     INSERT INTO t (x, y,z) VALUES (NEW.x, NEW.y,NEW.z);

pipeline$#     RETURN NEW;

pipeline$#   END;

pipeline$#   $$

pipeline-#   LANGUAGE plpgsql;

CREATE FUNCTION

 

pipeline=# CREATE CONTINUOUS TRANSFORM tran_t AS

pipeline-#   SELECT x,y,z FROM str1

pipeline-#   THEN EXECUTE PROCEDURE insert_into_t();

CREATE CONTINUOUS TRANSFORM

 

pipeline=# insert into str1(x,y,z) values(10,'I want insert table t',now());

INSERT 0 1

pipeline=# select * from t;

 x  |           y           |             z

----+-----------------------+---------------------------

 10 | I want insert table t | 2017-05-15 14:01:48.17516

(1 row)

 

自己写了一个trigger,然后把数据插入到表T中。

  

原文地址:https://www.cnblogs.com/sunshine-long/p/9057125.html