官术网_书友最值得收藏!

How it works...

Stream processors listen for data from a streaming service such as Kinesis or DynamoDB Streams. Deploying a stream processor is completely declarative. We configure a function with the stream event type and the pertinent settings, such as the type, arn, batchSize, and startingPosition. The arn is set dynamically using a CloudFormation variable, ${cf:cncb-event-stream-${opt:stage}.streamArn}, that references the output value of the cnbc-event-stream stack.


Streams are the only resources that are shared between autonomous cloud-native services.

We will discuss batch size and starting position in detail in both Chapter 8, Designing for Failure, and Chapter 9, Optimizing Performance. For now, you may have noticed that the new stream processor logged all the events that were published to the stream in the last 24 hours. This is because the startingPosition is set to TRIM_HORIZON. If it was set to LATEST, then it would only receive events that were published after the function was created.

Stream processing is a perfect match for functional reactive programming with Node.js streams. The terminology can be a little confusing because the word stream is overloaded. I like to think of streams as either macro or micro. For example, Kinesis is the macro stream and the code in our stream processor function is the micro stream. My favorite library for implementing the micro stream is Highland.js (https://highlandjs.org). A popular alternative is RxJS (https://rxjs-dev.firebaseapp.com). As you can see in this recipe, functional reactive programming is very descriptive and readable. One of the reasons for this is that there are no loops. If you try to implement a stream processor with imperative programming, you will find that it quickly gets very messy. You also lose backpressure, which we will discuss in Chapter 8, Designing for Failure.

The code in the listener function creates a pipeline of steps that the data from the Kinesis stream will ultimately flow through. The first step, _(event.Records), converts the array of Kinesis records into a Highland.js stream object that will allow each element in the array to be pulled through the stream in turn as the downstream steps are ready to receive the next element. The .map(recordToEvent) step decodes the Base64 encoded data from the Kinesis record and parses the JSON into an event object. The next step, .tap(printEvent), simply logs the event so that we can see what is happening in the recipe.

Kinesis and event streaming, in general, is a member of the high performance, dumb-pipe-smart-endpoints generation of messaging middleware. This means that Kinesis, the dumb pipe, does not waste its processing power on filtering data for the endpoints. Instead, all that logic is spread out across the processing power of the smart endpoints. Our stream processor function is the smart endpoint. To that end, the .filter(forThingCreated) step is responsible for filtering out the events that the processor is not interested in. All the remaining steps can assume that they are receiving the expected event types.

Our bare-boned stream processor needs something somewhat interesting but simple to do. So, we count and print the number of thing-created events in the batch. We have filtered out all other event types, so the .collect() step collects all the remaining events into an array. Then, the .tap(printCount) step logs the length of the array. Finally, the .toCallback(cb) step will invoke the callback function once all the data in the batch has been processed. At this point, the Kinesis checkpoint is advanced and the next batch of events is processed. We will cover error handling and how it relates to batches and checkpoints in Chapter 8, Designing for Failure.

主站蜘蛛池模板: 景泰县| 房产| 奇台县| 宜阳县| 宁明县| 襄城县| 定襄县| 定西市| 潜江市| 河曲县| 东安县| 同德县| 宁津县| 横峰县| 鹿泉市| 蚌埠市| 峨眉山市| 临武县| 定州市| 靖西县| 兰坪| 海盐县| 万盛区| 榆林市| 雅安市| 凌源市| 衡阳县| 德令哈市| 彰化县| 垣曲县| 台东市| 浮山县| 新野县| 宣城市| 邯郸市| 金山区| 确山县| 九台市| 奉化市| 临漳县| 凤凰县|