Mastering Hadoop
Sandeep Karanth
Handling data joins
Joins are commonplace in Big Data processing. They happen on the value of a join key, an attribute that is common to the datasets participating in the join. In this book, we will refrain from explaining the different join semantics, such as inner joins, outer joins, and cross joins, and focus on inner join processing using MapReduce and the optimizations involved in it.
In MapReduce, joins can be done in either the Map task or the Reduce task. The former is called a Map-side join and the latter is called a Reduce-side join.
Reduce-side joins
Reduce-side joins are general purpose and do not impose many conditions on the datasets that participate in the join. However, the shuffle step is resource-intensive.
The basic idea involves tagging each record with a data source tag and extracting the join key in the Map tasks. The Reduce task receives all the records with the same join key and does the actual join. If one of the datasets participating in the join is very small, it can be distributed via a side channel such as the DistributedCache to every Reduce task.
For Reduce-side joins to work, the following requirements need to be met:
- There needs to be a way of specifying the InputFormat and Mapper classes for the different datasets participating in the join. The MultipleInputs class is designed for this purpose. For a smaller file, the DistributedCache API can be used; the Map-side join explained later shows how to use this side-file distribution channel.
- Secondary sorting capability is needed for optimal Reduce-side joins. The join keys will be sorted in any case, but it is also important that, for each matching join key, the records are sorted by their source. With secondary sorting, all records from one source occur before those from the other, eliminating the need to hold all the records for a particular key in memory.
The following example illustrates a Reduce-side join. The city dataset contains world cities and some attributes of each city, the country code being one of them. It is available in CSV format at http://dev.maxmind.com/geoip/legacy/geolite/. Countries are identified by a two-letter ISO code; the countrycodes.txt file was taken from http://www.spoonfork.org/isocodes.html.
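For reference, the city file is comma-separated with the country code as its first field and the city population as its fifth field, while countrycodes.txt maps an ISO code to a country name; these are the field indices assumed by the code below. The following lines are illustrative samples only, not actual rows from the downloads:

```
# city dataset: Country,City,AccentCity,Region,Population,Latitude,Longitude
in,mumbai,Mumbai,16,12691836,19.014286,72.847939

# countrycodes.txt: ISO code,Country name
in,India
```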
In this example, and in the subsequent join examples, the ISO code of the country is the join key. This key is used to get the country name and the total population of that country, calculated by summing up the populations of its individual cities. The join can be done with the following steps:
- A custom Writable data type needs to be implemented to have the dataset tag information within the key. The following code shows the implementation of such a composite key:

```java
package MasteringHadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CompositeJoinKeyWritable implements WritableComparable<CompositeJoinKeyWritable> {

    private Text key = new Text();
    private IntWritable source = new IntWritable();

    public CompositeJoinKeyWritable() {
    }

    public CompositeJoinKeyWritable(String key, int source) {
        this.key.set(key);
        this.source.set(source);
    }

    public IntWritable getSource() {
        return this.source;
    }

    public Text getKey() {
        return this.key;
    }

    public void setSource(int source) {
        this.source.set(source);
    }

    public void setKey(String key) {
        this.key.set(key);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        this.key.write(dataOutput);
        this.source.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.key.readFields(dataInput);
        this.source.readFields(dataInput);
    }

    @Override
    public int compareTo(CompositeJoinKeyWritable o) {
        int result = this.key.compareTo(o.key);
        if (result == 0) {
            return this.source.compareTo(o.source);
        }
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof CompositeJoinKeyWritable) {
            CompositeJoinKeyWritable joinKeyWritable = (CompositeJoinKeyWritable) obj;
            return (key.equals(joinKeyWritable.key) && source.equals(joinKeyWritable.source));
        }
        return false;
    }
}
```
- A custom Partitioner class needs to be implemented. The Partitioner must partition the data based on the natural join key only; in this case, it is the ISO country code. This ensures that all the cities with the same country code are processed by the same Reduce task. The following code gives an implementation of a custom Partitioner class:

```java
public static class CompositeJoinKeyPartitioner extends Partitioner<CompositeJoinKeyWritable, Text> {

    @Override
    public int getPartition(CompositeJoinKeyWritable key, Text value, int i) {
        // Partition on the natural key only. The sign bit is masked so that a
        // negative hashCode() cannot produce a negative partition number.
        return (key.getKey().hashCode() & Integer.MAX_VALUE) % i;
    }
}
```
- A custom grouping comparator needs to be written. Again, like the partitioner, the grouping has to be done on the natural key alone. The following code shows the grouping comparator for the composite key:
```java
public static class CompositeJoinKeyComparator extends WritableComparator {

    protected CompositeJoinKeyComparator() {
        super(CompositeJoinKeyWritable.class, true);
    }

    @Override
    public int compare(Object a, Object b) {
        CompositeJoinKeyWritable compositeKey1 = (CompositeJoinKeyWritable) a;
        CompositeJoinKeyWritable compositeKey2 = (CompositeJoinKeyWritable) b;
        return compositeKey1.getKey().compareTo(compositeKey2.getKey());
    }
}
```
- A Mapper class has to be written for each input dataset. In the following example, two Mapper classes are present: one for the city dataset and the other for the country dataset. The country dataset is tagged with a lower source number than the city dataset. This is done for efficiency: when the secondary sort is done on the composite keys, the country record appears before the city records at the Reducer:

```java
public static class MasteringHadoopReduceSideJoinCountryMap extends Mapper<LongWritable, Text, CompositeJoinKeyWritable, Text> {

    private static short COUNTRY_CODE_INDEX = 0;
    private static short COUNTRY_NAME_INDEX = 1;

    private static CompositeJoinKeyWritable joinKeyWritable = new CompositeJoinKeyWritable("", 1);
    private static Text recordValue = new Text("");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(",", -1);
        if (tokens != null) {
            joinKeyWritable.setKey(tokens[COUNTRY_CODE_INDEX]);
            recordValue.set(tokens[COUNTRY_NAME_INDEX]);
            context.write(joinKeyWritable, recordValue);
        }
    }
}

public static class MasteringHadoopReduceSideJoinCityMap extends Mapper<LongWritable, Text, CompositeJoinKeyWritable, Text> {

    private static short COUNTRY_CODE_INDEX = 0;

    private static CompositeJoinKeyWritable joinKeyWritable = new CompositeJoinKeyWritable("", 2);
    private static Text record = new Text("");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(",", -1);
        if (tokens != null) {
            joinKeyWritable.setKey(tokens[COUNTRY_CODE_INDEX]);
            record.set(value.toString());
            context.write(joinKeyWritable, record);
        }
    }
}
```
- The Reducer class takes advantage of secondary sorting to emit the joined records. The first value of the iterator is the country record; its name is stored away, and the population is calculated from the remaining city records:

```java
public static class MasteringHadoopReduceSideJoinReduce extends Reducer<CompositeJoinKeyWritable, Text, Text, LongWritable> {

    private static LongWritable populationValue = new LongWritable(0);
    private static Text countryValue = new Text("");
    private static short POPULATION_INDEX = 4;

    @Override
    protected void reduce(CompositeJoinKeyWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long populationTotal = 0;
        boolean firstRecord = true;
        String country = null;

        for (Text record : values) {
            String[] tokens = record.toString().split(",", -1);
            if (firstRecord) {
                firstRecord = false;
                // The country record carries only the country name. If the first
                // record has more than one field, it is a city record, meaning no
                // matching country exists for this key, so the key is skipped.
                if (tokens.length > 1) break;
                else country = tokens[0];
            } else {
                String populationString = tokens[POPULATION_INDEX];
                if (populationString != null && populationString.isEmpty() == false) {
                    populationTotal += Long.parseLong(populationString);
                }
            }
        }

        if (country != null) {
            populationValue.set(populationTotal);
            countryValue.set(country);
            context.write(countryValue, populationValue);
        }
    }
}
```
- The driver program specifies all the custom data types that are required to do the Reduce-side join:
```java
public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException {
    GenericOptionsParser parser = new GenericOptionsParser(args);
    Configuration config = parser.getConfiguration();
    String[] remainingArgs = parser.getRemainingArgs();

    Job job = Job.getInstance(config, "MasteringHadoop-ReduceSideJoin");
    job.setMapOutputKeyClass(CompositeJoinKeyWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setReducerClass(MasteringHadoopReduceSideJoinReduce.class);
    job.setPartitionerClass(CompositeJoinKeyPartitioner.class);
    job.setGroupingComparatorClass(CompositeJoinKeyComparator.class);
    job.setNumReduceTasks(3);

    MultipleInputs.addInputPath(job, new Path(remainingArgs[0]), TextInputFormat.class,
            MasteringHadoopReduceSideJoinCountryMap.class);
    MultipleInputs.addInputPath(job, new Path(remainingArgs[1]), TextInputFormat.class,
            MasteringHadoopReduceSideJoinCityMap.class);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(remainingArgs[2]));

    job.waitForCompletion(true);
}
```
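With this driver, the shuffle sorts the map output keys using CompositeJoinKeyWritable.compareTo(), which already orders records by country code and then by source tag, so the country record reaches the Reducer ahead of the city records. If you prefer to make this sort order explicit, a sort comparator can also be registered. The following class and its registration via job.setSortComparatorClass() are a minimal sketch and are not part of the book's listing:

```java
// Hypothetical sort comparator mirroring CompositeJoinKeyWritable.compareTo():
// order by the natural key first, then by the source tag.
public static class CompositeJoinKeySortComparator extends WritableComparator {

    protected CompositeJoinKeySortComparator() {
        super(CompositeJoinKeyWritable.class, true);
    }

    @Override
    public int compare(Object a, Object b) {
        CompositeJoinKeyWritable key1 = (CompositeJoinKeyWritable) a;
        CompositeJoinKeyWritable key2 = (CompositeJoinKeyWritable) b;
        int result = key1.getKey().compareTo(key2.getKey());
        return (result != 0) ? result : key1.getSource().compareTo(key2.getSource());
    }
}
```

In the driver, this would be registered alongside the grouping comparator with job.setSortComparatorClass(CompositeJoinKeySortComparator.class).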
Map-side joins
Map-side joins, on the other hand, require the datasets they join to satisfy either of the following two conditions:
- In addition to the presence of join keys, all inputs must be sorted by the join keys. The input datasets must have the same number of partitions, and all records with the same key must reside in the same partition. Map-side joins are particularly attractive when they operate on the outputs of other MapReduce jobs, because these conditions are satisfied automatically in that case. The CompositeInputFormat class can be used to run Map-side joins on such datasets; the inputs and the join type are specified using configuration properties, as in the sketch that follows this list.
- If one of the datasets is small enough, a side-file distribution channel such as the DistributedCache can be used to do a Map-side join.
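The CompositeInputFormat route is not developed further in this chapter, so the following driver is only a minimal sketch. It assumes both inputs are already sorted by the join key, identically partitioned, and readable with KeyValueTextInputFormat; the class name, argument layout, and the map-only setup are illustrative choices, not part of the book's listing:

```java
package MasteringHadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CompositeJoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The join expression names the join type ("inner") and the two inputs.
        conf.set(CompositeInputFormat.JOIN_EXPR,
                CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class,
                        new Path(args[0]), new Path(args[1])));

        Job job = Job.getInstance(conf, "MapSideJoin-CompositeInputFormat");
        job.setJarByClass(CompositeJoinDriver.class);
        job.setInputFormatClass(CompositeInputFormat.class);

        // Map-only job: the identity Mapper receives a Text key and a
        // TupleWritable holding one value per joined source, and writes them out.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TupleWritable.class);

        TextOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```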
In the following example, the countries file is distributed to all nodes. During Map task setup, it is loaded into memory into a TreeMap data structure. The setup() method of the Mapper class is overridden to load the smaller dataset into memory:
```java
package MasteringHadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.TreeMap;

public class MasteringHadoopMapSideJoin {

    public static class MasteringHadoopMapSideJoinMap extends Mapper<LongWritable, Text, Text, LongWritable> {

        private static short COUNTRY_CODE_INDEX = 0;
        private static short COUNTRY_NAME_INDEX = 1;
        private static short POPULATION_INDEX = 4;

        private TreeMap<String, String> countryCodesTreeMap = new TreeMap<String, String>();
        private Text countryKey = new Text("");
        private LongWritable populationValue = new LongWritable(0);

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] localFiles = context.getCacheFiles();
            String path = null;
            for (URI uri : localFiles) {
                path = uri.getPath();
                if (path.trim().equals("countrycodes.txt")) {
                    break;
                }
            }

            if (path != null) {
                getCountryCodes(path, context);
            }
        }
```
The getCountryCodes() private method, given as follows, is used to read the side file from the DistributedCache. Each line is processed and stored in the TreeMap instance. This method is a part of the Mapper class as well:
```java
private void getCountryCodes(String path, Context context) throws IOException {
    Configuration configuration = context.getConfiguration();
    FileSystem fileSystem = FileSystem.get(configuration);
    FSDataInputStream in = fileSystem.open(new Path(path));

    Text line = new Text("");
    LineReader lineReader = new LineReader(in, configuration);

    int offset = 0;
    do {
        offset = lineReader.readLine(line);
        if (offset > 0) {
            String[] tokens = line.toString().split(",", -1);
            countryCodesTreeMap.put(tokens[COUNTRY_CODE_INDEX], tokens[COUNTRY_NAME_INDEX]);
        }
    } while (offset != 0);
}
```
The map() override of the Mapper class is where the join takes place. Each key is checked against the TreeMap data structure for a match. If a match exists, a joined record is emitted:
```java
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String cityRecord = value.toString();
        String[] tokens = cityRecord.split(",", -1);
        String country = tokens[COUNTRY_CODE_INDEX];
        String populationString = tokens[POPULATION_INDEX];

        if (country != null && country.isEmpty() == false) {
            if (populationString != null && populationString.isEmpty() == false) {
                long population = Long.parseLong(populationString);

                String countryName = countryCodesTreeMap.get(country);
                if (countryName == null) countryName = country;

                countryKey.set(countryName);
                populationValue.set(population);
                context.write(countryKey, populationValue);
            }
        }
    }
}
```
The Reduce task is simple; it aggregates on the join key and calculates the total population of each country. The code is given as follows:
```java
public static class MasteringHadoopMapSideJoinReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

    private static LongWritable populationValue = new LongWritable(0);

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long populationTotal = 0;

        for (LongWritable population : values) {
            populationTotal += population.get();
        }

        populationValue.set(populationTotal);
        context.write(key, populationValue);
    }
}
```
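The driver for this Map-side join is not shown in this excerpt. A minimal sketch, placed inside the MasteringHadoopMapSideJoin class so that the imports already shown apply, might look like the following; the argument layout (city input path, output path, countrycodes.txt location) and the exit handling are assumptions:

```java
public static void main(String[] args) throws Exception {
    Configuration config = new Configuration();
    Job job = Job.getInstance(config, "MasteringHadoop-MapSideJoin");
    job.setJarByClass(MasteringHadoopMapSideJoin.class);
    job.setMapperClass(MasteringHadoopMapSideJoinMap.class);
    job.setReducerClass(MasteringHadoopMapSideJoinReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    // Distribute the small country-codes file to every node. Note that the
    // Mapper's setup() matches on the literal path "countrycodes.txt", so the
    // file is expected to be reachable under that name.
    job.addCacheFile(new URI(args[2]));

    TextInputFormat.addInputPath(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```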