SparkStreaming和Drools结合的HelloWord版

关于sparkStreaming的测试Drools框架结合版

package com.dinpay.bdp.rcp.service;

import java.math.BigDecimal;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.kie.api.KieServices;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;

import com.dinpay.bdp.rcp.metaq.MetaQReceiver;
import com.dinpay.bdp.rcp.streaming.StreamingUtil;
import com.dinpay.bdp.rcp.util.CodisUtil;
import com.dinpay.bdp.rcp.util.Constant;
import com.dinpay.dpp.rcp.po.Order;

import redis.clients.jedis.Jedis;
import scala.Tuple2;

/**
 * 同卡号单日最大交易金额测试 
 * @author ll-t150
 *
 */
public class SparkDroolsTest {
    
    public static Logger logger = Logger.getLogger(SparkDroolsTest.class);
    public static final DateFormat df = new SimpleDateFormat("yyyyMMdd");
    
     public static void main(String[] args) {
         String zkConnect=Constant.METAZK;
         String zkRoot="/meta";
         String topic=Constant.ORDERTOPIC;
         String group=Constant.STREAMGROUP; 
         //屏蔽日志
         Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
         logger.info("metaq configuration:"+zkConnect+"--"+topic+"--"+group);
         SparkConf sparkConf = new SparkConf().setAppName("SparkDroolsTest").setMaster("local[2]");  
         JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); 
         //从metaq取消息
         JavaReceiverInputDStream<Order> lines = ssc.receiverStream(new MetaQReceiver(zkConnect,zkRoot,topic,group));

         JavaDStream<Order> words = lines.flatMap(new FlatMapFunction<Order, Order>() {
            @Override
            public Iterable<Order> call(Order order) throws Exception {
                return Arrays.asList(new Order[]{order});
            }
        });
        
        //同卡号单日交易最大次数 统计包括成功和未成功的订单
        JavaPairDStream<String, Integer> cardCntPairs = getCardJavaPair(words);
        save2Codis(cardCntPairs);
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
     }
     
     @SuppressWarnings({ "unchecked", "serial" })
        public static <T> JavaPairDStream<String, T>  getCardJavaPair(JavaDStream<Order> words){
             JavaPairDStream<String, T> pairs = null;
                     //次数统计
                     pairs = (JavaPairDStream<String, T>) words.mapToPair(new PairFunction<Order, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(Order order) {
                            Jedis jedis = CodisUtil.getJedisPool().getResource();
                            String cardCntkey = order.getSystemId()+"_CNT_"+order.getPayerCardNo()+"_"+df.format(new Date());
                            //拼接key,先到codis里面查找对应的key是否存在,若存在就直接取对应的值,然后取值加1
                            String value = jedis.get(cardCntkey);
                            if (StringUtils.isEmpty(value)) {
                                return new Tuple2<String, Integer>(cardCntkey, 1);
                            } else {
                                return new Tuple2<String, Integer>(cardCntkey, Integer.parseInt(value) + 1);
                            }
                        }
                    });
                    return pairs;
             }
        
        
         /**
         * 将计算出的数据保存到codis中
         * @param pair
         */
        @SuppressWarnings("serial")
        public static <T> void save2Codis(JavaPairDStream<String, T> pair) {
            pair.foreachRDD(new VoidFunction2<JavaPairRDD<String,T>,Time>() {
                @Override
                public void call(JavaPairRDD<String, T> rdd, Time time) throws Exception {
                    rdd.foreach(new VoidFunction<Tuple2<String,T>>() {
                        @Override
                        public void call(Tuple2<String, T> tp) throws Exception {
                                Jedis jedis = CodisUtil.getJedisPool().getResource();
                                jedis.set(tp._1(), String.valueOf(tp._2()));
                                logger.info(tp._1() + ">>>" + tp._2()+",保存到Codis完成!");
                                KieServices kieServices = KieServices.Factory.get();
                                KieContainer kieContainer = kieServices.getKieClasspathContainer();
                                KieSession kieSession = kieContainer.newKieSession("helloworld");
                                ChannAmount objectChannel = new ChannAmount();
                                objectChannel.setAmount(Integer.parseInt(String.valueOf(tp._2())));
                                objectChannel.setChannel(tp._1());
                                kieSession.insert(objectChannel);
                                kieSession.fireAllRules();
                                if(jedis !=null){
                                    jedis.close();
                                }
                        }
                    });
                }
            });
        }
    
}

关于配置文件的设置

kmodule.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule">
    <kbase name="rules" packages="rules">
        <ksession name="helloworld"/>
    </kbase>
    <kbase name="dtables" packages="dtables">
        <ksession name="ksession-dtables"/>
    </kbase>
    <kbase name="process" packages="process">
        <ksession name="ksession-process"/>
    </kbase>
</kmodule>

riskMonitor.drl内容

package rules;

import com.dinpay.bdp.rcp.service.ChannAmount;
//其中m为对象objectChannel 的引用
rule "channel"
    when
        ChannAmount(amount>2)
    then
        System.out.println("Drools规则实现:该渠道最近5分钟交易金额超过2次 ");
end

测试OK!

原文地址:https://www.cnblogs.com/atomicbomb/p/7171512.html