079 微博表的设计,以及代码的设计

一:表的设计

1.需求分析

  用户发表微博,关注人可以接收到被关注人的微博

  设计三张表格。

2.第一张,微博内容表weibo-content

  RowKey : uid_timestamp 用户账号结合内容发布的时间戳

  Column Family:cf 因为rowkey由用户账号和内容发布的时间戳组成,每条微博对应唯一的一行,所以内容只需保存一个版本

  Column Qualifier :
    theme 主题
    content 内容
    photo 图片
    mp4 视频
    link 链接

3.第二张,用户关系表relations

  RowKey:uid

  Column Family:attend 关注用户
    Column Qualifier:使用关注用户的uid作为列标签,
    value:也用关注用户的uid

  Column Family:fans 粉丝用户
    Column Qualifier:使用粉丝用户的uid作为列标签,
    value:也用粉丝用户的uid

4.第三张,接收内容邮箱表 receive-content-email

  RowKey:uid 用户账号

  Column Family:cf

  Column Qualifier:以关注用户账号作为列标签
    value: 以微博内容表的rowkey作为value
    保留版本数 1000(最大版本1000,最小版本1000,版本存留时间365*24*60*60秒,即一年;示例代码中未设置该TTL)

5.梳理框架

  当用户发表了一篇微博,内容将会保存到第一张表格;再以自己的uid作为rowkey查询第二张表格,从粉丝列簇(fans)中取出所有粉丝的uid。

  第三张表的rowkey都是粉丝的uid:发布者会在每个粉丝的行中写入一条记录(列标签为发布者的uid,值为该微博在第一张表中的rowkey)。粉丝查看时,先从第三张表取出所关注用户最新的若干个rowkey,再到第一张表中取出对应的微博内容。

二:编程实现

1.Message.java

 1 package org.apache.hadoop.hbase.weibo;
 2 
 3 import java.io.Serializable;
 4 
 5 /**
 6  * 微博内容实体类
 7  * @author ibeifeng
 8  *
 9  */
10 public class Message implements Serializable{
11     
12     private static final long serialVersionUID = 2789732708160004861L;
13 
14     private String uid;
15     
16     private  String content;
17     
18     private String timestamp;
19 
20     @Override
21     public boolean equals(Object obj) {
22         return super.equals(obj);
23     }
24 
25 
26     @Override
27     public String toString() {
28         return "uid=" + uid +",timestamp=" 
29                 + timestamp + ",content="" + content+""";
30     }
31 
32     public String getUid() {
33         return uid;
34     }
35 
36     public void setUid(String uid) {
37         this.uid = uid;
38     }
39 
40     public String getContent() {
41         return content;
42     }
43 
44     public void setContent(String content) {
45         this.content = content;
46     }
47 
48     public String getTimestamp() {
49         return timestamp;
50     }
51 
52     public void setTimestamp(String timestamp) {
53         this.timestamp = timestamp;
54     }
55     
56 }

2.Weibo.java

  1 package org.apache.hadoop.hbase.weibo;
  2 
  3 import java.io.IOException;
  4 import java.util.ArrayList;
  5 import java.util.Iterator;
  6 import java.util.List;
  7 
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.hbase.Cell;
 10 import org.apache.hadoop.hbase.CellUtil;
 11 import org.apache.hadoop.hbase.HBaseConfiguration;
 12 import org.apache.hadoop.hbase.HColumnDescriptor;
 13 import org.apache.hadoop.hbase.HTableDescriptor;
 14 import org.apache.hadoop.hbase.MasterNotRunningException;
 15 import org.apache.hadoop.hbase.NamespaceDescriptor;
 16 import org.apache.hadoop.hbase.TableName;
 17 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 18 import org.apache.hadoop.hbase.client.Delete;
 19 import org.apache.hadoop.hbase.client.Get;
 20 import org.apache.hadoop.hbase.client.HBaseAdmin;
 21 import org.apache.hadoop.hbase.client.HConnection;
 22 import org.apache.hadoop.hbase.client.HConnectionManager;
 23 import org.apache.hadoop.hbase.client.HTableInterface;
 24 import org.apache.hadoop.hbase.client.Put;
 25 import org.apache.hadoop.hbase.client.Result;
 26 import org.apache.hadoop.hbase.client.ResultScanner;
 27 import org.apache.hadoop.hbase.client.Scan;
 28 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 29 import org.apache.hadoop.hbase.filter.RowFilter;
 30 import org.apache.hadoop.hbase.filter.SubstringComparator;
 31 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 32 import org.apache.hadoop.hbase.util.Bytes;
 33 
 34 /**
 35  * 微博类
 36  * @author ibeifeng
 37  *
 38  */
 39 public class Weibo {
 40     
 41     static  final Configuration conf = HBaseConfiguration.create();
 42     
 43     private static final byte[] weibo_content = Bytes.toBytes("weibo:weibo-content");
 44     
 45     private static final byte[] relations =  Bytes.toBytes("weibo:relations");
 46     
 47     private static final byte[] receive_content_email = Bytes.toBytes("weibo:receive-content-email");
 48     
 49     
 50     /**
 51      * 初始化命名空间
 52      */
 53     public void initNameSpace(){
 54         HBaseAdmin admin = null;
 55         try {
 56             admin = new HBaseAdmin(conf);
 57             
 58             NamespaceDescriptor descriptor = NamespaceDescriptor.create("weibo")
 59                     .addConfiguration("creator", "ibeifeng")
 60                     .addConfiguration("createTime", System.currentTimeMillis()+"").build();
 61             admin.createNamespace(descriptor);
 62             
 63         } catch (MasterNotRunningException e) {
 64             e.printStackTrace();
 65         } catch (ZooKeeperConnectionException e) {
 66             e.printStackTrace();
 67         } catch (IOException e) {
 68             e.printStackTrace();
 69         }finally{
 70             if(admin!=null)
 71                 try {
 72                     admin.close();
 73                 } catch (IOException e) {
 74                     e.printStackTrace();
 75                 }
 76         }
 77     }
 78     
 79     
 80     /**
 81      * 初始化表
 82      */
 83     public void initTable(){
 84         HBaseAdmin admin = null;
 85         try {
 86             admin = new HBaseAdmin(conf);
 87             
 88             /*
 89              * 1、微博内容表
 90                 TableName:   weibo:weibo-content
 91                 RowKey:用户ID_timestamp
 92                 列簇:cf
 93                 列标签:    
 94                         cf:content
 95                         cf:title
 96                         cf:photo
 97                         
 98                     版本设计:只需要保留一个版本
 99              */
100             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(weibo_content));
101             HColumnDescriptor family = new HColumnDescriptor(Bytes.toBytes("cf"));
102             // 开启列簇 -- store的块缓存
103             family.setBlockCacheEnabled(true);
104             family.setBlocksize(1024*1024*2);
105             
106             family.setCompressionType(Algorithm.SNAPPY);
107             
108             family.setMaxVersions(1);
109             family.setMinVersions(1);
110             
111             desc.addFamily(family);
112             
113             //admin.createTable(desc);
114             byte[][] splitKeys = {
115                     Bytes.toBytes("100"),
116                     Bytes.toBytes("200"),
117                     Bytes.toBytes("300")
118             };
119             admin.createTable(desc,splitKeys);
120             
121             
122             /*
123              * 2、用户关系表
124                 TableName: weibo:relations
125                 RowKey: 用户ID
126                 列簇:attend 关注用户
127                         fan  粉丝用户
128                 列标签:使用用户ID作为列标签,值为用户ID
129                 
130                 版本:只需要一个版本
131              */
132             HTableDescriptor relationTbl = new HTableDescriptor(TableName.valueOf(relations));
133             HColumnDescriptor attend = new HColumnDescriptor(Bytes.toBytes("attend"));
134             // 开启列簇 -- store的块缓存
135             attend.setBlockCacheEnabled(true);
136             attend.setBlocksize(1024*1024*2);
137             
138             attend.setCompressionType(Algorithm.SNAPPY);
139             
140             attend.setMaxVersions(1);
141             attend.setMinVersions(1);
142             
143             relationTbl.addFamily(attend);
144             
145             HColumnDescriptor fans = new HColumnDescriptor(Bytes.toBytes("fans"));
146             // 开启列簇 -- store的块缓存
147             fans.setBlockCacheEnabled(true);
148             fans.setBlocksize(1024*1024*2);
149             
150             fans.setCompressionType(Algorithm.SNAPPY);
151             
152             fans.setMaxVersions(1);
153             fans.setMinVersions(1);
154             
155             relationTbl.addFamily(fans);
156             
157             admin.createTable(relationTbl);
158             
159             /*
160              * 3、用户微博内容接收邮件箱表
161             TableName:   weibo:receive-content-email
162             RowKey:用户ID
163             列簇:cf
164             列标签:
165                 直接使用用户ID,value值取微博内容的RowKey
166                 
167                 版本:设置最大版本为1000
168              */
169             HTableDescriptor receiveContentEmail = 
170                     new HTableDescriptor(TableName.valueOf(receive_content_email));
171             HColumnDescriptor rce_cf = new HColumnDescriptor(Bytes.toBytes("cf"));
172             // 开启列簇 -- store的块缓存
173             rce_cf.setBlockCacheEnabled(true);
174             rce_cf.setBlocksize(1024*1024*2);
175             
176             rce_cf.setCompressionType(Algorithm.SNAPPY);
177             
178             rce_cf.setMaxVersions(1000);
179             rce_cf.setMinVersions(1000);
180             
181             receiveContentEmail.addFamily(rce_cf);
182             
183             admin.createTable(receiveContentEmail);
184             
185         } catch (MasterNotRunningException e) {
186             e.printStackTrace();
187         } catch (ZooKeeperConnectionException e) {
188             e.printStackTrace();
189         } catch (IOException e) {
190             e.printStackTrace();
191         }finally{
192             if(admin!=null)
193                 try {
194                     admin.close();
195                 } catch (IOException e) {
196                     e.printStackTrace();
197                 }
198         }
199     }
200     
201     /*
202      *  发布微博内容:
203      *      1)在微博内容表中插入一行数据
204      *      2)在用户微博内容接收邮件箱表对用户的所有粉丝用户添加数据
205      *  
206      *  Put
207      *  put 'tablename','rowkey','cf:cq','value'
208      */
209     public void pubishWeiboContent(String uid,String content){
210         HConnection hconn = null;
211         try {
212             hconn = HConnectionManager.createConnection(conf);
213             // 1)在微博内容表中插入一行数据
214             HTableInterface weiboContentTbl = hconn.getTable(TableName.valueOf(weibo_content));
215             // rowkey : uid_timestamp
216             long timestamp = System.currentTimeMillis();
217             String rowkey = uid+"_"+timestamp;
218             Put put = new Put(Bytes.toBytes(rowkey));
219             put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"), Bytes.toBytes(content));
220             weiboContentTbl.put(put);
221                     
222             // 查询该用户的粉丝用户
223             HTableInterface relationsTbl = hconn.getTable(TableName.valueOf(relations));
224             // get 'tablename','rowkey','cf','cq'
225             Get get = new Get(Bytes.toBytes(uid));
226             // 查询粉丝列簇下的所有粉丝
227             get.addFamily(Bytes.toBytes("fans"));
228             Result r = relationsTbl.get(get);
229             
230             List<byte[]> fans = new ArrayList<byte[]>();
231             Cell[] cells = r.rawCells();
232             for(Cell c : cells){
233                 fans.add(CellUtil.cloneQualifier(c));
234             }
235             
236             if(fans.size() > 0){
237                 //2)在用户微博内容接收邮件箱表对用户的所有粉丝用户添加数据
238                 HTableInterface rceTbl = hconn.getTable(TableName.valueOf(receive_content_email));
239                 List<Put> ps = new  ArrayList<Put>();
240                 for(byte[] fanId : fans){
241                     Put p = new Put(fanId);
242 //                    p.add(Bytes.toBytes("cf"), 
243 //                            Bytes.toBytes(uid), 
244 //                            Bytes.toBytes(uid+"_"+System.currentTimeMillis()));
245                     
246                     p.add(Bytes.toBytes("cf"), 
247                             Bytes.toBytes(uid), timestamp,
248                             Bytes.toBytes(rowkey));
249                     ps.add(p);
250                 }
251                 rceTbl.put(ps);
252             }
253         } catch (IOException e) {
254             
255             e.printStackTrace();
256         }finally{
257             if(hconn!=null)
258                 try {
259                     hconn.close();
260                 } catch (IOException e) {
261                     e.printStackTrace();
262                 }
263         }
264     }
265     
266     
267     //添加关注用户
268     /**
269      * 添加关注用户
270      *     1)在微博用户关系表中,新增数据(关注用户列簇下添加标签)
271      *     2)从被添加的关注用户角度,新增粉丝用户
272      *     3)在微博邮件箱中添加关注用户发布的微博内容通知
273      * 
274      * @param uid
275      * @param attends
276      */
277     public void addAttends(String uid,String... attends){
278         
279         if(attends == null || attends.length <= 0) return ;
280         
281         HConnection hconn = null;
282         try {
283             hconn = HConnectionManager.createConnection(conf);
284             //1)在微博用户关系表中,新增数据(关注用户列簇下添加标签)
285             HTableInterface relationsTbl = hconn.getTable(TableName.valueOf(relations));
286             List<Put> ps = new ArrayList<Put>();
287             Put put = new Put(Bytes.toBytes(uid));
288             for(String attend:attends){
289                 put.add(Bytes.toBytes("attend"), Bytes.toBytes(attend), Bytes.toBytes(attend));
290                 // 2)从被添加的关注用户角度,新增粉丝用户
291                 Put attendPut = new Put(Bytes.toBytes(attend));
292                 attendPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
293                 ps.add(attendPut);
294             }
295             ps.add(put);
296             relationsTbl.put(ps);
297             
298             
299             //3)在微博邮件箱中添加关注用户发布的微博内容通知
300             // 先查询关注用户发布微博内容
301             HTableInterface weiboContentTbl = hconn.getTable(TableName.valueOf(weibo_content));
302             List<byte[]> rks = new ArrayList<byte[]>();
303             Scan scan = new Scan();
304             for(String attend:attends){
305                 // Filter
306                 // 扫描表的rowkey,只有rowkey含有字符串("关注用户ID_"),取出
307                 RowFilter rowFilter = 
308                         new RowFilter(CompareOp.EQUAL, new SubstringComparator(attend+"_"));
309                 scan.setFilter(rowFilter);
310                 ResultScanner resultScanner = weiboContentTbl.getScanner(scan);
311                 Iterator<Result> it = resultScanner.iterator();
312                 while(it.hasNext()){
313                     Result r = it.next();
314                     Cell[] cells = r.rawCells();
315                     for(Cell c : cells){
316                         rks.add(CellUtil.cloneRow(c));
317                     }
318                 }
319             }
320             if(rks.size() > 0){
321                 //List<byte[]> rks = new ArrayList<byte[]>();
322                 HTableInterface rceTbl = hconn.getTable(TableName.valueOf(receive_content_email));
323                 List<Put> puts = new ArrayList<Put>();
324                 for(byte[] rk : rks){
325                     Put p = new Put(Bytes.toBytes(uid));
326                     String rowkey =  Bytes.toString(rk);
327                     Long timestamp = Long.valueOf(rowkey.substring(rowkey.indexOf("_")+1));
328                     String attendId = rowkey.substring(0, rowkey.indexOf("_"));
329                     p.add(Bytes.toBytes("cf"), 
330                             Bytes.toBytes(attendId), timestamp,rk);
331                     
332                     puts.add(p);
333                 }
334                 rceTbl.put(puts);
335             }
336             
337         } catch (IOException e) {
338             e.printStackTrace();
339         }finally{
340             if(hconn!=null)
341                 try {
342                     hconn.close();
343                 } catch (IOException e) {
344                     e.printStackTrace();
345                 }
346         }
347         
348     }
349     
350     /**
351      * 取消关注用户
352      * 1)在微博用户关系表,针对该用户,删除被取消的关注用户所对应的单元格
353      * 2)在微博用户关系表,针对被取消用户,删除它们的粉丝用户
354      * 3)在微博内容接收邮件箱表中,移除该用户的这些被取消关注用户微博内容通知记录
355      * @param uid
356      * @param attends  可变长度的参数列表
357      */
358     public void removeAttends(String uid,String... attends){
359         
360         if(attends == null || attends.length <= 0) return ;
361         
362         HConnection hconn = null;
363         try {
364             hconn = HConnectionManager.createConnection(conf);
365             
366             // 1)在微博用户关系表,针对该用户,删除被取消的关注用户所对应的单元格
367             HTableInterface relationsTbl = hconn.getTable(TableName.valueOf(relations));
368             List<Delete> deletes = new ArrayList<Delete>();
369             Delete delete = new Delete(Bytes.toBytes(uid));
370             for(String attend:attends){
371                 delete.deleteColumn(Bytes.toBytes("attend"), Bytes.toBytes(attend));
372                 // 2)在微博用户关系表,针对被取消用户,删除它们的粉丝用户
373                 Delete deleteFan = new Delete(Bytes.toBytes(attend));
374                 deleteFan.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
375                 deletes.add(deleteFan);
376             }
377             deletes.add(delete);
378             relationsTbl.delete(deletes);
379             
380             
381             //3)在微博内容接收邮件箱表中,移除该用户的这些被取消关注用户微博内容通知记录
382             HTableInterface rceTbl = hconn.getTable(TableName.valueOf(receive_content_email));
383             Delete deleteRCE = new Delete(Bytes.toBytes(uid));
384             for(String attend:attends){
385                 // deleteColumn删除最近版本
386                 //deleteRCE.deleteColumn(Bytes.toBytes("cf"), Bytes.toBytes(attend));
387                 // 删除单元格的所有版本
388                 Long timestamp = System.currentTimeMillis();
389                 deleteRCE.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes(attend),timestamp+1000000);
390             }
391             rceTbl.delete(deleteRCE);
392             
393         } catch (IOException e) {
394             e.printStackTrace();
395         }finally{
396             if(hconn!=null)
397                 try {
398                     hconn.close();
399                 } catch (IOException e) {
400                     e.printStackTrace();
401                 }
402         }
403     }
404     
405     /**
406      * 用户获取所关注用户的微博内容
407      * 1) 从微博内容接收邮件箱表中获取用户其关注用户的微博内容 rowkey
408      * 2)从微博内容表中取出微博内容
409      * 
410      * 
411      * @param uid
412      * @return
413      */
414     public List<Message> getAttendContents(String uid){
415         
416         List<Message>  msgs = new ArrayList<Message>();
417         HConnection hconn = null;
418         try {
419             hconn = HConnectionManager.createConnection(conf);
420             // 1) 从微博内容接收邮件箱表中获取用户其关注用户的微博内容 rowkey
421             HTableInterface rceTbl = hconn.getTable(TableName.valueOf(receive_content_email));
422             Get get = new Get(Bytes.toBytes(uid));
423             get.setMaxVersions(5);
424             Result r = rceTbl.get(get);
425             List<byte[]> rks = new ArrayList<byte[]>();
426             Cell[] cells = r.rawCells();
427             if(cells != null && cells.length > 0){
428                 for(Cell c : cells){
429                     
430                     byte[] rk = CellUtil.cloneValue(c);
431                     rks.add(rk);
432                 }
433             }
434             
435             //2)从微博内容表中取出微博内容
436             if(rks.size() > 0){
437                 HTableInterface weiboContentTbl = hconn.getTable(TableName.valueOf(weibo_content));
438                 List<Get> gets = new  ArrayList<Get>();
439                 for(byte[] rk : rks){
440                     Get g = new Get(rk);
441                     gets.add(g);
442                 }
443                 
444                 Result[] results = weiboContentTbl.get(gets);
445                 for(Result result : results){
446                     Cell[] cls = result.rawCells();
447                     for(Cell cell : cls){
448                         Message msg = new Message();
449                         String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
450                         String attendUid = rowkey.substring(0, rowkey.indexOf("_"));
451                         msg.setUid(attendUid);
452                         String timestamp = rowkey.substring(rowkey.indexOf("_")+1);
453                         msg.setTimestamp(timestamp);
454                         
455                         String content = Bytes.toString(CellUtil.cloneValue(cell));
456                         msg.setContent(content);
457                         
458                         msgs.add(msg);
459                     }
460                 }
461             }
462             
463         } catch (IOException e) {
464             e.printStackTrace();
465         }finally{
466             if(hconn!=null)
467                 try {
468                     hconn.close();
469                 } catch (IOException e) {
470                     e.printStackTrace();
471                 }
472         }
473         return msgs;
474     } 
475     
476     public static void main(String[] args) {
477         Weibo wb = new  Weibo();
478         wb.initNameSpace();
479         wb.initTable();
480         //wb.pubishWeiboContent("0001", "今天天气真不错!");
481         //wb.pubishWeiboContent("0003", "今天天气真不错!");
482         //wb.pubishWeiboContent("0003", "今天天气真不错!");
483         //wb.pubishWeiboContent("0004", "今天天气真不错!");
484         //wb.pubishWeiboContent("0004", "今天天气真不错!");
485         //wb.pubishWeiboContent("0005", "今天天气真不错!");
486         
487         //wb.addAttends("0001", "0003","0004","0005");
488         //wb.removeAttends("0001", "0003");
489         
490         //List<Message> msgs = wb.getAttendContents("0001");
491         
492         //System.out.println(msgs);
493         
494         for(int i=0;i < 1000 ;i++){
495             wb.pubishWeiboContent(String.format("%04d", i), "今天天气真不错!" + i);
496         }
497         
498         
499     }
500 
501 }

  

  

原文地址:https://www.cnblogs.com/juncaoit/p/6168907.html