
Application specifications

Let's start by transforming this placeholder application into an application that counts words – the Hello World equivalent for big data processing frameworks. The functionality is easy to understand and not very important, as our focus here is on the development process.

The full source code of the modified application is available at https://github.com/tweise/apex-samples/tree/master/wordcount. Here is the modified application assembly in Application.java:

@Override
public void populateDAG(DAG dag, Configuration conf)
{
  // Add the operators (the vertices of the logical DAG)
  LineByLineFileInputOperator lineReader = dag.addOperator("input",
      new LineByLineFileInputOperator());
  LineSplitter parser = dag.addOperator("parser", new LineSplitter());
  UniqueCounter counter = dag.addOperator("counter", new UniqueCounter());
  GenericFileOutputOperator<Object> output = dag.addOperator("output",
      new GenericFileOutputOperator<>());
  output.setConverter(new ToStringConverter());

  // Connect the operators through their ports (the edges of the logical DAG)
  dag.addStream("lines", lineReader.output, parser.input);
  dag.addStream("words", parser.output, counter.data);
  dag.addStream("counts", counter.count, output.input);
}

The pipeline reads lines from a file (LineByLineFileInputOperator), splits each line into words (LineSplitter), counts the occurrences of each word (UniqueCounter), and finally writes the result to a file (GenericFileOutputOperator). Apart from the LineSplitter operator, all operators are part of the Apex library. After all the operators have been added to the DAG, the pipeline is completed by connecting the operators (through their ports) using addStream. This is the explicit style of composing the logical DAG (rather than using the high-level API), hence the name compositional API. Note that ports must always be defined in their respective operators, and they are not always named input and output; here, for example, UniqueCounter exposes the ports data and count.
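
To make the data flow through the four stages concrete, the following is a minimal plain-Java sketch of what the pipeline computes. It is not Apex code and not the sample's implementation: the class name WordCountSketch and its methods are hypothetical, the rule of splitting on non-word characters is an assumption about what a line splitter might do, and the sketch ignores everything that makes Apex a streaming platform (windows, partitioning, checkpointing). It only mirrors the lines, words, and counts streams of the DAG above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {

    // Stand-in for the "parser" stage: break one line into words.
    // Splitting on runs of non-word characters is an assumption.
    public static List<String> splitLine(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.split("\\W+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    // Stand-in for the "counter" stage: count occurrences of each word.
    public static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : splitLine(line)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Stand-in for the "input" stage: lines arrive one at a time.
        List<String> lines = Arrays.asList("to be or not to be", "that is the question");
        // Stand-in for the "output" stage: emit one word/count pair per line.
        countWords(lines).forEach((word, count) -> System.out.println(word + "=" + count));
    }
}
```

In the Apex application, each of these steps is a separate operator, and the method calls in the sketch correspond to the streams declared with addStream. This separation is what allows the platform to distribute and scale the stages independently.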
