- Mastering Hadoop
- Sandeep Karanth
Hadoop's "small files" problem
Hadoop's problem with small files, that is, files significantly smaller than the HDFS block size, is well known. When small files are used as input, a Map task is created for each of them, introducing bookkeeping overhead. Each such Map task finishes processing in a matter of seconds, far less time than it takes to spawn and clean up the task. Every file, directory, and block is represented as an object in the NameNode's memory, and each object occupies about 150 bytes. Many small files therefore proliferate these objects and adversely affect the NameNode's performance and scalability; as a rough example, ten million small files, each occupying its own block, amount to some 20 million objects and on the order of 3 GB of NameNode heap. Reading a set of small files is also very inefficient because of the large number of disk seeks and hops across DataNodes needed to fetch them.
Unfortunately, small files are a reality, but the following strategies can be used to handle them:
- Combine smaller files into a bigger file as a preprocessing step, before storing them in HDFS and running the job. The SequenceFile and TFile formats are popular ways of combining smaller files into a bigger file. Using Hadoop archive files (HAR) is another way of alleviating NameNode memory pressure; HAR is a meta-filesystem that resides on top of HDFS. A packing sketch is shown after this list.
- Use CombineFileInputFormat to combine multiple smaller files into a single InputSplit. It also takes node and rack locality into consideration for better performance. It may not relieve the memory requirements of the NameNode, though, as the number of files that need to be tracked remains the same.
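The following is a minimal sketch of the first strategy, not part of the book's example code; it assumes Hadoop 2.x, and the class name, key scheme (file name as key, raw bytes as value), and paths are illustrative. It packs the files of a directory into a single SequenceFile:
package MasteringHadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class SmallFilePacker {
    public static void main(String[] args) throws IOException {
        // args[0]: directory of small files, args[1]: output SequenceFile path
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path inputDir = new Path(args[0]);
        Path packedFile = new Path(args[1]);
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(packedFile),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            for (FileStatus status : fs.listStatus(inputDir)) {
                if (status.isDirectory()) {
                    continue;
                }
                // Read the whole small file into memory; acceptable because
                // the files are, by definition, much smaller than a block.
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                try (FSDataInputStream in = fs.open(status.getPath())) {
                    IOUtils.copyBytes(in, buffer, conf, false);
                }
                // Key: original file name; value: raw file contents.
                writer.append(new Text(status.getPath().getName()),
                        new BytesWritable(buffer.toByteArray()));
            }
        }
    }
}
For the HAR alternative, the hadoop archive command builds the archive without any custom code, for example hadoop archive -archiveName grants.har -p /input/grants /archives (paths here are illustrative).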
To illustrate the working of CombineFileInputFormat, we use a public NSF grant proposal abstracts dataset from the years 1990 to 2003, available at https://archive.ics.uci.edu/ml/datasets/NSF+Research+Award+Abstracts+1990-2003. Though the dataset has 130,000 grant proposals, we will consider a subset of 441 grants. A MapReduce Hadoop job that reads each line from the proposals spawns 441 input splits, as the following standard output shows. In this sample job, the number of reduce tasks has been set to zero:
14/04/10 07:50:03 INFO input.FileInputFormat: Total input paths to process : 441 14/04/10 07:50:03 INFO mapreduce.JobSubmitter: number of splits:441
As we saw previously, inputs to a Hadoop MapReduce job are specified using the InputFormat, InputSplit, and RecordReader classes. In this program, we will combine all 441 proposals into a single split.
CombineFileInputFormat is an abstract class that facilitates input specifications for combining files. The only override it expects the developer to implement is the createRecordReader() method, which instantiates a custom RecordReader class to read records. The CombineFileInputFormat class returns CombineFileSplit objects from its getSplits() method. Each split might be a combination of blocks from different files. If the setMaxSplitSize() method is used to set a maximum split size, blocks on the same node are combined into a split, and residue blocks are combined with other blocks from the same rack. However, if this value is not set, combining is not attempted at the node level; it is only attempted at the rack level. If setMaxSplitSize() is used to set the maximum split size to the HDFS block size, the default behavior is seen, that is, each block becomes a split.
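The following is a small illustrative sketch, not taken from the book's example. It shows the Hadoop 2.x configuration properties that CombineFileInputFormat consults when the protected setters have not been called; the 64 MB and 32 MB values are arbitrary choices for demonstration:
package MasteringHadoop;
import org.apache.hadoop.conf.Configuration;
public class CombineSplitSizeConfig {
    // Returns a Configuration carrying split-size limits for CombineFileInputFormat.
    // These are the properties read by getSplits() when setMaxSplitSize(),
    // setMinSplitSizeNode(), and setMinSplitSizeRack() are not used; values are illustrative.
    public static Configuration withSplitLimits() {
        Configuration conf = new Configuration();
        // Upper bound on a combined split; node-local blocks are packed up to this size.
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);
        // Minimum bytes to form a node-level split; smaller residues fall through to the rack level.
        conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 32L * 1024 * 1024);
        // Minimum bytes to form a rack-level split; smaller residues are combined across racks.
        conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 32L * 1024 * 1024);
        return conf;
    }
}
A driver could pass the returned configuration to Job.getInstance() in place of a plain Configuration object, as in the driver shown later in this section.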
The following code shows the concrete class based on this abstract class:
package MasteringHadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.LineReader;
import java.io.IOException;
public class MasteringHadoopCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit,
            TaskAttemptContext taskAttemptContext) throws IOException {
        return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) inputSplit,
                taskAttemptContext, MasteringHadoopCombineFileRecordReader.class);
    }
}
The following code shows the custom RecordReader class that is created to return records from a CombineFileSplit. The difference between CombineFileSplit and FileSplit is the presence of multiple paths, implying multiple offsets and lengths. The custom RecordReader class will be called for every file in the split. Therefore, it is mandatory for the constructor of the custom RecordReader class to take an Integer index that specifies the file being considered for record generation.
The second important method is nextKeyValue(), which generates the next key-value pair. The getCurrentKey() and getCurrentValue() methods return this generated key-value pair. In the following example, keys are byte offsets in the file and values are lines of text. A LineReader object is used to read each line:
public static class MasteringHadoopCombineFileRecordReader extends RecordReader<LongWritable, Text> {
    private LongWritable key;
    private Text value;
    private Path path;
    private FileSystem fileSystem;
    private LineReader lineReader;
    private FSDataInputStream fsDataInputStream;
    private Configuration configuration;
    private int fileIndex;
    private CombineFileSplit combineFileSplit;
    private long start;
    private long end;
    public MasteringHadoopCombineFileRecordReader(CombineFileSplit inputSplit,
            TaskAttemptContext context, Integer index) throws IOException {
        this.fileIndex = index;
        this.combineFileSplit = inputSplit;
        this.configuration = context.getConfiguration();
        this.path = inputSplit.getPath(index);
        this.fileSystem = this.path.getFileSystem(configuration);
        this.fsDataInputStream = fileSystem.open(this.path);
        this.lineReader = new LineReader(this.fsDataInputStream, this.configuration);
        this.start = inputSplit.getOffset(index);
        this.end = this.start + inputSplit.getLength(index);
        this.key = new LongWritable(0);
        this.value = new Text("");
    }
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        // Initialization is handled in the constructor instead.
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        int offset = 0;
        boolean isKeyValueAvailable = true;
        if (this.start < this.end) {
            offset = this.lineReader.readLine(this.value);
            this.key.set(this.start);
            this.start += offset;
        }
        if (offset == 0) {
            this.key.set(0);
            this.value.set("");
            isKeyValueAvailable = false;
        }
        return isKeyValueAvailable;
    }
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
        long splitStart = this.combineFileSplit.getOffset(fileIndex);
        if (this.start < this.end) {
            return Math.min(1.0f, (this.start - splitStart) / (float) (this.end - splitStart));
        }
        return 0;
    }
    @Override
    public void close() throws IOException {
        if (lineReader != null) {
            lineReader.close();
        }
    }
}
The Mapper class and the driver program are given in the following snippet. The most important line in the driver is the one that sets the input format: job.setInputFormatClass(MasteringHadoop.MasteringHadoopCombineFileInputFormat.class). When the program is executed, the standard output obtained is given after the snippet. The number of splits comes up as one; the size of the corpus in this case is 5 MB, while the HDFS block size is 128 MB.
package MasteringHadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
public class CombineFilesMasteringHadoop {
public static class CombineFilesMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException{
GenericOptionsParser parser = new GenericOptionsParser(args);
Configuration config = parser.getConfiguration();
String[] remainingArgs = parser.getRemainingArgs();
Job job = Job.getInstance(config, "MasteringHadoop-CombineDemo");
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(CombineFilesMapper.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(MasteringHadoop.MasteringHadoopCombineFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
TextOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
job.waitForCompletion(true);
}
}
The output is as follows:
14/04/10 16:32:05 INFO input.FileInputFormat: Total input paths to process : 441 14/04/10 16:32:06 INFO mapreduce.JobSubmitter: number of splits:1