InAction-根据LBS数据手机用户移动轨迹


 看了以后学了不少通信运营商关于用户数据记录的知识啊。

本来想从网上找真实数据集的,但是网上的数据不合这个DEMO的场景要求,于是用作者提供的python脚本生成一定数据量的数据来实践(当然,这些数据结构是简化了的)。


生成数据:

查看数据:

第一个是位置数据,是手机定期想基站通信报告情况产生的;第二个是上网数据,是手机上网产生的记录;

数据格式:

 

两种数据都共同包含了分析计算用到的IMSI、LOC、TIME。形成时间互补。


目标是根据数据分析计算指定日期指定时间分割里用户在不同基站的停留时间(我觉得这种计算结果没什么用,要说有用的话估计是一种分析用户下一个地点的预测应用的基础数据)


分析过程:

不同文件提取需要字段并转换时间格式的工具对象类:

  1 package org.admln.LBS;
  2 
  3 import java.text.ParseException;
  4 import java.text.SimpleDateFormat;
  5 import java.util.Date;
  6 
  7 import org.apache.hadoop.io.Text;
  8 
  9 
 10 /**
 11  * @author admln
 12  * 定义异常类
 13  * 
 14  */
 15 class LineException extends Exception {
 16     
 17     private static final long serialVersionUID = 1L;
 18     int flag;
 19     public LineException(String msg,int flag) {
 20         super(msg);
 21         this.flag = flag;
 22     }
 23     public int getFlag() {
 24         return flag;
 25     }
 26 }
 27 
 28 public class DataLine {
 29     private String IMSI,LOC,TIME,TIMEFLAG;
 30     private Date day;
 31     private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 32     
 33     public static DataLine parser(String line,boolean source,String date,String[] timeFlag) throws LineException {
 34         DataLine dataLine = new DataLine();
 35         String[] lineSplit = line.split("	");
 36         if(source) {
 37             dataLine.setIMSI(lineSplit[0]);
 38             dataLine.setLOC(lineSplit[3]);
 39             dataLine.setTIME(lineSplit[4]);
 40         }else {
 41             dataLine.setIMSI(lineSplit[0]);
 42             dataLine.setLOC(lineSplit[2]);
 43             dataLine.setTIME(lineSplit[3]);
 44         }
 45         
 46         //检查日期合法性
 47         if(!dataLine.getTIME().startsWith(date)) {
 48             throw new LineException("",-1);
 49         }
 50         try {
 51             dataLine.setDay(dataLine.getFormatter().parse(dataLine.getTIME()));
 52         }catch(ParseException e) {
 53             throw new LineException("",0);
 54         }
 55         
 56         //计算所属时段
 57 //        int hour = Integer.valueOf(dataLine.getTIME().split(" ")[1].split(":")[1]);
 58 //        System.out.println(hour + " " + Integer.valueOf(timeFlag[0]));
 59 //        for(int i=0;i<timeFlag.length;i++) {
 60 //            if(i==0 && hour<Integer.valueOf(timeFlag[0])) {
 61 //                dataLine.setTIMEFLAG("00-" + timeFlag[i]);
 62 //                break;
 63 //            }else if(hour>Integer.valueOf(timeFlag[timeFlag.length-1])){
 64 //                throw new LineException("time is error",-1);
 65 //            }else if(hour < Integer.valueOf(timeFlag[i])){
 66 //                dataLine.setTIMEFLAG(timeFlag[i-1] + "-" + timeFlag[i]);
 67 //                break;
 68 //            }
 69 //        }
 70         int i = 0, n = timeFlag.length;
 71         int hour = Integer.valueOf( dataLine.getTIME().split(" ")[1].split(":")[0] );
 72         while ( i < n && Integer.valueOf( timeFlag[i] ) <= hour )
 73             i++;
 74         if ( i < n )
 75         {
 76             if ( i == 0 )
 77                 dataLine.setTIMEFLAG( "00-" + timeFlag[i] );
 78             else
 79                 dataLine.setTIMEFLAG( timeFlag[i-1] + "-" + timeFlag[i] );
 80         }
 81         else                                     //Hour大于最大的时间点
 82             throw new LineException("", -1);
 83         
 84         return dataLine;
 85     }
 86     
 87     //输出KEY
 88     public Text outKey() {
 89         return new Text(this.IMSI + "|" + this.TIMEFLAG);
 90     }
 91     
 92     //输出VALUE
 93     public Text outValue() {
 94         long time = this.day.getTime()/1000L;
 95         return new Text(this.LOC + "|" + String.valueOf(time));
 96     }
 97     
 98     //测试主函数
 99     public static void main(String[] args) throws LineException {
100         //位置数据。                      IMSI          IMEI   UPDATETYPE LOC        TIME
101         String pos = "0000000000    0047483647    3    00000044    2013-09-12 00:09:17";
102         //网络数据.       IMSI          IMEI       LOC         TIME                URL
103         String net = "0000000000    0047483647    00000063    2013-09-12 00:20:14    www.baidu.com";
104         String[] str = {"09","17","24"};
105         DataLine temp = DataLine.parser("0000000000    0047483647    3    00000044    2013-09-12 00:15:17", true, "2013-09-12", str);
106         System.out.println(temp.getIMSI() + " " + temp.getLOC() + " " + temp.getTIME() + " " + temp.getTIMEFLAG());
107         System.out.println(temp.outKey());
108 
109     }
110 
111     public String getIMSI() {
112         return IMSI;
113     }
114 
115     public void setIMSI(String iMSI) {
116         IMSI = iMSI;
117     }
118 
119     public String getLOC() {
120         return LOC;
121     }
122 
123     public void setLOC(String lOC) {
124         LOC = lOC;
125     }
126 
127     public String getTIME() {
128         return TIME;
129     }
130 
131     public void setTIME(String tIME) {
132         TIME = tIME;
133     }
134 
135     public String getTIMEFLAG() {
136         return TIMEFLAG;
137     }
138 
139     public void setTIMEFLAG(String tIMEFLAG) {
140         TIMEFLAG = tIMEFLAG;
141     }
142 
143     public Date getDay() {
144         return day;
145     }
146 
147     public void setDay(Date day) {
148         this.day = day;
149     }
150 
151     public SimpleDateFormat getFormatter() {
152         return formatter;
153     }
154 
155     public void setFormatter(SimpleDateFormat formatter) {
156         this.formatter = formatter;
157     }
158 
159 }

主角MR:

  1 package org.admln.LBS;
  2 
  3 import java.io.IOException;
  4 import java.text.ParseException;
  5 import java.text.SimpleDateFormat;
  6 import java.util.HashMap;
  7 import java.util.Iterator;
  8 import java.util.Map.Entry;
  9 import java.util.TreeMap;
 10 
 11 import org.apache.hadoop.conf.Configuration;
 12 import org.apache.hadoop.fs.Path;
 13 import org.apache.hadoop.io.NullWritable;
 14 import org.apache.hadoop.io.Text;
 15 import org.apache.hadoop.mapreduce.Job;
 16 import org.apache.hadoop.mapreduce.Mapper;
 17 import org.apache.hadoop.mapreduce.Reducer;
 18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 19 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 21 
 22 /**
 23  * @author admln
 24  *
 25  */
 26 public class BaseStationDataPreprocess {
 27 
 28     /**  
 29      * 计数器
 30      * 用于计数各种异常数据
 31      */ 
 32     enum Counter {
 33         TIMESKIP,     //时间格式有误
 34         OUTOFTIMESKIP,//时间不在参数指定的时间段内
 35         LINESKIP,     //源文件行有误
 36         USERSKIP      //某个用户某个时间段被整个放弃
 37     }
 38     public static class baseMapper extends Mapper<Object,Text,Text,Text>{
 39         private String date;
 40         private String[] timeFlag;
 41         private boolean dataSource;
 42         @Override
 43         public void setup(Context context) throws IOException {
 44             //取出指定日期和时间间隔
 45             this.date = context.getConfiguration().get("date");
 46             this.timeFlag = context.getConfiguration().get("timeFlag").split("-");
 47             
 48             //提取文件名
 49             FileSplit fs = (FileSplit) context.getInputSplit();
 50             String fileName = fs.getPath().getName(); 
 51             if(fileName.startsWith("POS")) {
 52                 this.dataSource = true;
 53             }else if(fileName.startsWith("NET")) {
 54                 this.dataSource = false;
 55             }else {
 56                 throw new IOException("file error");
 57             }
 58         }
 59         /**  
 60          * MAP任务
 61          * 读取基站数据
 62          * 找出数据所对应时间段
 63          * 以IMSI和时间段作为 KEY
 64          * CGI和时间作为 VALUE
 65          * @throws InterruptedException 
 66          * @throws IOException 
 67          */ 
 68         @Override
 69         public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
 70             DataLine line;
 71             try {
 72                 line = DataLine.parser(value.toString(), this.dataSource, this.date, this.timeFlag);
 73             }catch(LineException e) {
 74                 if(e.getFlag()==-1) {
 75                     context.getCounter(Counter.OUTOFTIMESKIP).increment(1);
 76                 }else {
 77                     context.getCounter(Counter.TIMESKIP).increment(1);
 78                 }
 79                 return;
 80             }catch(Exception e) {
 81                 context.getCounter(Counter.LINESKIP).increment(1);
 82                 return;
 83             }
 84             
 85             context.write(line.outKey(), line.outValue());
 86         }
 87     }
 88     /**  
 89      * 统计同一个IMSI在同一时间段
 90      * 在不同CGI停留的时长
 91      */
 92     public static class baseReducer extends Reducer<Text,Text,NullWritable,Text> {
 93         private String date;
 94         private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 95         @Override
 96         public void setup(Context context) {
 97             this.date = context.getConfiguration().get("date");
 98         }
 99         public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
100             String imsi = key.toString().split("\|")[0];
101             String timeFlag = key.toString().split("\|")[1];
102             
103             TreeMap<Long,String> tree = new TreeMap<Long,String>();
104             String treeKey,treeValue;
105             for(Text val : values) {
106                 treeKey = val.toString().split("\|")[1];
107                 treeValue = val.toString().split("\|")[0];
108                 try {
109                     tree.put(Long.valueOf(treeKey),treeValue);
110                 }catch (NumberFormatException e) {
111                     context.getCounter(Counter.TIMESKIP).increment(1);
112                     continue;
113                 }
114             }
115             try {
116                 tree.put((formatter.parse(this.date + " " + timeFlag.split("-")[1] + ":00:00").getTime()/1000L), "OFF");
117                 HashMap<String, Float> locs = getStayTime(tree);
118 
119                 //输出
120                 for( Entry<String, Float> entry : locs.entrySet() )
121                 {
122                     StringBuilder sb = new StringBuilder();
123                     sb.append(imsi).append("|");
124                     sb.append(entry.getKey()).append("|");
125                     sb.append(timeFlag).append("|");
126                     sb.append(entry.getValue());
127                     
128                     context.write( NullWritable.get(), new Text(sb.toString()) );
129                 }
130             } catch (ParseException e) {
131                 e.printStackTrace();
132             }
133             
134         }
135         //获取位置停留时间方法
136         private HashMap<String,Float> getStayTime(TreeMap<Long,String> tree) {
137             HashMap<String,Float> hashMap = new HashMap<String,Float>();
138             Entry<Long,String> branch,nextBranch;
139             Iterator<Entry<Long,String>> it = tree.entrySet().iterator();
140             branch = it.next();
141             while(it.hasNext()) {
142                 nextBranch = it.next();
143                 float stay = (nextBranch.getKey()-branch.getKey())/60.0f;
144                 if(stay<60.0) {
145                     if(hashMap.containsKey(branch.getValue())) {
146                         hashMap.put(branch.getValue(),(float)(Math.round((hashMap.get(branch.getValue())+stay)*100))/100);
147                     }else {
148                         hashMap.put(branch.getValue(), (float)(Math.round(stay*100))/100);
149                     }
150                 }
151                 branch = nextBranch;
152             }
153             return hashMap;
154         }
155     }
156     @SuppressWarnings("deprecation")
157     public static void main(String[] args) throws Exception {
158         Path input = new Path("hdfs://hadoop:8020/input/baseStation/");
159         Path output = new Path("hdfs://hadoop:8020/output/baseStation/");
160         
161         Configuration conf = new Configuration();
162         
163         //把输入参数中的日期和一天内的时间间隔标志传入配置对象
164         conf.set("date", args[1]);
165         conf.set("timeFlag", args[2]);
166         
167         Job job = new Job(conf,"BaseStationDataPreprocess");
168         
169         job.setJarByClass(BaseStationDataPreprocess.class);
170         
171         job.setOutputKeyClass(Text.class);
172         job.setOutputValueClass(Text.class);
173         
174         job.setMapperClass(baseMapper.class);
175         job.setReducerClass(baseReducer.class);
176         
177         FileInputFormat.addInputPath(job, input);
178         FileOutputFormat.setOutputPath(job, output);
179         
180         System.exit(job.waitForCompletion(true)?0:1);
181 
182     }
183 
184 }

计算结果:

 

(环境是centos6.4;hadoop2.2.0;JDK1.7)


 我觉得作者的做法也不是最好的,作者肯定没有透露所有细节,肯定有更好的方法去处理误差。比如记录中第一时间和时间段的起始时间的时间差。

这个视频留了作业,关于作业1就是上面的过程。至于作业二是要求计算出每个时间段中停留时间最长的三个基站并输出。

这其实就是把每个基站的输出再根据停留时间算一个topK。


我觉得自己需要温故和补习的知识点:

 1.自定义异常

好处是自己可以定义异常中的各种动作以迎合自己程序的需要。(类java.lang.Throwable是所有异常类的基类,它包括两个子类:Exception和Error,Exception类用于描述程序能够捕获的异常,如ClassNotFoundException;自定义异常类可以继承Throwable类或者Exception,而不要继承Error类。自定义异常类之间也可以有继承关系。)需要为自定义异常类设计构造方法,以方便构造自定义异常对象。

 2.setup方法

 3.关于竖线的转义,不转义士没有语法错误的,但是实际错误。以及转义的方式,以及其他需要转移的字符也要注意,如括号、单引号等。


 特别佩服作者,不愧参加了那么多的实践项目,对数据的异常处理的很细致很全面也特别有利于事后分析修正。这是我最受用的一点。


欲为大树,何与草争;心若不动,风又奈何。
原文地址:https://www.cnblogs.com/admln/p/InAction-LBStogetmoving.html