es-08-hadoop集成

1, 版本匹配: 

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/requirements.html

2, maven集成: 

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html

3, pom文件配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xiaoniubigdata</artifactId>
        <groupId>com.wenbronk</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hadoop03-es</artifactId>

    <properties>
        <hadoop.version>2.7.2</hadoop.version>
        <hive.version>1.2.1</hive.version>
        <eslasticsearch.version>6.3.2</eslasticsearch.version>
    </properties>

    <dependencies>
        <!--<dependency>-->
            <!--<groupId>org.elasticsearch</groupId>-->
            <!--<artifactId>elasticsearch-hadoop</artifactId>-->
            <!--<version>6.3.2</version>-->
        <!--</dependency>-->

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop-mr</artifactId>
            <version>${eslasticsearch.version}</version>
        </dependency>

        <!--<dependency>-->
            <!--<groupId>org.elasticsearch</groupId>-->
            <!--<artifactId>elasticsearch-hadoop-hive</artifactId>-->
            <!--<version>${eslasticsearch.version}</version>-->
        <!--</dependency>-->

        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-annotations</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-archives</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
            <version>${hive.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-serde</artifactId>
            <version>${hive.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>24.1-jre</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.8.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!--<minimizeJar>true</minimizeJar>-->
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <relocations>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>shadowing.com.google.common</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>io.netty</pattern>
                                    <shadedPattern>shadowing.io.netty</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

4, 从es中读取数据: 

mainjob: 

package com.wenbronk.mr.es.read;

import com.wenbronk.mr.es.rw.ESRWJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsInputFormat;


public class ESReadJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(strings);
        if (parser.getCommandLine() == null) {
            throw new RuntimeException("args can not be null");
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("es-hadoop-read");
        job.setJarByClass(ESRWJob.class);

        job.setInputFormatClass(EsInputFormat.class);

        job.setMapperClass(ESReadMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        job.setReducerClass(ESReaderReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        Path outPath = new Path("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/hadoop03-es/target/out");
        FileSystem fileSystem = FileSystem.get(getConf());
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        conf.set("es.nodes", "10.124.147.22:9200");
//        conf.set("es.resource", "macsearch_fileds/mac");
        conf.set("es.resource.read", "macsearch_fileds/mac");
        conf.set("es.resource.write", "sink/group");
        conf.set("es.query", "?q=me*");
        // 设置读入格式为 json, map 的 inputvalue 为 text
//        conf.set("es.output.json", "true");

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        int result = ToolRunner.run(conf, new ESReadJob(), args);
        System.exit(result);
    }
}

2, mapper

package com.wenbronk.mr.es.read;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ESReadMapper extends Mapper<Text, MapWritable, Text, MapWritable> {

    private Text text;
    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.text = new Text();
        this.nullWritable = NullWritable.get();
    }

    // docId, source
    @Override
    protected void map(Text key, MapWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}

reducer

package com.wenbronk.mr.es.read;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import org.apache.commons.lang.ObjectUtils;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ESReaderReducer extends Reducer<Text, MapWritable, NullWritable, Text> {

    private Text text;
    private NullWritable nullWritable;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.text = new Text();
        this.nullWritable = NullWritable.get();
    }

    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {

        for (MapWritable value : values) {
            JSONObject jsonObject = new JSONObject();
            value.entrySet().forEach(entry -> {
                jsonObject.put(String.valueOf(entry.getKey()), entry.getValue());
            });
            text.set(jsonObject.toString());
            context.write(nullWritable, text);
        }
    }
}

如果需要更改json格式, 可见: 

git@gitlab.com:wenbronk/xiaoniubigdata.git

5, 写入es中

json格式写入, 

mainjob

package com.wenbronk.mr.es.writeJson;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class ESWriteJobWriteJson extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(strings);
        if (parser.getCommandLine() == null) {
            throw new RuntimeException("args can not be null");
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("es-hadoop-write");
        job.setJarByClass(ESWriteJobWriteJson.class);

        job.setMapperClass(ESWriterMapperJson.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ESWriterReducerJson.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(EsOutputFormat.class);

        Path inPath = new Path("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/hadoop03-es/target/out/part-r-00000");
        FileInputFormat.setInputPaths(job, inPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        conf.set("es.nodes", "10.124.147.22:9200");
//        conf.set("es.resource", "macsearch_fileds/mac");
        conf.set("es.resource.read", "macsearch_fileds/mac");
        conf.set("es.resource.write", "sink/group");
        conf.set("es.query", "?q=me*");
        // 设置读入格式为 json, map 的 inputvalue 为 text
        conf.set("es.input.json", "true");

        int result = ToolRunner.run(conf, new ESWriteJobWriteJson(), args);
        System.exit(result);
    }
}

mapper

package com.wenbronk.mr.es.writeJson;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ESWriterMapperJson extends Mapper<LongWritable, Text, NullWritable, Text> {

    private NullWritable nullWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.nullWritable = NullWritable.get();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(nullWritable, value);
    }
}

reducer

package com.wenbronk.mr.es.writeJson;


import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Map;

public class ESWriterReducerJson extends Reducer<NullWritable, Text, NullWritable, Text> {

    private NullWritable nullWritable;
//    private BytesWritable bytesWritable;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.nullWritable = NullWritable.get();
//        this.bytesWritable = new BytesWritable();
    }

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        JSONObject jsonObject = new JSONObject();
        for (Text value : values) {
            context.write(nullWritable, value);
        }
    }
}

 6 从一个index读取写入另一个index

mainjob

package com.wenbronk.mr.es.rw;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class ESRWJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {

        GenericOptionsParser parser = new GenericOptionsParser(strings);
        if (parser.getCommandLine() == null) {
            throw new RuntimeException("args can not be null");
        }

        Job job = Job.getInstance(getConf());
        job.setJobName("es-hadoop-write");
        job.setJarByClass(ESRWJob.class);

        job.setInputFormatClass(EsInputFormat.class);

        job.setMapperClass(EsRWMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        job.setReducerClass(EsRWReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MapWritable.class);

        job.setOutputFormatClass(EsOutputFormat.class);
        job.setNumReduceTasks(3);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
//        conf.setSpeculativeExecution(false);
        // 设置用户目录优先
        conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
        // 设置es, 给定集群中一个节点的名字即可
        conf.set("es.nodes", "10.124.147.22:9200");
        // index/type, 可设置 es。resource.wirte/read, 设置为只读或者只写
//        conf.set("es.resource", "macsearch_fileds/mac");
        conf.set("es.resource.read", "macsearch_fileds/mac");
        conf.set("es.resource.write", "sink/group");
        conf.set("es.query", "?q=me*");

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        int result = ToolRunner.run(conf, new ESRWJob(), args);
        System.exit(result);
    }
}

mapper

package com.wenbronk.mr.es.rw;


import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class EsRWMapper extends Mapper<Text, MapWritable, Text, MapWritable> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

    }

    @Override
    protected void map(Text key, MapWritable value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}

reducer

package com.wenbronk.mr.es.rw;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class EsRWReducer extends Reducer<Text, MapWritable, NullWritable, MapWritable> {

    private NullWritable nullWritable;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.nullWritable = NullWritable.get();
    }

    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
        for (MapWritable value : values) {
            context.write(nullWritable, value);
        }
    }
}
原文地址:https://www.cnblogs.com/wenbronk/p/9399925.html