FIR on Hadoop using hadoop-streaming

1.Prepare Hadoop Streaming

Hadoop streaming allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.

1.1.Download Hadoop Streaming fit for your hadoop version

For hadoop2.4.0, you can visit the following website and download the jar file:

http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-streaming/2.4.0

Than put the jar file to the $HADOOP_HOME folder.

1.2.Hadoop-streaming test in my cluster:

bin/hadoop jar hadoop-streaming-2.4.0.jar -input /in -output /out2 -mapper /bin/cat

-reducer /usr/bin/wc

1.3.Other map reduce demo here:

Python:

http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

C++:

http://blog.sina.com.cn/s/blog_62a9902f01018rzj.html

2.FIR hardware:

This is the code provided by xilinx workshop for audio application, we just use it to do our tests.

2.1.fir.h

#ifndef _FIR_H_
#define _FIR_H_
#include "ap_cint.h"
#define N	59
#define SAMPLES N+10 // just few more samples then number of taps
typedef long	coef_t;
typedef long	data_t;
typedef long	acc_t;
#endif

2.2.fir_coef.dat

-378,
-73,
27,
170,
298,
352,
302,
168,
14,
-80,
-64,
53,
186,
216,
40,
-356,
-867,
-1283,
-1366,
-954,
-51,
1132,
2227,
2829,
2647,
1633,
25,
-1712,
-3042,
29229,
-3042,
-1712,
25,
1633,
2647,
2829,
2227,
1132,
-51,
-954,
-1366,
-1283,
-867,
-356,
40,
216,
186,
53,
-64,
-80,
14,
168,
302,
352,
298,
170,
27,
-73,
-378

2.3.fir.c

#include "fir.h"

void fir (
  data_t *y,
  data_t x
  ) {
  const coef_t c[N]={
 #include "fir_coef.dat"
    };


  static data_t shift_reg[N];
  acc_t acc;
  int i;

  acc=(acc_t)shift_reg[N-2]*(acc_t)c[N-1];
  loop: for (i=N-2;i!=0;i--) {
    acc+=(acc_t)shift_reg[i-1]*(acc_t)c[i];
    shift_reg[i]=shift_reg[i-1];
  }
  acc+=(acc_t)x*(acc_t)c[0];
  shift_reg[0]=x;
  *y = acc;
}

2.4.fir_test.c

#include <stdio.h>
#include <math.h>
#include "fir.h"
void fir (
  data_t *y,
  data_t x
  );

int main () {
  FILE   *fp;

  data_t signal, output;

  fp=fopen("fir_impulse.dat","w");
  int i;
  for (i=0;i<SAMPLES;i++) {
	  if(i==0)
		  signal = 0x8000;
	  else
		  signal = 0;
	  fir(&output,signal);
   	  printf("%i %d %d
",i,(int)signal,(int)output);
//   	  fprintf(fp,"%i %d %d
",i,signal,output);
  }
  fclose(fp);
  return 0;
}

2.5 Pictures

Here is the project in Vivado HLS:

image

Here is the Directive View:

image

In vivado Project Settings:

image

Block Design:

image

Reg Address:

image

2.6 Stand alone application:

#include <stdio.h>
#include "platform.h"
#include "xfir_hw.h"
#include "xparameters.h"
#include "xil_io.h"
#include "xfir.h"

#define SAMPLES 68


int main()
{

	int i = 0;
	int k=0;
	long signal, out;
	u32 sample;

    init_platform();
    print("Hello World

");

    //filter_hw_accel_input(&out,signal);
    print("debug");

    for (i=0;i<59;i++) {
    	if(i==0)
    		signal = 0;
    	else
    		signal = 0;
    	filter_hw_accel_input(&out,signal);
    }
    for (i=0;i<SAMPLES;i++) {
        	if(i==0)
        		signal = 1;
        	else
        		signal = 0;
        	filter_hw_accel_input(&out,signal);
        	//printf("in=0x%x ",signal);
        	printf("out=%ld
",out);
        	printf("i = %d
",i);
        	sleep(1);
        }
    sample = 0x1111;
    Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_X_DATA,sample);
    Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x1); // pulse ap_start left channel
    Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x0);
    while(1){
        			if(Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_CTRL))
        				break;
        			else
        				continue;
        		}
    sample = Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_DATA);
    printf("sample=0x%x
",sample);
}

void filter_hw_accel_input(long * Sample_out, long Sample_in)
    {

    		Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_X_DATA, Sample_in); // send left channel sample
    		Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x1); // pulse ap_start left channel
    		Xil_Out32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_AP_CTRL, 0x0);
    		while(1){
    			if(Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_CTRL))
    				break;
    			else
    				continue;
    		}
    		*Sample_out = Xil_In32(XPAR_FIR_0_S_AXI_FIR_IO_BASEADDR+XFIR_FIR_IO_ADDR_Y_DATA);
    }

2.7 standalone application configurations:

Repo should be set to the folder that contains the driver folder in fir HLS project.

image

Board Support Package setting:

image

3 Linux:

3.1 driver:

#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/delay.h>
#include <linux/spinlock.h>
#include <linux/device.h>
#include <linux/types.h>
#include <linux/ioctl.h>

#include <asm/io.h>
#include <asm/uaccess.h>
#include <asm/atomic.h>
#include <linux/wait.h>
#include <linux/cdev.h>

#include <linux/interrupt.h>
#include <asm/signal.h>
#include <linux/gpio.h>
#include <linux/irq.h>
#include <linux/semaphore.h>

#define DEVICE_NAME "fir_dev"
#define FIR_BASEADDR 0x43C00000


#define XFIR_DATA_IN_OFFSET 0x1C
#define XFIR_DATA_OUT_OFFSET    0x14
#define XFIR_CTRL_OFFSET    0x00
#define XFIR_DONE_OFFSET    0x10

static unsigned long fir_addr = 0;

static struct class* fir_class = NULL;
static struct device* fir_device = NULL;

static int fir_major = 0;

static struct semaphore sem;

//arg means the led number, cmd controls it on or off
static ssize_t fir_ioctl(struct file *file, unsigned int cmd, long *arg)
{
    //printk("fun(gpio_ioctl):");
    long status = 0xff;
    int ret,ii;
    printk("******** in ioctl ************
");
    switch(cmd){
        case 0:
        case 1:
            //init fir put 59 zero 
            for(ii=0;ii<59;ii++)
            {
                iowrite32(0x0,fir_addr+XFIR_DATA_IN_OFFSET);
                iowrite32(0x1,fir_addr+XFIR_CTRL_OFFSET);
                iowrite32(0x0,fir_addr+XFIR_CTRL_OFFSET);
                while(!ioread32(fir_addr+XFIR_DONE_OFFSET));
            }
            //now read data from fir y
            //status = ioread32(fir_addr+FIR_DATA_OUT_OFFSET);
            printk("59 zeros has been put to fir!!
");
            return 0;
        case 3:
            printk("in data = %ld
",*arg);
            iowrite32(*arg,fir_addr+XFIR_DATA_IN_OFFSET);
            iowrite32(0x1,fir_addr+XFIR_CTRL_OFFSET);
            iowrite32(0x0,fir_addr+XFIR_CTRL_OFFSET);
            //now wait for fir generate ok
            while(!ioread32(fir_addr+XFIR_DONE_OFFSET));
            //now read data from fir y
            status = ioread32(fir_addr+XFIR_DATA_OUT_OFFSET);
            ret = __put_user(status, (long *)arg);
            printk("out data = 0x%x
",status);
            return 0;
        case 5:
            up(&sem);
            printk("sema up
");
            return 0;
        default:
            printk("default cmd=%d
",cmd);
            return -EINVAL;
    }
}

int fir_open(struct inode *inode, struct file *filp)
{
    sema_init(&sem,1);
    down(&sem);
    printk("sema down
");
    return 0;
}


static struct file_operations fir_fops = {
    .owner = THIS_MODULE,
    .unlocked_ioctl = fir_ioctl,
    .open = fir_open,
};

static int __init fir_init(void)
{
    int ret;
    ret = register_chrdev(0,DEVICE_NAME, &fir_fops);
    if(ret < 0)
    {
        printk("fir: can't get major number
");
        return ret;
    }

    fir_major = ret;
    fir_class = class_create(THIS_MODULE, "fir_class");
    if(IS_ERR(fir_class))
    {
        printk("fir: failed in creating class
");
        unregister_chrdev(fir_major, DEVICE_NAME);
        return -1;
    }
    fir_device = device_create(fir_class,NULL,MKDEV(fir_major,0),NULL,DEVICE_NAME);
    if(IS_ERR(fir_device))
    {
        printk("fir: failed in creating device!
");
        unregister_chrdev(fir_major, DEVICE_NAME);
        class_unregister(fir_class);
        class_destroy(fir_class);
        return -1;
    }

    fir_addr = (unsigned long) ioremap(FIR_BASEADDR, sizeof(u32));

    printk("fir installed successfully!
");
    return 0;
}
static void __exit fir_exit(void)
{
    device_destroy(fir_class,MKDEV(fir_major, 0));
    class_unregister(fir_class);
    class_destroy(fir_class);
    unregister_chrdev(fir_major,DEVICE_NAME);
    printk("fir module exit!");
}

module_init(fir_init);
module_exit(fir_exit);


MODULE_AUTHOR("seg");
MODULE_LICENSE("GPL");

Makefile:

# Makefile for globalmm Driver
#           Program start 2014.3.14
# 如果已经定义了KERNELRELEASE,则说明从内核构造系统调用的。
# 因此可以利用其内建语句
ifneq ($(KERNELRELEASE),)
    obj-m := fir_driver.o
# 否则,要直接从命令行调用
# 这时要调用内核构造系统
else
KERNELDIR = /root/zybo_gpio/linux-digilent/
PWD := $(shell pwd)
default:
    $(MAKE) -C $(KERNELDIR) M=$(PWD) modules ARCH=arm
clean:
    rm -rf *.o *~ core .depend .*.cmd *.ko *.mod.c .tmp_versions modules.* Module.*
endif

3.2 test app:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>

static int fir_fd;

int main(void)
{
        int i;
        long val;
        // open device
        fir_fd = open("/dev/fir_mod", 0);
        if (fir_fd < 0) {
                perror("open device gpio_dev error!
");
                exit(1);
        }
        printf("fd = %d
",fir_fd);
        printf("read fir devide successful
");
        ioctl(fir_fd,1,0);
        for(i=0;i<60;i++)
        {
                if(i==0)
                {
                        val = 1;
                        ioctl(fir_fd,3,&val);
                }
                else
                {
                        val = 0;
                        ioctl(fir_fd,3,&val);
                        printf("%d:%ld
",i,val);
                }

        }
        //ioctl(fir_fd,3,val);
        printf("done");
}

3.3 mapper.cpp

#include <iostream>
#include <stdio.h>
#include <string>
#include <fcntl.h>
#include <sys/ioctl.h>
using namespace std;


int fir_fd;

int main()
{
        string str;
        int i;
        fir_fd = open("/dev/fir_mod", 0);
//      printf("%d
",fir_fd);

        while(cin>>str)
        {
                //the key is the original value
                //cout << str << "	";

                //firstly, init fir with 58 zero
                char out_val[4];
//              printf("debug");
                ioctl(fir_fd,1,0);

                //secondly, do the real fir
                int len = str.length()/4;
                for(i=0;i<len;i++)
                {
                        str.copy(out_val+3,1,i*4+3);
                        str.copy(out_val+2,1,i*4+2);
                        str.copy(out_val+1,1,i*4+1);
                        str.copy(out_val+0,1,i*4+0);
                        ioctl(fir_fd,3,(long*)out_val);
                        cout << out_val[0] << out_val[1]<<out_val[2] << out_val[
3];
                }
                //release sem
                ioctl(fir_fd,5,0);

                //the value is fir out, and follow a line finish
                cout << endl;
        }
        return 0;
}

3.4 reducer.cpp

#include <iostream>
#include <string>
//#include <map>

using namespace std;

/*
int main() {
 map<string,int> firMap;
  map<string,int>::iterator it;
 string key;
 int value;
 int count;
 while(cin>>key>>value) {
  firMap[key] +=value;
 }
 for(it=firMap.begin();it != firMap.end();it++) {
  cout<<it->first<<"	"<<it->second<<endl;
 }
 return 0;
}
*/
int main()
{

        string str;
        while(cin >> str)
                cout << str;
        return 0;
}

3.5 test scripts:

#! /bin/sh -

#indir=/in
indir=/fir/signal100.dat
outdir=/out/hw100v3

cd /root/
if [ `ls /dev/ | grep fir_mod` -eq `echo` ]
then
        ./install_fir.sh
fi

echo fir run successful!

cd /root/hadoop-2.4.0/
bin/hadoop jar hadoop-streaming-2.4.0.jar 
-D mapred.map.tasks=1 
-D mapred.reduce.tasks=1 
-file /root/hw_fir/Mapper -mapper /root/hw_fir/Mapper 
-file /root/hw_fir/Reducer -reducer /root/hw_fir/Reducer 
-input $indir -output $outdir

echo test now finished.

done

原文地址:https://www.cnblogs.com/shenerguang/p/3891906.html