1.Prepare Hadoop Streaming
Hadoop streaming allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
1.1.Download the Hadoop Streaming jar that matches your Hadoop version
For Hadoop 2.4.0, you can visit the following website and download the jar file:
http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-streaming/2.4.0
Then put the jar file in the $HADOOP_HOME folder.
1.2.Hadoop-streaming test in my cluster:
bin/hadoop jar hadoop-streaming-2.4.0.jar -input /in -output /out2 -mapper /bin/cat
-reducer /usr/bin/wc
1.3.Other MapReduce demos can be found here:
Python:
http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
C++:
http://blog.sina.com.cn/s/blog_62a9902f01018rzj.html
2.FIR hardware:
This code was provided by the Xilinx workshop for an audio application; we simply reuse it for our tests.
2.1.fir.h
#ifndef _FIR_H_ #define _FIR_H_ #include "ap_cint.h" #define N 59 #define SAMPLES N+10 // just few more samples then number of taps typedef long coef_t; typedef long data_t; typedef long acc_t; #endif
2.2.fir_coef.dat
-378, -73, 27, 170, 298, 352, 302, 168, 14, -80, -64, 53, 186, 216, 40, -356, -867, -1283, -1366, -954, -51, 1132, 2227, 2829, 2647, 1633, 25, -1712, -3042, 29229, -3042, -1712, 25, 1633, 2647, 2829, 2227, 1132, -51, -954, -1366, -1283, -867, -356, 40, 216, 186, 53, -64, -80, 14, 168, 302, 352, 298, 170, 27, -73, -378
2.3.fir.c
#include "fir.h"

/*
 * fir - run one input sample through the N-tap FIR filter.
 *
 * y: receives the filter output produced by this call.
 * x: the newest input sample.
 *
 * Earlier samples are held in a static delay line, so consecutive calls are
 * treated as one continuous stream.
 */
void fir(data_t *y, data_t x)
{
    const coef_t c[N] = {
        #include "fir_coef.dat"
    };
    static data_t shift_reg[N];
    acc_t sum;
    int tap;

    /* The oldest stored sample is weighted by the last coefficient. */
    sum = (acc_t)shift_reg[N - 2] * (acc_t)c[N - 1];

    /*
     * Walk from the old end of the delay line towards the new end: weight
     * each stored sample, then move it one slot further along.
     * NOTE(review): the label name is kept unchanged in case the HLS
     * directives refer to it - confirm against the project's directive file.
     */
loop:
    for (tap = N - 2; tap > 0; tap--) {
        data_t previous = shift_reg[tap - 1];

        sum += (acc_t)previous * (acc_t)c[tap];
        shift_reg[tap] = previous;
    }

    /* The newest sample enters the delay line and is weighted by c[0]. */
    shift_reg[0] = x;
    sum += (acc_t)x * (acc_t)c[0];

    *y = sum;
}
2.4.fir_test.c
#include <stdio.h> #include <math.h> #include "fir.h" void fir ( data_t *y, data_t x ); int main () { FILE *fp; data_t signal, output; fp=fopen("fir_impulse.dat","w"); int i; for (i=0;i<SAMPLES;i++) { if(i==0) signal = 0x8000; else signal = 0; fir(&output,signal); printf("%i %d %d ",i,(int)signal,(int)output); // fprintf(fp,"%i %d %d ",i,signal,output); } fclose(fp); return 0; }
2.5 Pictures
Here is the project in Vivado HLS:
Here is the Directive View:
In vivado Project Settings:
Block Design:
Reg Address:
2.6 Stand alone application:
#include <stdio.h> #include "platform.h" #include "xfir_hw.h" #include "xparameters.h" #include "xil_io.h" #include "xfir.h" #define SAMPLES 68 int main() { int i = 0; int k=0; long signal, out; u32 sample; init_platform(); print("Hello World "); //filter_hw_accel_input(&out,signal); print("debug"); for (i=0;i<59;i++) { if(i==0) signal = 0; else signal = 0; filter_hw_accel_input(&out,signal); } for (i=0;i<SAMPLES;i++) { if(i==0) signal = 1; else signal = 0; filter_hw_accel_input(&out,signal); //printf("in=0x%x ",signal); printf("out=%ld ",out); printf("i = %d ",i); sleep(1); } sample = 0x1111; Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_X_DATA,sample); Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x1); // pulse ap_start left channel Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x0); while(1){ if(Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_CTRL)) break; else continue; } sample = Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_DATA); printf("sample=0x%x ",sample); } void filter_hw_accel_input(long * Sample_out, long Sample_in) { Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_X_DATA, Sample_in); // send left channel sample Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x1); // pulse ap_start left channel Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x0); while(1){ if(Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_CTRL)) break; else continue; } *Sample_out = Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_DATA); }
2.7 standalone application configurations:
The repository path should be set to the folder that contains the driver folder of the FIR HLS project.
Board Support Package setting:
3 Linux:
3.1 driver:
#include <linux/module.h> #include <linux/version.h> #include <linux/kernel.h> #include <linux/init.h> #include <linux/fs.h> #include <linux/cdev.h> #include <linux/delay.h> #include <linux/spinlock.h> #include <linux/device.h> #include <linux/types.h> #include <linux/ioctl.h> #include <asm/io.h> #include <asm/uaccess.h> #include <asm/atomic.h> #include <linux/wait.h> #include <linux/cdev.h> #include <linux/interrupt.h> #include <asm/signal.h> #include <linux/gpio.h> #include <linux/irq.h> #include <linux/semaphore.h> #define DEVICE_NAME "fir_dev" #define FIR_BASEADDR 0x43C00000 #define XFIR_DATA_IN_OFFSET 0x1C #define XFIR_DATA_OUT_OFFSET 0x14 #define XFIR_CTRL_OFFSET 0x00 #define XFIR_DONE_OFFSET 0x10 static unsigned long fir_addr = 0; static struct class* fir_class = NULL; static struct device* fir_device = NULL; static int fir_major = 0; static struct semaphore sem; //arg means the led number, cmd controls it on or off static ssize_t fir_ioctl(struct file *file, unsigned int cmd, long *arg) { //printk("fun(gpio_ioctl):"); long status = 0xff; int ret,ii; printk("******** in ioctl ************ "); switch(cmd){ case 0: case 1: //init fir put 59 zero for(ii=0;ii<59;ii++) { iowrite32(0x0,fir_addr+XFIR_DATA_IN_OFFSET); iowrite32(0x1,fir_addr+XFIR_CTRL_OFFSET); iowrite32(0x0,fir_addr+XFIR_CTRL_OFFSET); while(!ioread32(fir_addr+XFIR_DONE_OFFSET)); } //now read data from fir y //status = ioread32(fir_addr+FIR_DATA_OUT_OFFSET); printk("59 zeros has been put to fir!! 
"); return 0; case 3: printk("in data = %ld ",*arg); iowrite32(*arg,fir_addr+XFIR_DATA_IN_OFFSET); iowrite32(0x1,fir_addr+XFIR_CTRL_OFFSET); iowrite32(0x0,fir_addr+XFIR_CTRL_OFFSET); //now wait for fir generate ok while(!ioread32(fir_addr+XFIR_DONE_OFFSET)); //now read data from fir y status = ioread32(fir_addr+XFIR_DATA_OUT_OFFSET); ret = __put_user(status, (long *)arg); printk("out data = 0x%x ",status); return 0; case 5: up(&sem); printk("sema up "); return 0; default: printk("default cmd=%d ",cmd); return -EINVAL; } } int fir_open(struct inode *inode, struct file *filp) { sema_init(&sem,1); down(&sem); printk("sema down "); return 0; } static struct file_operations fir_fops = { .owner = THIS_MODULE, .unlocked_ioctl = fir_ioctl, .open = fir_open, }; static int __init fir_init(void) { int ret; ret = register_chrdev(0,DEVICE_NAME, &fir_fops); if(ret < 0) { printk("fir: can't get major number "); return ret; } fir_major = ret; fir_class = class_create(THIS_MODULE, "fir_class"); if(IS_ERR(fir_class)) { printk("fir: failed in creating class "); unregister_chrdev(fir_major, DEVICE_NAME); return -1; } fir_device = device_create(fir_class,NULL,MKDEV(fir_major,0),NULL,DEVICE_NAME); if(IS_ERR(fir_device)) { printk("fir: failed in creating device! "); unregister_chrdev(fir_major, DEVICE_NAME); class_unregister(fir_class); class_destroy(fir_class); return -1; } fir_addr = (unsigned long) ioremap(FIR_BASEADDR, sizeof(u32)); printk("fir installed successfully! "); return 0; } static void __exit fir_exit(void) { device_destroy(fir_class,MKDEV(fir_major, 0)); class_unregister(fir_class); class_destroy(fir_class); unregister_chrdev(fir_major,DEVICE_NAME); printk("fir module exit!"); } module_init(fir_init); module_exit(fir_exit); MODULE_AUTHOR("seg"); MODULE_LICENSE("GPL");
Makefile:
# Makefile for the FIR driver module (fir_driver.o)
# Program start 2014.3.14

# If KERNELRELEASE is defined we were invoked from the kernel build system,
# so its built-in rules can be used.
ifneq ($(KERNELRELEASE),)

obj-m := fir_driver.o

# Otherwise we were invoked directly from the command line and have to call
# the kernel build system ourselves.
else

# ?= lets the environment or the command line point at another kernel tree.
KERNELDIR ?= /root/zybo_gpio/linux-digilent/
PWD := $(shell pwd)

# Neither target produces a file of that name.
.PHONY: default clean

default:
	$(MAKE) -C $(KERNELDIR) M=$(PWD) modules ARCH=arm

clean:
	rm -rf *.o *~ core .depend .*.cmd *.ko *.mod.c .tmp_versions modules.* Module.*

endif
3.2 test app:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>

#define FIR_DEVICE "/dev/fir_mod"
/* ioctl command numbers understood by the FIR driver. */
#define FIR_CMD_RESET 1
#define FIR_CMD_SAMPLE 3
#define FIR_CMD_RELEASE 5
/* Number of samples sent: an impulse followed by zeros. */
#define NUM_SAMPLES 60

static int fir_fd;

/*
 * Impulse-response test for the FIR driver: resets the filter, sends a 1
 * followed by zeros and prints every output as "index:value".
 *
 * Returns EXIT_SUCCESS, or EXIT_FAILURE when the device cannot be opened or
 * an ioctl fails.
 */
int main(void)
{
    int i;
    long val;
    int status = EXIT_SUCCESS;

    /* open device */
    fir_fd = open(FIR_DEVICE, O_RDONLY);
    if (fir_fd < 0) {
        perror("open " FIR_DEVICE);
        exit(EXIT_FAILURE);
    }
    printf("fd = %d\n", fir_fd);
    printf("open fir device successful\n");

    if (ioctl(fir_fd, FIR_CMD_RESET, 0) < 0) {
        perror("fir reset ioctl");
        status = EXIT_FAILURE;
        goto out;
    }

    for (i = 0; i < NUM_SAMPLES; i++) {
        val = (i == 0) ? 1 : 0;
        if (ioctl(fir_fd, FIR_CMD_SAMPLE, &val) < 0) {
            perror("fir sample ioctl");
            status = EXIT_FAILURE;
            goto out;
        }
        /* Printed for every sample, including the response to the impulse
         * itself, which used to be dropped. */
        printf("%d:%ld\n", i, val);
    }
    printf("done\n");

out:
    /* Hand the device back the same way the mapper does, then close it. */
    ioctl(fir_fd, FIR_CMD_RELEASE, 0);
    close(fir_fd);
    return status;
}
3.3 mapper.cpp
#include <iostream>
#include <stdio.h>
#include <string.h>
#include <string>
#include <fcntl.h>
#include <unistd.h>
#include <sys/ioctl.h>

using namespace std;

int fir_fd;

/*
 * Hadoop streaming mapper: every whitespace-delimited token read from stdin
 * is treated as a run of 4-byte samples. Each sample is passed through the
 * FIR device and the 4 bytes that come back are written to stdout; one line
 * is emitted per input token. Trailing bytes that do not fill a whole
 * 4-byte sample are ignored.
 *
 * Returns 0 on success, 1 when the device cannot be opened or an ioctl
 * fails.
 */
int main()
{
    string str;

    fir_fd = open("/dev/fir_mod", O_RDONLY);
    if (fir_fd < 0) {
        perror("open /dev/fir_mod");
        return 1;
    }

    while (cin >> str) {
        // First, reset the filter (ioctl 1 makes the driver push zeros).
        if (ioctl(fir_fd, 1, 0) < 0) {
            perror("fir reset ioctl");
            close(fir_fd);
            return 1;
        }

        // Second, do the real filtering, one 4-byte sample at a time.
        const size_t len = str.length() / 4;
        for (size_t i = 0; i < len; i++) {
            // The driver reads and writes a long through this pointer, so
            // give it a real, correctly aligned long, not a char[4].
            long word = 0;
            memcpy(&word, str.data() + i * 4, 4);

            if (ioctl(fir_fd, 3, &word) < 0) {
                perror("fir sample ioctl");
                close(fir_fd);
                return 1;
            }
            cout.write(reinterpret_cast<const char *>(&word), 4);
        }

        // The value is the filter output, followed by a line terminator.
        cout << endl;
    }

    // Release the device only once all input has been processed; releasing
    // it after the first line would give up exclusive access too early.
    ioctl(fir_fd, 5, 0);
    close(fir_fd);
    return 0;
}
3.4 reducer.cpp
#include <iostream>
#include <string>

using namespace std;

/*
 * Hadoop streaming reducer: an identity pass-through.
 *
 * Reads whitespace-delimited tokens from stdin and writes them to stdout
 * back to back, so the whitespace between tokens is not reproduced.
 */
int main()
{
    for (string token; cin >> token; ) {
        cout << token;
    }
    return 0;
}
3.5 test scripts:
#! /bin/sh -
# Runs the hardware-FIR Hadoop streaming job, installing the driver first
# when its device node is missing.

#indir=/in
indir=/fir/signal100.dat
outdir=/out/hw100v3

cd /root/ || exit 1

# Test for the device node directly; comparing the output of ls/grep with
# -eq is an integer comparison and does not work on strings.
if [ ! -e /dev/fir_mod ]
then
    ./install_fir.sh
fi
echo fir run successful!

cd /root/hadoop-2.4.0/ || exit 1
bin/hadoop jar hadoop-streaming-2.4.0.jar \
    -D mapred.map.tasks=1 \
    -D mapred.reduce.tasks=1 \
    -file /root/hw_fir/Mapper \
    -mapper /root/hw_fir/Mapper \
    -file /root/hw_fir/Reducer \
    -reducer /root/hw_fir/Reducer \
    -input "$indir" \
    -output "$outdir"
echo test now finished.
done