
Hadoop's "small files" problem

Hadoop's problem with small files, that is, files significantly smaller than the HDFS block size, is well known. When small files are used as input, a Map task is created for each of them, introducing bookkeeping overheads. Such a task often finishes its processing in a few seconds, less time than it takes to spawn and clean up the task. In addition, each object in the NameNode occupies about 150 bytes of memory, so a large number of small files proliferates these objects and adversely affects the NameNode's performance and scalability. Reading a set of smaller files is also very inefficient because of the large number of disk seeks and hops across DataNodes needed to fetch them.
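
To put the memory figure in perspective: each small file is represented in the NameNode by at least two objects, a file inode and a block, so as a rough estimate, 10 million small files translate to about 20 million objects, or around 3 GB of NameNode heap at 150 bytes per object, before a single byte of data is processed.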

Unfortunately, small files are a reality. The following strategies can be used to handle them:

  • Combining smaller files into a bigger file as a preprocessing step, before storing them in HDFS and running the job. The SequenceFile and TFile formats are popular ways of combining smaller files into a bigger file; a minimal sketch of this approach is given after this list. Using Hadoop archive files (HAR) is another way of alleviating NameNode memory pressure. HAR is a meta-filesystem that resides on top of HDFS.
  • Using CombineFileInputFormat to pack multiple smaller files into each InputSplit. This also takes node and rack locality into consideration for better performance. It may not relieve the memory requirements of the NameNode, though, as the number of files that need to be tracked remains the same.
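
The following is a minimal sketch of the SequenceFile approach mentioned in the first point, assuming the small files sit directly under a single HDFS directory; each filename becomes the key and the file's raw bytes become the value. The class name, argument handling, and paths here are illustrative and not part of the book's example code:

package MasteringHadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class SmallFilesToSequenceFile {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path inputDir = new Path(args[0]);   // directory of small files
        Path outputFile = new Path(args[1]); // single combined SequenceFile
        FileSystem fs = inputDir.getFileSystem(conf);
        // One writer produces one large file; keys are filenames, values are raw contents.
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(outputFile),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class));
        try {
            for (FileStatus status : fs.listStatus(inputDir)) {
                if (status.isDirectory()) {
                    continue;
                }
                byte[] contents = new byte[(int) status.getLen()];
                FSDataInputStream in = fs.open(status.getPath());
                IOUtils.readFully(in, contents, 0, contents.length);
                in.close();
                writer.append(new Text(status.getPath().getName()), new BytesWritable(contents));
            }
        } finally {
            writer.close();
        }
    }
}

A downstream job can then read the combined file with SequenceFileInputFormat, and the NameNode only has to track one file instead of thousands.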

To illustrate the working of CombineFileInputFormat, we use the public NSF grant proposal abstracts dataset for the years 1990 to 2003, available at https://archive.ics.uci.edu/ml/datasets/NSF+Research+Award+Abstracts+1990-2003. Though the dataset has 130,000 grant proposals, we will consider a subset of 441 grants. A MapReduce job that reads each line from the proposals spawns 441 input splits, as shown in the following standard output. In this sample job, the number of reduce tasks has been set to zero:

14/04/10 07:50:03 INFO input.FileInputFormat: Total input paths to process : 441
14/04/10 07:50:03 INFO mapreduce.JobSubmitter: number of splits:441

As we saw previously, inputs to a Hadoop MapReduce job are specified using the InputFormat, InputSplit, and RecordReader classes. In this program, we will combine all 441 proposals into a single split.

CombineFileInputFormat is an abstract class that facilitates input specifications for combining files. The only override it expects the developer to provide is the createRecordReader() method, which instantiates a custom RecordReader class to read records. The getSplits() method of the CombineFileInputFormat class returns CombineFileSplit objects, and each split might be a combination of blocks from different files. If the setMaxSplitSize() method is used to set a maximum split size, blocks on the local node are combined into a split, and residual blocks are combined with other blocks from the same rack. However, if this value is not set, combining is not attempted at the node level; it is attempted only at the rack level. If setMaxSplitSize() is used to set the maximum split size to the HDFS block size, the default behavior is seen, that is, each block is a split.

The following code shows the concrete class based on this abstract class:

package MasteringHadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.LineReader;
import java.io.IOException;
public class MasteringHadoopCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text>{
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) inputSplit, taskAttemptContext, MasteringHadoopCombineFileRecordReader.class);
    }
}
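
As a side note, if you want the node-level combining behavior described earlier, the split-size knobs can be set from the subclass constructor. The following is a minimal sketch that reuses the imports and the record reader class described next; the class name and the 128 MB/64 MB/32 MB thresholds are illustrative choices, not values from the book's example:

public class CappedCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {
    public CappedCombineFileInputFormat() {
        // Cap each combined split at 128 MB and attempt node-level combining first.
        setMaxSplitSize(134217728L);
        setMinSplitSizeNode(67108864L);
        setMinSplitSizeRack(33554432L);
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) inputSplit, taskAttemptContext, MasteringHadoopCombineFileRecordReader.class);
    }
}

In Hadoop 2, when setMaxSplitSize() is not called, the mapreduce.input.fileinputformat.split.maxsize configuration property can typically be used to supply the same cap.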

Tip

Downloading the example code

You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Note

The CombineFileInputFormat class has an isSplitable() method. It returns true by default, but it can be overridden to return false to ensure that a file is processed by a single Map task in its entirety.
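
A minimal sketch of such an override, placed inside the CombineFileInputFormat subclass shown earlier, could look like this (the org.apache.hadoop.mapreduce.JobContext import is assumed):

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Ensure each file is consumed in its entirety by a single record reader.
        return false;
    }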

The following code shows the custom RecordReader class that is created to return records from a CombineFileSplit. The difference between CombineFileSplit and FileSplit is the presence of multiple paths, implying multiple offsets and lengths. The custom RecordReader class is instantiated for every file in the split. Therefore, the constructor of the custom RecordReader class must take an Integer index that specifies the file being considered for record generation.

The second important method is nextKeyValue(), which generates the next key-value pair. The getCurrentKey() and getCurrentValue() methods return this generated key-value pair. In the following example, keys are byte offsets in the file and values are lines of text. A LineReader object is used to read each line:

public static class MasteringHadoopCombineFileRecordReader extends RecordReader<LongWritable, Text>{
        private LongWritable key;
        private Text value;
        private Path path;
        private FileSystem fileSystem;
        private LineReader lineReader;
        private FSDataInputStream fsDataInputStream;
        private Configuration configuration;
        private int fileIndex;
        private CombineFileSplit combineFileSplit;
        private long start;
        private long end;

        public MasteringHadoopCombineFileRecordReader(CombineFileSplit inputSplit, TaskAttemptContext context, Integer index) throws IOException{
            this.fileIndex = index;
            this.combineFileSplit = inputSplit;
            this.configuration = context.getConfiguration();
            this.path = inputSplit.getPath(index);
            this.fileSystem = this.path.getFileSystem(configuration);
            this.fsDataInputStream = fileSystem.open(this.path);
            this.lineReader = new LineReader(this.fsDataInputStream, this.configuration);
            this.start = inputSplit.getOffset(index);
            this.end = this.start + inputSplit.getLength(index);
            this.key = new LongWritable(0);
            this.value = new Text("");

        }
        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            // All initialization is handled in the constructor.
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int offset = 0;
            boolean isKeyValueAvailable = true;
            if(this.start < this.end){
                offset = this.lineReader.readLine(this.value);
                this.key.set(this.start);
                this.start += offset;
            }

            if(offset == 0){
                this.key.set(0);
                this.value.set("");
                isKeyValueAvailable = false;
            }

            return isKeyValueAvailable;

        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            long splitStart = this.combineFileSplit.getOffset(fileIndex);
            if(this.start < this.end){
                return Math.min(1.0f, (this.start - splitStart) / (float) (this.end - splitStart));
            }

            return 0;
        }

        @Override
        public void close() throws IOException {
            if(lineReader != null){
                lineReader.close();
            }
        }
    }

The Mapper class and the driver program are given in the following snippet. The most important line in the driver is the one that sets the input format: job.setInputFormatClass(MasteringHadoop.MasteringHadoopCombineFileInputFormat.class). The standard output obtained when the program is executed is given after the snippet; the number of splits comes out as one. The size of the corpus in this case is about 5 MB, while the HDFS block size is 128 MB.

package MasteringHadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;

public class CombineFilesMasteringHadoop {
    public static class CombineFilesMapper extends  Mapper<LongWritable, Text, LongWritable, Text>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        GenericOptionsParser parser = new GenericOptionsParser(args);
        Configuration config = parser.getConfiguration();
        String[] remainingArgs = parser.getRemainingArgs();

        Job job = Job.getInstance(config, "MasteringHadoop-CombineDemo");
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(CombineFilesMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(MasteringHadoop.MasteringHadoopCombineFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        TextOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        job.waitForCompletion(true);
    }
}

The output is as follows:

14/04/10 16:32:05 INFO input.FileInputFormat: Total input paths to process : 441
14/04/10 16:32:06 INFO mapreduce.JobSubmitter: number of splits:1
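
For reference, a job such as this one is typically launched along the following lines; the JAR name and the input and output paths are placeholders:

hadoop jar mastering-hadoop.jar MasteringHadoop.CombineFilesMasteringHadoop <input-path> <output-path>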