
Hadoop's "small files" problem

Hadoop's problem with small files (files that are significantly smaller than the HDFS block size) is well known. When small files are used as input, a Map task is created for each of them, introducing bookkeeping overhead. Each such Map task finishes processing in a matter of seconds, a processing time much smaller than the time taken to spawn and clean up the task. Furthermore, every object in the NameNode (each file, directory, and block) occupies about 150 bytes of memory, so a large number of small files inflates the NameNode's memory footprint and adversely affects its performance and scalability; roughly 10 million small files, each occupying a single block, consume on the order of 3 GB of NameNode memory for the file and block objects alone. Reading a set of small files is also very inefficient because of the large number of disk seeks and hops across DataNodes needed to fetch them.

Unfortunately, small files are a reality. The following strategies can be used to handle them:

  • Combining smaller files into a bigger file as a preprocessing step before storing them in HDFS and running the job. The SequenceFile and TFile formats are popular ways of combining smaller files into a bigger file (see the sketch after this list). Using Hadoop archive files (HAR) is another way of alleviating NameNode memory pressure. HAR is a meta-filesystem that resides on top of HDFS.
  • Using CombineFileInputFormat to combine multiple smaller files into a single InputSplit. This also takes node and rack locality into consideration for better performance. However, it may not relieve the memory requirements of the NameNode, as the number of files that need to be tracked remains the same.
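As a concrete illustration of the first strategy, the following is a minimal sketch of a preprocessing step that packs all files in a directory into a single SequenceFile keyed by file name. The SmallFilePacker class name and its command-line arguments are hypothetical and not part of the original example:

package MasteringHadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

// Hypothetical preprocessing utility: packs every file under an input
// directory into one SequenceFile keyed by the original file name.
public class SmallFilePacker {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path inputDir = new Path(args[0]);   // directory containing small files
        Path packedFile = new Path(args[1]); // output SequenceFile

        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(packedFile),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class));
        try {
            for (FileStatus status : fs.listStatus(inputDir)) {
                if (!status.isFile()) {
                    continue;
                }
                // Each small file easily fits in memory, so buffer it whole.
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                InputStream in = fs.open(status.getPath());
                IOUtils.copyBytes(in, buffer, conf, true); // true closes the stream
                // Key: original file name; value: raw file contents.
                writer.append(new Text(status.getPath().getName()),
                        new BytesWritable(buffer.toByteArray()));
            }
        } finally {
            writer.close();
        }
    }
}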

To illustrate the working of CombineFileInputFormat, we use the public NSF grant proposal abstracts dataset for the years 1990 to 2003, available at https://archive.ics.uci.edu/ml/datasets/NSF+Research+Award+Abstracts+1990-2003. Though the dataset has 130,000 grant proposals, we will consider a subset of 441 grants. A Hadoop MapReduce job that reads each line of the proposals spawns 441 input splits, as shown in the following standard output snippet. In this sample job, the number of reduce tasks has been set to zero:

14/04/10 07:50:03 INFO input.FileInputFormat: Total input paths to process : 441
14/04/10 07:50:03 INFO mapreduce.JobSubmitter: number of splits:441

As we saw previously, inputs to a Hadoop MapReduce job are specified using the InputFormat, InputSplit, and RecordReader classes. In this program, we will combine all 441 proposals into a single split.

CombineFileInputFormat is an abstract class that facilitates input specifications to combine files. The only method it expects the developer to override is createRecordReader(), which instantiates a custom RecordReader class to read records. The getSplits() method of the CombineFileInputFormat class returns CombineFileSplit objects; each split might be a combination of blocks from different files. If the setMaxSplitSize() method is used to set a maximum split size, blocks on the same node are combined into a split, and residue blocks are combined with other blocks from the same rack. However, if this value is not set, combining is not attempted at the node level; it is only attempted at the rack level. If setMaxSplitSize() is used to set the maximum split size to the HDFS block size, the default behavior is seen, that is, each block becomes a split.

The following code shows the concrete class based on this abstract class:

package MasteringHadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.LineReader;
import java.io.IOException;
// Concrete input format that combines multiple small files into a single split
// and delegates record reading to the custom record reader defined later.
public class MasteringHadoopCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text>{
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        // CombineFileRecordReader instantiates one MasteringHadoopCombineFileRecordReader per file in the split.
        return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) inputSplit, taskAttemptContext, MasteringHadoopCombineFileRecordReader.class);
    }
}
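The split size thresholds described earlier can also be set programmatically. Since setMaxSplitSize(), setMinSplitSizeNode(), and setMinSplitSizeRack() are protected methods of CombineFileInputFormat, a constructor can be added to the preceding class to call them. The following is a minimal sketch of this idea; the 128 MB and 64 MB values are purely illustrative and not part of the original example:

    public MasteringHadoopCombineFileInputFormat() {
        // Cap each combined split at 128 MB; blocks on the same node are
        // combined first, and leftover blocks are combined at the rack level.
        setMaxSplitSize(134217728L);     // 128 MB
        setMinSplitSizeNode(67108864L);  // 64 MB
        setMinSplitSizeRack(67108864L);  // 64 MB
    }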

Note

The CombineFileInputFormat class has an isSplitable() method. The default setting is true, but it can be made to return false to ensure that a file is processed by a single Map task in its entirety.
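For example, the override could look like the following minimal sketch, placed inside the input format subclass (it also requires importing org.apache.hadoop.mapreduce.JobContext); this is useful, for instance, when the inputs use a non-splittable compression codec:

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Never split an individual file; each file is read in its entirety
        // by a single record reader.
        return false;
    }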

The following code shows the custom RecordReader class that is created to return records from a CombineFileSplit. The difference between CombineFileSplit and FileSplit is the presence of multiple paths, implying multiple offsets and lengths. A custom RecordReader instance is created for every file in the split. Therefore, it is mandatory for the constructor of the custom RecordReader class to take an Integer index that specifies the file being considered for record generation.

The second important method is nextKeyValue(), which generates the next key-value pair. The getCurrentKey() and getCurrentValue() methods return this generated key-value pair. In the following example, keys are byte offsets in the file and values are lines of text. A LineReader object is used to read each line:

public static class MasteringHadoopCombineFileRecordReader extends RecordReader<LongWritable, Text>{
        private LongWritable key;
        private Text value;
        private Path path;
        private FileSystem fileSystem;
        private LineReader lineReader;
        private FSDataInputStream fsDataInputStream;
        private Configuration configuration;
        private int fileIndex;
        private CombineFileSplit combineFileSplit;
        private long start;
        private long end;

        public MasteringHadoopCombineFileRecordReader(CombineFileSplit inputSplit, TaskAttemptContext context, Integer index) throws IOException{
            this.fileIndex = index;
            this.combineFileSplit = inputSplit;
            this.configuration = context.getConfiguration();
            this.path = inputSplit.getPath(index);
            this.fileSystem = this.path.getFileSystem(configuration);
            this.fsDataInputStream = fileSystem.open(this.path);
            this.lineReader = new LineReader(this.fsDataInputStream, this.configuration);
            this.start = inputSplit.getOffset(index);
            this.end = this.start + inputSplit.getLength(index);
            this.key = new LongWritable(0);
            this.value = new Text("");

        }
        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            // All initialization is performed in the constructor.
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int offset = 0;
            boolean isKeyValueAvailable = true;
            if(this.start < this.end){
                offset = this.lineReader.readLine(this.value);
                this.key.set(this.start);
                this.start += offset;
            }

            if(offset == 0){
                this.key.set(0);
                this.value.set("");
                isKeyValueAvailable = false;
            }

            return isKeyValueAvailable;

        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            long splitStart = this.combineFileSplit.getOffset(fileIndex);
            if(this.start < this.end){
                return Math.min(1.0f, (this.start - splitStart) / (float) (this.end - splitStart));
            }

            return 0;
        }

        @Override
        public void close() throws IOException {
            if(lineReader != null){
                lineReader.close();
            }
        }
    }

The Mapper class and the driver program are given in the following snippet. The most important line in the driver is the one that sets the input format: job.setInputFormatClass(MasteringHadoop.MasteringHadoopCombineFileInputFormat.class). When the program is executed, the standard output obtained is given after the snippet. The number of splits comes up as one; the size of the corpus in this case is 5 MB, while the HDFS block size is 128 MB.

package MasteringHadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;

public class CombineFilesMasteringHadoop {
    // Identity mapper: writes out each (byte offset, line) pair unchanged.
    public static class CombineFilesMapper extends Mapper<LongWritable, Text, LongWritable, Text>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }
    public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException{
        GenericOptionsParser parser = new GenericOptionsParser(args);
        Configuration config = parser.getConfiguration();
        String[] remainingArgs = parser.getRemainingArgs();

        Job job = Job.getInstance(config, "MasteringHadoop-CombineDemo");
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(CombineFilesMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(MasteringHadoop.MasteringHadoopCombineFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        TextOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        job.waitForCompletion(true);
    }
}
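Assuming the classes are packaged into a JAR named masteringhadoop.jar (the JAR name and the placeholder paths below are illustrative), the job can be launched with the directory of proposal files as the first argument and an output directory as the second:

hadoop jar masteringhadoop.jar MasteringHadoop.CombineFilesMasteringHadoop <input-dir> <output-dir>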

The output is as follows:

14/04/10 16:32:05 INFO input.FileInputFormat: Total input paths to process : 441
14/04/10 16:32:06 INFO mapreduce.JobSubmitter: number of splits:1