Esper系列(二)时间窗口、长度窗口、cast、注解、自定义函数、静态方法

长度窗口实现原理图

image 说明:

上图长度窗口为5,事件W1至W5进入引擎后属于NewEvents队列,事件W6进入引擎后,W2至W6就属于NewEvents队列,而事件W1就属于OldEvents队列了。NewEvents为先进先出队列,队列长度为EPL语句中制定的长度窗口大小,OldEvent队列为过期数据的存放队列。

EPL长度窗口示例

select count(*) as result from orderEvent.win:time_batch(3 sec)

时间窗口实现原理图

image

说明:

从此图可以看出,随着时间推移,每个进入到引擎的W事件都是newEvents,即Insert Stream。W后括号里的值为属性值(可忽略),超过EPL语句设置的时间窗口值的事件将进入OldEvent队列.

 

时间周期格式

year-part : (number|variable_name) ("years" | "year")
month-part : (number|variable_name) ("months" | "month")
week-part : (number|variable_name) ("weeks" | "week")
day-part : (number|variable_name) ("days" | "day")
hour-part : (number|variable_name) ("hours" | "hour")
minute-part : (number|variable_name) ("minutes" | "minute" | "min")
seconds-part : (number|variable_name) ("seconds" | "second" | "sec")
milliseconds-part : (number|variable_name) ("milliseconds" | "millisecond" |  "msec")

 

EPL时间窗口示例

// 统计最近三秒内获取的事件中salary的平均值
String epsql = "select avg(salary) as result from orderEvent.win:time(3 sec)";

数据类型转换(cast)

示例:

// avg(salary)默认返回为Double类型,cast(avg(salary),int)转换为int类型;
// 30秒内 salary 的平均值
String epsql = "select bean,name,cast(avg(salary),int) from orderEvent.win:time(3 msec)";

Annotation(注解)

  1. @Name 指定EPL的名称,参数只有一个。例如:@Name("MyEPL")
  2. @Description 对EPL进行描述,参数只有一个。例如:@Description("Hello World")
  3. @Tag 对EPL进行额外的说明,参数有两个,分别为Tag的名称和Tag的值,用逗号分隔。例如:@Tag(name="author",value="luonanqin")
  4. @Priority 指定EPL的优先级,参数只有一个,并且整数(可负可正)。例如:@Priority(10)
  5. @Drop 指定事件经过此EPL后不再参与其他的EPL计算,该注解无参数。
  6. @Hint 为EPL加上某些标记,让引擎对此EPL产生其它的操作,会改变EPL实例的内存占用,但通常不会改变输出。其参数固定,由Esper提供。
  7. @Audit EPL添加此注解后,可以额外输出EPL运行情况,有点类似日志的感觉(当然没有日志的功能全啦)。
  8. @Hook 与SQL相关。
  9. @EventRepresentation 这是用来指定EPL产生的计算结果事件包含的数据形式。参数只有一个,即array=true或array=false。false为默认值,代表数据形式为Map,若为true,则数据形式为数组。

示例:

String epsql = "@Name("EsperEvent")select avg(salary) from OrderEventMap.win:length_batch(2)";
 
    EPStatement epstate = epAdmin.createEPL(epsql);
    epstate.addListener(new orderListener());
    System.out.println("Name is ["+epstate.getName()+"]");

Expression(自定义函数)

格式:

Expression expression_name { expression_body }

expression_name为自定义的Expression名,expression_body为Expression的具体内容。

expression_body表现形式为:(input_param[,…] ]) => expression

input_param为事件流别名(不能和事件流同名)

示例:

// 创建转换函数 add
String expSql = "create expression add { x => x.salary+500 }";
// 将oe(orderEvent)事件流中的salary属性值加500
String epsql = "select add(oe) as s from orderEvent.win:length_batch(1) as oe";
 
EPStatement expstate = epAdmin.createEPL(expSql);
expstate.addListener(new orderListener());
EPStatement epstate = epAdmin.createEPL(epsql);
epstate.addListener(new orderListener());

自定义静态方法的应用

示例(事件流过滤)

// 判断总数是否等于0  
public class IsZero  
{  
    public static boolean isZero(int sum)  
    {  
        return sum==0;  
    }  
}  
  
10  // 加载  
11  epService.getEPAdministrator().getConfiguration().addImport(IsZero.class);  
12    
13  // 查询没有钱的用户的name值(User包含name和money属性)  
14  select name from orderEvent(IsZero.isZero(salary))

注意:

1、要过滤的属性只能是数字和字符串。

2、过滤表达式中不能使用聚合函数。

示例(事件流转换输出)

通过自定义类的静态方法转换事件流的输出属性。

BaseUntil.java(静态方法实现类)

public class BaseUntil {
   
    public static int Add(int n){
        return n+100;
    }
   
    public static String UpdataText(String str){
        return str+",你好!";
    }
10  }
11   
12  // 字节码加载  
13  epService.getEPAdministrator().getConfiguration().addImport(BaseUntil.class);
14   
15  String epsql = "select BaseUntil.UpdataText(name) as result from orderEvent  ";
原文地址:https://www.cnblogs.com/jianyuan/p/4827771.html