大数据离线分析平台 JavaSDK数据收集引擎编写

JavaSDK设计规则

 

 JavaSDK提供两个事件触发方法,分别为onChargeSuccess和onChargeRefund。我们在java sdk中通过一个单独的线程来发送线程数据,这样可以减少对业务系统的延时性。

SDK测试

  启动集群上的hdfs+nginx+flume进程,通过模拟数据的发送然后将数据发送到nginx服务器中,查看最终是否在hdfs中有数据的写入。

命令:

   start-dfs.sh: 启动hdfs命令

   su root:切换用户

   service nginx restart: 启动nginx进程

   启动flume进程:

       进入flume安装根目录,执行命令:


flume-ng agent --conf ./conf/ --conf-file ./conf/test2.conf --name agent &


 工程目录结构

AnalyticsEngineSDK如下:
package com.kk.ae.sdk;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/*
 * 分析引擎sdk java服务器数据收集
 * */
public class AnalyticsEngineSDK {
    
    //日志记录对象
    private static final Logger log=Logger.getGlobal();
    //请求url的主体部分
    public static final String accessUrl="http://hadoop-001:8090/kkImg.gif";
    public static final String platformName="java_server";
    public static final String sdkName="jdk";
    private static final String version = "1";
    /**
     * 触发订单支付成功事件,发送事件数据到服务器
     * 
     * @param orderId
     *            订单支付id
     * @param memberIdd
     *            订单支付会员id
     * @return 如果发送数据成功(加入到发送队列中),那么返回true;否则返回false(参数异常&添加到发送队列失败).
     * @throws InterruptedException 
     */
    public static boolean chargeSuccess(String orderId,String memberId) throws InterruptedException {
        
        if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) {
            Map<String, String> map=new HashMap<String,String>();
            map.put("u_mid", memberId);
            map.put("oid", orderId);
            map.put("c_time", String.valueOf(System.currentTimeMillis()));
            map.put("ver", version);
            map.put("en", "e_cs");
            map.put("p1", platformName);
            map.put("sdk", sdkName);
            
            //创建url
            String url= buildUrl(map);
            // 发送url&将url加入到队列
            SendDataMonitor.addSendUrl(url);
            System.out.println(url);
            return true;
        } else {
            log.log(Level.WARNING, "订单id和会员id不能为空");
            return false;
        }
        
    }
    /**
     * 触发订单退款事件,发送退款数据到服务器
     * 
     * @param orderId
     *            退款订单id
     * @param memberIdd
     *            退款会员id
     * @return 如果发送数据成功,返回true。否则返回false。
     * @throws InterruptedException 
     */
    public static boolean chargeRefund(String orderId,String memberId) throws InterruptedException {
        if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) {
            Map<String, String> map=new HashMap<String,String>();
            map.put("u_mid", memberId);
            map.put("oid", orderId);
            map.put("c_time", String.valueOf(System.currentTimeMillis()));
            map.put("ver", version);
            map.put("en", "e_cr");
            map.put("p1", platformName);
            map.put("sdk", sdkName);
            
            //创建url
            String url= buildUrl(map);
            // 发送url&将url加入到队列
            SendDataMonitor.addSendUrl(url);
            System.out.println(url);
            return true;
        } else {
            log.log(Level.WARNING, "订单id和会员id不能为空");
            return false;
        }
        
    }
    private static String buildUrl(Map<String, String> map) {
        
        StringBuffer stringBuffer=new StringBuffer();
        stringBuffer.append(accessUrl).append("?");
        for(Map.Entry<String, String> entry:map.entrySet()) {
            if (entry.getKey()!=null&&!entry.getKey().isEmpty()&&entry.getValue()!=null&&!entry.getValue().isEmpty()) {
                {
                    try {
                        stringBuffer.append(entry.getKey().trim()).append("=").append(URLEncoder.encode(entry.getValue().trim(),"utf-8")).append("&");
                    } catch (UnsupportedEncodingException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                }
            }
            
        }
        return stringBuffer.substring(0, stringBuffer.length() - 1);
    }
        
    
    
}
SendDataMonitor 如下:
package com.kk.ae.sdk;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 发送url数据的监控者,用于启动一个单独的线程来发送数据
 * 
 * @author gerry
 *
 */
public class SendDataMonitor {
    //收集日志
    public static final Logger log=Logger.getGlobal();
    // 队列,用户存储发送url
    public static final BlockingQueue<String> queue=new LinkedBlockingQueue<String>();
    //用于单例的一个类对象
    private static SendDataMonitor monitor=null;
    
    private SendDataMonitor() {
        // 私有构造方法,进行单列模式的创建
    }
    
    
    public static SendDataMonitor getMonitor() {
        if (monitor==null) {
            synchronized (SendDataMonitor.class) {
                if (monitor==null) {
                    monitor=new SendDataMonitor();
                    Thread thread=new Thread(new Runnable() {
                        
                        @Override
                        public void run() {
                        // TODO Auto-generated method stub
                        SendDataMonitor.monitor.run();    
                        
                        }
                    });
                    thread.start();
                }    
            }
        } 
        return monitor;
    }


    protected void run() {
        while (true) {
            try {
                String url=this.queue.take();
                // 正式的发送url
                HttpRequestUtil.sendData(url);
            } catch (Throwable e) {
                log.log(Level.WARNING, "发送url异常", e);
            }    
        }
    }


    public static void setMonitor(SendDataMonitor monitor) {
        SendDataMonitor.monitor = monitor;
        
    }


    /**
     * 添加一个url到队列中去
     * 
     * @param url
     * @throws InterruptedException
     */
    public static void addSendUrl(String url) throws InterruptedException {
         getMonitor().queue.put(url);
    
    }
    /**
     * 内部类,用户发送数据的http工具类
     * 
     * @author gerry
     *
     */
    public static class HttpRequestUtil{
        /**
         * 具体发送url的方法
         * 
         * @param url
         * @throws IOException
         */
        public static void sendData(String url) throws IOException {
            HttpURLConnection con=null;
            BufferedReader bf=null;
            try {
                URL obj=new URL(url);
                con=(HttpURLConnection) obj.openConnection();
                // 设置连接参数
                con.setConnectTimeout(5000);//连接过期时间
                con.setReadTimeout(5000);//读取数据过期时间
                con.setRequestMethod("GET");//设置请求类型为get
                System.out.println("发送url:" + url);
                // 发送连接请求
                bf=new BufferedReader(new InputStreamReader(con.getInputStream()));
                
            } finally {
                try {
                    if (bf!=null) {
                        bf.close();
                        
                    }
                } catch (Throwable e) {
                    // TODO: handle exception
                    
                }
                try {
                    con.disconnect();
                } catch (Throwable e) {
                    // TODO: handle exception
                }
            }
        }
    
    }

}

测试类:

package com.kk.ae.sdk;

public class Test {
    
public static void main(String[] args) {
    try {
        AnalyticsEngineSDK.chargeSuccess("order3516", "0958");
        AnalyticsEngineSDK.chargeRefund("kk3", "9009");
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
}
原文地址:https://www.cnblogs.com/Transkai/p/10724020.html