- Learning Apache Apex
- Thomas Weise Munagala V. Ramanath David Yan Kenneth Knowles
- 2021-07-02 22:38:41
Writing the integration test
The test covers the entire DAG and will run the application in embedded mode. In embedded mode, all operators and containers share the JUnit JVM. Containers are threads (instead of separate processes) but the data flow still behaves as if operators lived in separate processes. This means operators execute asynchronously as they would in a distributed cluster and data is transferred over the loopback interface (if that's how the streams are configured).
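As a loose analogy for this threading model, the following plain-Java sketch runs two "containers" as threads in one JVM and passes data between them through a bounded queue. It is not Apex code: the class name `EmbeddedModeAnalogy`, the `END` marker, and the queue standing in for a stream are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmbeddedModeAnalogy {
    // Hypothetical end-of-stream marker; not part of any Apex API.
    private static final String END = "__END__";

    /** Runs a producer and a consumer on separate threads and returns what the consumer received. */
    public static List<String> run(List<String> words) {
        // The bounded queue stands in for a stream between two operators.
        BlockingQueue<String> stream = new ArrayBlockingQueue<>(4);
        List<String> received = new ArrayList<>();

        Thread upstream = new Thread(() -> {
            try {
                for (String w : words) {
                    stream.put(w); // blocks when the downstream side falls behind
                }
                stream.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "container-1");

        Thread downstream = new Thread(() -> {
            try {
                for (String w = stream.take(); !END.equals(w); w = stream.take()) {
                    received.add(w);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "container-2");

        upstream.start();
        downstream.start();
        try {
            // join() makes the consumer's writes to 'received' visible to the caller.
            upstream.join();
            downstream.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for threads", e);
        }
        return received;
    }
}
```

Both threads share the test JVM, yet the producer never calls the consumer directly, which is the property that makes an embedded run behave asynchronously.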
    @Test
    public void testApplication() throws Exception {
        // Create a launcher for embedded mode and request asynchronous execution.
        EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
        Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);

        // Load the standard configuration, then the test-specific overrides.
        Configuration conf = new Configuration(false);
        conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
        conf.addResource(this.getClass().getResourceAsStream("/wordcounttest-properties.xml"));

        // Remove any result left over from a previous run.
        File resultFile = new File("./target/wordcountresult_4.0");
        if (resultFile.exists() && !resultFile.delete()) {
            throw new AssertionError("Failed to delete " + resultFile);
        }

        // Launch the application and keep the handle to monitor and stop it.
        AppHandle appHandle = launcher.launchApp(new Application(), conf, launchAttributes);

        // Poll until the result file has content, the application finishes, or 10 seconds pass.
        long timeoutMillis = System.currentTimeMillis() + 10000;
        while (!appHandle.isFinished() && System.currentTimeMillis() < timeoutMillis) {
            Thread.sleep(500);
            if (resultFile.exists() && resultFile.length() > 0) {
                break;
            }
        }
        appHandle.shutdown(ShutdownMode.KILL);

        // Verify the results.
        Assert.assertTrue(resultFile.exists() && resultFile.length() > 0);
        String result = FileUtils.readFileToString(resultFile);
        Assert.assertTrue(result, result.contains("MyFirstApplication=5"));
    }
The flow of the test is as follows:
- Create a launcher for embedded mode.
- Load the configuration from the standard location, plus the overrides that are needed for the test.
- Make sure that the result area is clean: any residue from previous runs is removed as part of the test initialization.
- Instantiate the application and launch it with the embedded cluster, keeping the returned handle to check and control the asynchronous execution.
- In a loop, check whether the application has terminated, and shut it down once the expected results are in. In this case, the logic checks for the expected result file; the timeout also makes sure that the test does not turn into a runaway process if the application fails to produce the expected results.
- After the results are available, check the details with assertions.
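The clean-up and polling steps above can be factored into a small helper, sketched here under some assumptions. `ResultFileWaiter` and its method names are hypothetical and not part of Apex; the sketch uses only `java.nio.file` and leaves launching and shutting down the application to the test itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ResultFileWaiter {

    /** Removes a result file left over from a previous run, if there is one. */
    public static void clean(Path resultFile) {
        try {
            Files.deleteIfExists(resultFile);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to delete " + resultFile, e);
        }
    }

    /**
     * Polls until the file exists and is non-empty, or until the timeout elapses.
     * Returns true if content arrived in time, false otherwise.
     */
    public static boolean awaitNonEmpty(Path resultFile, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (hasContent(resultFile)) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // give up so the test cannot run away
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    private static boolean hasContent(Path file) {
        try {
            return Files.exists(file) && Files.size(file) > 0;
        } catch (IOException e) {
            return false; // file vanished or is unreadable right now; treat as not ready
        }
    }
}
```

In a test like the one shown earlier, `clean` would run before the launch and `awaitNonEmpty` would replace the hand-written loop, with the shutdown call following in either case. The helper does not watch the application handle, so a test that also wants to stop early when the application finishes would still need that check.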
The test completes within a few seconds and can easily be run within the IDE whenever a change is made. At this stage, there is no need to package the application, launch it on a Hadoop cluster, and diagnose issues by collecting logs and other information from distributed workers. Instead, when a test fails, it is usually straightforward to identify the cause and, where appropriate, step through the code with a debugger.
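Failures are easier to diagnose when assertions compare exact values. A substring check such as `result.contains("MyFirstApplication=5")` would also pass for a count of 50. The sketch below parses the result text into a map so that the count can be compared exactly. It assumes the result consists of `word=count` tokens separated by whitespace, commas, or braces; the actual output format is not specified here, and `WordCountResult` is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountResult {

    /** Parses word=count tokens; tokens that do not match that shape are skipped. */
    public static Map<String, Long> parse(String text) {
        Map<String, Long> counts = new LinkedHashMap<>();
        // Assumed delimiters: whitespace, commas, and braces.
        for (String token : text.split("[\\s,{}]+")) {
            int eq = token.lastIndexOf('=');
            if (eq <= 0 || eq == token.length() - 1) {
                continue; // no key or no value
            }
            try {
                counts.put(token.substring(0, eq), Long.parseLong(token.substring(eq + 1)));
            } catch (NumberFormatException e) {
                // Value is not a number; ignore this token.
            }
        }
        return counts;
    }
}
```

With this helper, the final assertion could become `Assert.assertEquals(Long.valueOf(5), WordCountResult.parse(result).get("MyFirstApplication"))`, which reports the expected and actual counts when it fails.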