MapReduce 规划 系列的12 使用Hadoop Streaming技术集成newLISP文字

本文example6环境与前Hadoop 1.x异,于Hadoop 2.x环境测试。

功能与前面相同的日志处理程序。

第一newLISP文字,游玩mapper任务。于stdin读取文本数据,将did由于key, value至1,结果是随后被输出到stdout

第二个newLISP脚本。起到reducer的作用,在stdin中读取<key, values>, key是dic。 values是全部的value,简单对value求和后。写到stdout中

最后应该能够在HDFS下看到结果。


用脚本编程的优点是方便測试,如今先开发newLISP脚本读入文件。并仿照map逻辑处理,然后交给兴许的newLISP脚本仿照reduce处理。

以下是map.lsp代码:

#!/usr/bin/newlisp

(while (read-line)
  (set 'value (parse (current-line) ","))
  (println (string (value 2) "	1"))
)

(exit)

測试一下:

cat logs/sign_2014-05-10.0.csv | ./map.lsp

结果还不错:

537025b84700aab27472b87f        1
537023124700aab27472b82a        1
537031a24700aab27472b982        1
537023c84700aab27472b841        1
537014e74700aab27472b48b        1
53702cac4700aab27472b928        1
537049cd4700aab27472ba91        1
5370dd0b4700aab27472bde4        1

将一行记录依照,拆开。放在一个list中,然后取第三个元素。也就是设备ID。之后加入 为列分隔符号,然后再加入1.

这样就转成了did 1 的形式的<key,value>给reduce。注意newLISP的代码println函数会自己主动在字符串后面加入 .


以下来实现reduce.lsp代码:

(new Tree 'my-table)

(while (read-line)
  (set 'line-value (parse (current-line) "	"))
  (set 'key (line-value 0))
  (set 'value (int (line-value 1)))
  (set 'v (my-table key))
  (if v
      (my-table key (+ v value))
    (my-table key value)
      )
)

(dolist (item (my-table)) (println (item 0) "	" (item 1)))

(exit)

首先创建了一个my-table,用来保存<key,value>

然后将map.lsp输出的数据每行依照 拆分。获取key和value

存入my-table中,用key查询,有则value加1,无key则加入进去。

最后遍历整个my-table,输出did sum 这种数据。


以下的命令能够将map和reduce脚本连起来測试:

cat logs/sign_2014-05-10.0.csv | ./map.lsp | sort | ./reduce.lsp


在hadoop集群部署的时候首先要确保newlisp二进制程序都部署在全部节点的/usr/bin/文件夹下,而且有运行权限。因为newlisp程序本身很小。所以部署及其轻松。直接scp就可以。

然后运行hadoop命令:

hadoop jar hadoop-streaming-1.0.0.jar -files map.lsp reduce.lsp -input /user/chenshu/share/logs -output /user/chenshu/share/output/lisp -mapper map.lsp -reducer reduce.lsp 




版权声明:本文博主原创文章,博客,未经同意不得转载。

原文地址:https://www.cnblogs.com/zfyouxi/p/4834844.html