Esper系列(一)初探

Esper介绍

Esper是一个Java开发并且开源的轻量级和可扩展的事件流处理和复合事件处理引擎,并提供了定制的事件处理语言(EPL)。

应用场景

某个用户在请求登录服务时,n秒内连续m次未登录成功,可视为该ip在暴力破解密码。又或者:用户在页面上的操作间隔超过n秒即认为该用户已关闭该网页。也许上面的几个例子不够好或者已经有别的方式实现,但是Esper确实能够将其抽象成多个关联的事件进行处理。

源码地址: http://www.espertech.com/esper/distributions/

 

事件的分类

简单事件处理(SEP):基于单个事件,即:触发并响应,通常采用点对点(Queue)和发布和订阅(Topic)[类似观察者模式];
事件流处理(ESP):事件的出发需要分析事件流,分析采用基于事件窗口和事件数量窗口的方式;
复合事件处理(CEP):先捕获各种细微基础事件,然后分析整体找出的更有意义的事件。

 

EPL与SQL的区别

SQL:每执行一次SQL语句就会执行一次查询,存储的是具体的数据。
EPL:当满足到设定的执行条件后才触发执行,存储的是具体的功能性操作(如:查询、删除、插入等)而非操作所需的数据。

 

Esper的适配器

输入输出适配器,API提供对实时数据流输入到Esper容器,或事件输出的各种途径或方式,该过程需将实时数据流转成实时事件流,并以Object、Map、Node形式的事件发送的引擎;

输入适配种类:
CVS,Spring JMS,HTTP,Socket、关系型数据库

输出适配种类:
Spring JMS,HTTP,XML,JSON

 

事件类型

POJO、Map、Object Array,XML

 

示例程序:

orderBean.java

// 引用数据类型类定义 
public class orderBean { 
    private String key; 
    private String value; 
    public String getKey() { 
        return key; 
    } 
    public void setKey(String key) { 
        this.key = key; 
10      } 
11      public String getValue() { 
12          return value; 
13      } 
14      public void setValue(String value) { 
15          this.value = value; 
16      } 
17  }

orderEvent.java

// 事件流单元实现类 
public class orderEvent { 
    //基本数据类型 
    private String name; 
    private int salary; 
    // 集合数据类型,数据项成员为引用数据类型 
    private List<orderBean> orderBeans; 
    // 集合数据类型,KEY为基本数据类型,VALUE为引用数据类型 
    private Map<Integer, orderBean> orderMap; 
10      // 引用数据类型 
11      private orderBean bean; 
12      // 对应事件流中orderBeans属性获取 
13      public String getOrderBeans(int index) { 
14          return orderBeans.get(index).getValue(); 
15      } 
16      // 对应事件流中orderBeans属性设置 
17      public void setOrderBeans(List<orderBean> orderBeans) { 
18          this.orderBeans = orderBeans; 
19      } 
20      // 测试使用 
21      public String getOrderBeanListString(int index){ 
22          orderBean bean = orderBeans.get(index); 
23          return bean.getKey()+":"+bean.getValue(); 
24      } 
25      // 对应事件流中orderMap属性获取 
26      public String getOrderMap(int id) { 
27          return orderMap.get(id).getValue(); 
28      } 
29      // 对应事件流中orderMap属性设置 
30      public void setOrderMap(Map<Integer, orderBean> orderMap) { 
31          this.orderMap = orderMap; 
32      } 
33      public orderBean getBean() { 
34          return bean; 
35      } 
36      public void setBean(orderBean bean) { 
37          this.bean = bean; 
38      } 
39      public orderEvent() { 
40      } 
41      public orderEvent(String name, int salary) { 
42          this.name = name; 
43          this.salary = salary; 
44      } 
45      public String getName() { 
46          return name; 
47      } 
48      public void setName(String name) { 
49          this.name = name; 
50      } 
51      public int getSalary() { 
52          return salary; 
53      } 
54      public void setSalary(int salary) { 
55          this.salary = salary; 
56      } 
57  }

orderListener.java

/** 
* 用于监听某个EPL在引擎中的运行情况,事件进入并产生结果后会回调UpdateListener 
* 必须实现 UpdateListener 接口 
*/ 
public class orderListener implements UpdateListener { 
 
    /** 
     * arg0对应newEvent,arg1对应oldEvent 
     */ 
10      @Override 
11      public void update(EventBean[] arg0, EventBean[] arg1) { 
12          if (null != arg0) { 
13              System.out.println("orderEvent Count is "+arg0[0].get("result")); 
14          } else { 
15              System.out.println("EventBean is Null "); 
16          } 
17      } 
18  }
19   
20   
21  orderMainTest.java
22   
23  public class orderMainTest { 
24      public static void main(String[] args){ 
25          // 添加配置(包所在路劲),方面后面的引用自动添加包名前缀 
26          Configuration config = new Configuration(); 
27          config.addEventTypeAutoName("cn.chenx.esper"); 
28          // 
29          EPServiceProvider epServiceProvider = EPServiceProviderManager.getDefaultProvider(config); 
30          EPAdministrator epAdmin = epServiceProvider.getEPAdministrator(); 
31          // 计算平均数值 
32          String epsql = "select avg(salary) as result from orderEvent.win:length_batch(3)"; 
33          epAdmin.createEPL(ctsql); 
34          EPStatement epstate = epAdmin.createEPL(epsql); 
35          epstate.addListener(new orderListener()); 
36          EPRuntime epRuntime = epServiceProvider.getEPRuntime(); 
37          // 
38          for (int i=0;i<3;i++){ 
39              int seed = (int)(Math.random()*100); 
40              orderEvent event = new orderEvent(""+seed, 100+seed); 
41              System.out.println("seed name:"+event.getName()+",salary:"+event.getSalary()); 
42              orderBean bean = new orderBean(); 
43              bean.setKey("BeanKey:"+i); 
44              bean.setValue("BeanValue:"+i); 
45              event.setBean(bean); 
46              List<orderBean> list = new ArrayList<orderBean>(); 
47              for (int j=0;j<10;j++){ 
48                  bean = new orderBean(); 
49                  bean.setKey("ListKey:"+j); 
50                  bean.setValue("ListValue:"+j); 
51                  list.add(bean); 
52              } 
53              event.setOrderBeans(list); 
54              Map<Integer, orderBean> map = new HashMap<Integer, orderBean>(); 
55              for (int k=0; k<10;k++){ 
56                  bean = new orderBean(); 
57                  bean.setKey("MapKey"+k); 
58                  bean.setValue("MapValue"+k); 
59                  map.put(k, bean); 
60              } 
61              event.setOrderMap(map); 
62              epRuntime.sendEvent(event); 
63          } 
64      } 
65  }

输出结果:

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
className is orderEvent
epsql:select avg(salary) as result from orderEvent.win:length_batch(3)
seed name:张50,salary:150
seed name:张59,salary:159
seed name:张86,salary:186
orderEvent Count is 165.0

 

原文地址:https://www.cnblogs.com/jianyuan/p/4402928.html