- Mastering Apache Storm
- Ankit Jain
Developing the hello world example
Before starting the development, you should have Eclipse and Maven installed on your machine. The sample topology explained here covers how to create a basic Storm project, including a spout and a bolt, and how to build and execute it.
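If you want to confirm the prerequisites first, Maven and the JDK both report their versions from the command line (this quick check is a suggestion and not part of the original walkthrough):

$> mvn -version
$> java -version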
Create a Maven project by using com.stormadvance as groupId and storm-example as artifactId.
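If you prefer the command line to Eclipse, one way to generate a project with these coordinates is the standard Maven quickstart archetype (the archetype choice here is an assumption; any basic Java project layout with the same groupId and artifactId works):

$> mvn archetype:generate -DgroupId=com.stormadvance -DartifactId=storm-example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false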
Add the following Maven dependencies to the pom.xml file:
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>1.0.2</version>
  <scope>provided</scope>
</dependency>
Add the following Maven build plugins in the pom.xml file:
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.2.1</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass />
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
Write your first sample spout by creating a SampleSpout class in the com.stormadvance.storm_example package. The SampleSpout class extends the serializable BaseRichSpout class. This spout does not connect to an external source to fetch data; instead, it randomly generates data and emits a continuous stream of records. The following is the source code of the SampleSpout class with an explanation:
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class SampleSpout extends BaseRichSpout {
  private static final long serialVersionUID = 1L;

  private static final Map<Integer, String> map = new HashMap<Integer, String>();
  static {
    map.put(0, "google");
    map.put(1, "facebook");
    map.put(2, "twitter");
    map.put(3, "youtube");
    map.put(4, "linkedin");
  }

  private SpoutOutputCollector spoutOutputCollector;

  public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
    // Open the spout
    this.spoutOutputCollector = spoutOutputCollector;
  }

  public void nextTuple() {
    // The Storm cluster repeatedly calls this method to emit a continuous
    // stream of tuples.
    final Random rand = new Random();
    // generate a random number from 0 to 4.
    int randomNumber = rand.nextInt(5);
    spoutOutputCollector.emit(new Values(map.get(randomNumber)));
    try {
      Thread.sleep(5000);
    } catch (Exception e) {
      System.out.println("Failed to sleep the thread");
    }
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // emit tuples with a single field called "site"
    declarer.declare(new Fields("site"));
  }
}
Write your first sample bolt by creating a SampleBolt class within the same package. The SampleBolt class extends the serializable BaseBasicBolt class. This bolt consumes the tuples emitted by the SampleSpout spout and prints the value of the site field on the console. The following is the source code of the SampleBolt class with an explanation:
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class SampleBolt extends BaseBasicBolt {
  private static final long serialVersionUID = 1L;

  public void execute(Tuple input, BasicOutputCollector collector) {
    // fetch the field "site" from the input tuple.
    String test = input.getStringByField("site");
    // print the value of the field "site" on the console.
    System.out.println("######### Name of input site is : " + test);
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // this bolt does not emit any tuples, so no output fields are declared.
  }
}
Create a main SampleStormTopology class within the same package. This class creates an instance of the spout and the bolt and chains them together using a TopologyBuilder class. It uses org.apache.storm.LocalCluster to simulate the Storm cluster. The LocalCluster mode is used for debugging/testing the topology on a developer machine before deploying it on a Storm cluster. The following is the implementation of the main class:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public class SampleStormTopology {
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    // create an instance of the TopologyBuilder class
    TopologyBuilder builder = new TopologyBuilder();
    // set the spout class
    builder.setSpout("SampleSpout", new SampleSpout(), 2);
    // set the bolt class
    builder.setBolt("SampleBolt", new SampleBolt(), 4).shuffleGrouping("SampleSpout");
    Config conf = new Config();
    conf.setDebug(true);
    // create an instance of the LocalCluster class for
    // executing the topology in local mode.
    LocalCluster cluster = new LocalCluster();
    // SampleStormTopology is the name of the submitted topology
    cluster.submitTopology("SampleStormTopology", conf, builder.createTopology());
    try {
      Thread.sleep(100000);
    } catch (Exception exception) {
      System.out.println("Thread interrupted exception : " + exception);
    }
    // kill the SampleStormTopology
    cluster.killTopology("SampleStormTopology");
    // shut down the local storm cluster
    cluster.shutdown();
  }
}
Go to your project's home directory and run the following commands to execute the topology in local mode:
$> cd $STORM_EXAMPLE_HOME
$> mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.stormadvance.storm_example.SampleStormTopology
Now create a new topology class for deploying the topology on an actual Storm cluster. Create a main SampleStormClusterTopology class within the same package. This class also creates an instance of the spout and the bolt and chains them together using a TopologyBuilder class:
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public class SampleStormClusterTopology {
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    // create an instance of the TopologyBuilder class
    TopologyBuilder builder = new TopologyBuilder();
    // set the spout class
    builder.setSpout("SampleSpout", new SampleSpout(), 2);
    // set the bolt class
    builder.setBolt("SampleBolt", new SampleBolt(), 4).shuffleGrouping("SampleSpout");
    Config conf = new Config();
    conf.setNumWorkers(3);
    // this statement submits the topology to the remote cluster
    // args[0] = name of the topology
    try {
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } catch (AlreadyAliveException alreadyAliveException) {
      System.out.println(alreadyAliveException);
    } catch (InvalidTopologyException invalidTopologyException) {
      System.out.println(invalidTopologyException);
    } catch (AuthorizationException e) {
      e.printStackTrace();
    }
  }
}
Build your Maven project by running the following command in the project's home directory:
mvn clean install
The output of the preceding command is as follows:
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 58.326s
[INFO] Finished at:
[INFO] Final Memory: 14M/116M
[INFO] ------------------------------------------------------------------------
We can deploy the topology to the cluster using the following Storm client command:
bin/storm jar jarName.jar [TopologyMainClass] [Args]
The preceding command runs TopologyMainClass with the given arguments. The main function of TopologyMainClass defines the topology and submits it to the Nimbus machine. The storm jar part takes care of connecting to the Nimbus machine and uploading the JAR.
Log in on a Storm Nimbus machine and execute the following commands:
$> cd $STORM_HOME
$> bin/storm jar ~/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.stormadvance.storm_example.SampleStormClusterTopology storm_example
In the preceding command, ~/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar is the path of the SampleStormClusterTopology JAR that we are deploying on the Storm cluster.
The following information is displayed:
702  [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8367952358273199959:-5050558042400210383
793  [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
856  [main] INFO  o.a.s.StormSubmitter - Uploading topology jar /home/USER/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar to assigned location: /tmp/storm-data/nimbus/inbox/stormjar-d3007821-f87d-48af-8364-cff7abf8652d.jar
867  [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /tmp/storm-data/nimbus/inbox/stormjar-d3007821-f87d-48af-8364-cff7abf8652d.jar
868  [main] INFO  o.a.s.StormSubmitter - Submitting topology storm_example in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8367952358273199959:-5050558042400210383","topology.workers":3}
1007 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: storm_example
Run the jps command to see the number of running JVM processes as follows:
jps
The preceding command's output is:
26827 worker
26530 supervisor
26824 worker
26468 nimbus
26822 worker
In the preceding output, each worker is a JVM launched for the SampleStormClusterTopology topology.
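As an optional extra check (not shown in the original output), the Storm CLI can also list the topologies that are currently active on the cluster; the storm_example topology should appear in the list:

$> cd $STORM_HOME
$> bin/storm list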