Hadoop map任务数量的计算

Hadoop中决定map个数的的因素有几个,由于版本的不同,决定因素也不一样,掌握这些因素对了解hadoop分片的划分有很大帮助,

并且对优化hadoop性能也很有大的益处。

旧API中getSplits方法:

 1 public InputSplit[] getSplits(JobConf job, int numSplits)
 2     throws IOException {
 3     FileStatus[] files = listStatus(job);
 4     
 5     // Save the number of input files in the job-conf
 6     job.setLong(NUM_INPUT_FILES, files.length);
 7     long totalSize = 0;                           // compute total size
 8     for (FileStatus file: files) {                // check we have valid files
 9       if (file.isDir()) {
10         throw new IOException("Not a file: "+ file.getPath());
11       }
12       totalSize += file.getLen();
13     }
14 
15     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
16     long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
17                             minSplitSize);
18 
19     // generate splits
20     ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
21     NetworkTopology clusterMap = new NetworkTopology();
22     for (FileStatus file: files) {
23       Path path = file.getPath();
24       FileSystem fs = path.getFileSystem(job);
25       long length = file.getLen();
26       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
27       if ((length != 0) && isSplitable(fs, path)) { 
28         long blockSize = file.getBlockSize();
29         long splitSize = computeSplitSize(goalSize, minSize, blockSize);
30 
31         long bytesRemaining = length;
32         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
33           String[] splitHosts = getSplitHosts(blkLocations, 
34               length-bytesRemaining, splitSize, clusterMap);
35           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
36               splitHosts));
37           bytesRemaining -= splitSize;
38         }
39         
40         if (bytesRemaining != 0) {
41           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
42                      blkLocations[blkLocations.length-1].getHosts()));
43         }
44       } else if (length != 0) {
45         String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
46         splits.add(new FileSplit(path, 0, length, splitHosts));
47       } else { 
48         //Create empty hosts array for zero length files
49         splits.add(new FileSplit(path, 0, length, new String[0]));
50       }
51     }
52     LOG.debug("Total # of splits: " + splits.size());
53     return splits.toArray(new FileSplit[splits.size()]);
54   }
55 
56   protected long computeSplitSize(long goalSize, long minSize,
57                                        long blockSize) {
58     return Math.max(minSize, Math.min(goalSize, blockSize));
59   }
View Code

新API中getSplits方法:

 1 public List<InputSplit> getSplits(JobContext job
 2                                     ) throws IOException {
 3     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
 4     long maxSize = getMaxSplitSize(job);
 5 
 6     // generate splits
 7     List<InputSplit> splits = new ArrayList<InputSplit>();
 8     List<FileStatus>files = listStatus(job);
 9     for (FileStatus file: files) {
10       Path path = file.getPath();
11       FileSystem fs = path.getFileSystem(job.getConfiguration());
12       long length = file.getLen();
13       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
14       if ((length != 0) && isSplitable(job, path)) { 
15         long blockSize = file.getBlockSize();
16         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
17 
18         long bytesRemaining = length;
19         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
20           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
21           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
22                                    blkLocations[blkIndex].getHosts()));
23           bytesRemaining -= splitSize;
24         }
25         
26         if (bytesRemaining != 0) {
27           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
28                      blkLocations[blkLocations.length-1].getHosts()));
29         }
30       } else if (length != 0) {
31         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
32       } else { 
33         //Create empty hosts array for zero length files
34         splits.add(new FileSplit(path, 0, length, new String[0]));
35       }
36     }
37     
38     // Save the number of input files in the job-conf
39     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
40 
41     LOG.debug("Total # of splits: " + splits.size());
42     return splits;
43   }
44 
45   protected long computeSplitSize(long blockSize, long minSize,
46                                   long maxSize) {
47     return Math.max(minSize, Math.min(maxSize, blockSize));
48   }
View Code

 测试一个输入文件大小为:0.52 KB 日志如下:

new :
blockSize:67108864 minSize:1 maxSize:9223372036854775807
splitSize:67108864

决定因素为 blockSize的大小.这个很容易理解

old:
blockSize:67108864 totalSize:529 numSplits:2 goalSize:264 minSplitSize:1 minSize:1
splitSize:264

numSplits为2,这个是在调用getSplits中传入的,这个地方要注意,经过查找发现这个参数为job.getNumMapTasks()的值如下

JobConf: public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }

mapred-default.xml中:

<property>
<name>mapred.map.tasks</name>
<value>2</value>
<description>The default number of map tasks per job.
Ignored when mapred.job.tracker is "local".
</description>
</property>

所以使用旧的API编写的MP程序,会产生2个map,而使用新的API则会产生1个map.

原文地址:https://www.cnblogs.com/lvfeilong/p/23849jffdslkfjd.html