[Amazon Kinesis] How to Extract Anomalous Events from Amazon Kinesis Streams with Kinesis Data Analytics
Overview
In this lab, you’ll practice extracting anomalous events with Kinesis Data Analytics. When you’re finished, you’ll have a streaming pipeline capable of detecting anomalous events in real time and sending them to an S3 bucket.
Amazon Kinesis Data Streams Fundamentals
- Data streaming technology enables customers to ingest, process, and analyze high volumes of high-velocity data from a variety of sources in real time.
Technical requirements for streaming
- Be able to process high-throughput data with low latency
- Fan out to multiple consumers
- Ordered consumption of data for certain use cases
- Replay data to handle downstream failures
- Granular scaling
- High availability
- High durability
- High read concurrency
- Data retention
Managed ability to capture & store data
- Data streams are made up of shards
- Each shard ingests up to 1 MB/sec and up to 1,000 records/sec
- Each shard emits up to 2 MB/sec
- Data is stored for a 24-hour retention period by default, extendable up to 7 days
- Scale Kinesis Data Streams by splitting or merging shards (see the sketch after this list)
- Replay data within the 24-hour to 7-day retention window
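To make the scaling point concrete, here is a minimal boto3 sketch of resizing a stream; UpdateShardCount splits or merges shards under the hood, and the stream name is just an assumption for illustration:

```python
# Minimal sketch: resizing a Kinesis Data Stream with boto3.
# UpdateShardCount splits or merges shards to reach the target count.
# The stream name is an assumption for illustration.
import boto3

kinesis = boto3.client("kinesis")
kinesis.update_shard_count(
    StreamName="Demo-samples",
    TargetShardCount=2,
    ScalingType="UNIFORM_SCALING",
)
```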
What is Kinesis Firehose?
- Allows you to deliver streaming (event) data into destinations such as BI databases, data exploration tools, and dashboards
- Fully managed with elastic scaling that responds to increased throughput
- Allows you to batch large numbers of events into a single output file
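As an illustration of the batching model, a producer can hand events to a delivery stream in batches; here is a minimal boto3 sketch, where the delivery stream name is an assumption:

```python
# Minimal sketch: sending a batch of events to a Firehose delivery stream.
# The delivery stream name is an assumption for illustration.
import json
import boto3

firehose = boto3.client("firehose")

events = [{"id": i, "value": i * 10} for i in range(3)]
firehose.put_record_batch(
    DeliveryStreamName="example-delivery-stream",
    # A trailing newline per record keeps the delivered S3 objects
    # parseable line by line.
    Records=[{"Data": (json.dumps(e) + "\n").encode()} for e in events],
)
```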
DEMO
Set Up a Kinesis Data Stream
Since the number of complaints related to fraudulent trades has increased, your job is to use Kinesis to detect and highlight those fraudulent trades in real time.
The data flow of the end product can be seen in the following diagram:
You are going to start by creating a Kinesis Data Stream that will store all the incoming trades.
- Go to Kinesis > Data Streams > Create data stream
- Enter a name and choose Provisioned. Then click Shard estimator
To generate an estimated number of required shards, enter the following values (the math behind the estimate is sketched below):
- Average record size: 128
- Maximum records written per second: 100
- Total number of consumers: 1 (this should be the default value)
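As a rough illustration of what the estimator computes, here is a back-of-the-envelope sketch based on the per-shard limits listed earlier (1 MB/sec and 1,000 records/sec in, 2 MB/sec out); it assumes the average record size is given in bytes:

```python
# Back-of-the-envelope shard estimate from the per-shard limits above.
# Assumes the average record size (128) is in bytes.
import math

record_size_bytes = 128
records_per_second = 100
consumers = 1

write_mb_per_sec = record_size_bytes * records_per_second / 1_000_000
read_mb_per_sec = write_mb_per_sec * consumers

shards = max(
    math.ceil(write_mb_per_sec / 1),       # 1 MB/sec write limit per shard
    math.ceil(records_per_second / 1000),  # 1,000 records/sec limit per shard
    math.ceil(read_mb_per_sec / 2),        # 2 MB/sec read limit per shard
)
print(shards)  # 1 shard is enough for this workload
```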
Click Create data stream.
You should be redirected to the Demo-samples page, where you will notice a Status field. After a few seconds, the Status will change to Active.
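If you prefer to script this step, a minimal boto3 equivalent of creating the stream and waiting for it to become Active might look like this (a sketch, not part of the lab steps):

```python
# Sketch: creating the stream and polling its status with boto3,
# equivalent to the console steps above.
import time
import boto3

kinesis = boto3.client("kinesis")
kinesis.create_stream(StreamName="Demo-samples", ShardCount=1)

# Poll until the stream becomes ACTIVE.
while True:
    summary = kinesis.describe_stream_summary(StreamName="Demo-samples")
    if summary["StreamDescriptionSummary"]["StreamStatus"] == "ACTIVE":
        break
    time.sleep(5)
```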
Create a Kinesis Producer Using Lambda Functions
Your Kinesis Data Stream is ready to accept traffic; however, nothing is producing data to it yet. You will use a Lambda function that generates and sends information to the stream.
- Go to Lambda > functions > stock_trades_producer
Its role is to generate stock trade information and send it to a Kinesis Data Stream. For each invocation of this Lambda function, 100 records will be created and sent to the Kinesis Data Stream, and 5% of those records will have a significantly higher price and quantity. These values can be controlled through environment variables.
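The function is already provided in the lab; purely for illustration, a hand-rolled producer along the same lines might look like the sketch below. The record fields mirror the schema used later in the SQL queries, while the ticker list and anomaly multipliers are assumptions:

```python
# Illustrative sketch of a producer Lambda along the lines described above.
# Tickers and anomaly multipliers are assumptions; the lab's
# stock_trades_producer function is already provided for you.
import json
import os
import random

import boto3

kinesis = boto3.client("kinesis")

def lambda_handler(event, context):
    stream = os.environ["STREAM_NAME"]  # set in a later step
    for i in range(100):
        trade = {
            "id": i,
            "tickerSymbol": random.choice(["AMZN", "GOOG", "MSFT"]),
            "tradeType": random.choice(["BUY", "SELL"]),
            "price": round(random.uniform(10, 100), 2),
            "quantity": random.randint(1, 50),
        }
        # Roughly 5% of records get an anomalously high price and quantity.
        if random.random() < 0.05:
            trade["price"] *= 50
            trade["quantity"] *= 50
        kinesis.put_record(
            StreamName=stream,
            Data=json.dumps(trade).encode(),
            PartitionKey=trade["tickerSymbol"],
        )
```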
- Click the Configuration tab, and navigate to Permissions. Click the stock_trades_producer_role link.
- In the Permissions policies section, click Attach policies.
- Beside Filter policies, in the Search input box type Kinesis, tick the AmazonKinesisFullAccess box, and click the Attach policy button.
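The same attachment can be scripted; here is a minimal boto3 sketch, using the role name from the step above and the ARN of the AWS-managed policy:

```python
# Sketch: attaching the AWS-managed AmazonKinesisFullAccess policy
# to the producer's role, equivalent to the console steps above.
import boto3

iam = boto3.client("iam")
iam.attach_role_policy(
    RoleName="stock_trades_producer_role",
    PolicyArn="arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
)
```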
- Go back to the stock_trades_producer browser tab. Within the Configuration tab, navigate to Environment variables and click Edit.
Click Add environment variable, fill in the following values, and click Save:
- Key: STREAM_NAME
- Value: Demo-samples
Click the Test tab, then under Event name enter trigger, and click Save changes.
- Click Test.
A new section appears on the page saying Executing function… The Lambda function will run for approximately 15 minutes, producing data to the Kinesis Data Stream; leave it running while you move on to the next steps.
Run Kinesis Data Analytics Queries
Data is flowing into your Kinesis Data Stream, ready to be analyzed. You need to create a Kinesis Data Analytics application that will analyze all the incoming data in the Data Stream, and detect all the anomalous events.
- Go to Kinesis > Data Analytics > Create application
Complete the Kinesis Data Analytics — Create Application form using the following values, then click Create application:
- Application name: anomaly-extractor-test
- Description: Kinesis Data Analytics application that extracts anomalous events from a Kinesis Data Stream
- Runtime: SQL
- From the Source section, click Configure
Ensure the following fields are selected:
- Source: Kinesis data stream
- Kinesis data stream: Demo-samples
- Note: this is likely the only value you will have to set.
- Record pre-processing: Off
- Access permissions: Create / update IAM role kinesis-analytics-anomaly-extractor-us-west-2
Under the Schema section, click Discover schema, and once Schema discovery is successful, in the lower-right click Save and continue.
- From the Real-time analytics section, click Configure.
- Copy the following Streaming SQL queries into the editor and click Save and run SQL.
```sql
-- Creates a temporary in-application stream
CREATE OR REPLACE STREAM "TEMP_STREAM" (
    "tickerSymbol"  VARCHAR(8),
    "tradeType"     VARCHAR(4),
    "price"         DOUBLE,
    "quantity"      INTEGER,
    "id"            INTEGER,
    "ANOMALY_SCORE" DOUBLE);

-- Creates an output stream
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "tickerSymbol"  VARCHAR(8),
    "tradeType"     VARCHAR(4),
    "price"         DOUBLE,
    "quantity"      INTEGER,
    "id"            INTEGER,
    "ANOMALY_SCORE" DOUBLE);

-- Compute an anomaly score for each record in the source stream
-- using the Random Cut Forest algorithm
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "TEMP_STREAM"
    SELECT STREAM source."tickerSymbol", source."tradeType",
                  rcf."price", rcf."quantity", source."id", rcf."ANOMALY_SCORE"
    FROM TABLE(RANDOM_CUT_FOREST(
             CURSOR(SELECT STREAM "price", "quantity" FROM "SOURCE_SQL_STREAM_001")
         )) AS rcf,
         SOURCE_SQL_STREAM_001 AS source;

-- Sort records by descending ANOMALY_SCORE, insert into DESTINATION_SQL_STREAM
-- events that have an anomaly score higher than 1
CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM * FROM "TEMP_STREAM"
    WHERE ANOMALY_SCORE > 1
    ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
```
After a few seconds, a new table should appear on the screen showing all the events with an anomaly score higher than 1. The data refreshes every few seconds.
The STREAM_PUMP uses the Random Cut Forest algorithm to enrich the Demo-samples Kinesis Data Stream (SOURCE_SQL_STREAM_001) with an additional field called ANOMALY_SCORE, calculated from the price and quantity of each stock trade. The OUTPUT_PUMP selects only the stock trades with an anomaly score higher than 1 and sends them to the DESTINATION_SQL_STREAM.
Sink Data to an S3 Bucket Using Kinesis Firehose
All anomalous events are now being registered in an in-application data stream, but they aren’t easily accessible. You will connect a data sink, represented by an S3 bucket, using Kinesis Firehose.
- Go to anomaly-extractor-test > Destination > Add destination
To deliver the data to a pre-defined S3 bucket, use the following values in the Configure form, then in the lower-right click Save and continue:
- Destination: Kinesis Firehose delivery stream
- Kinesis Firehose delivery stream: anomalous_stock_trades_firehose_stream
- Connect in-application stream: Choose an existing in-application stream (should be selected by default)
- In-application stream name: DESTINATION_SQL_STREAM
- Output format: JSON (should be selected by default)
- Access permissions: Create / update IAM role for kinesis-analytics-anomaly-extractor (should be selected by default)
- Go to the S3 bucket list.
You should notice a folder structure based on the current date. In the bottom-level directory, a new JSON file is created every minute containing the anomalous events extracted from the Kinesis Data Stream.
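To inspect the delivered files programmatically, here is a minimal boto3 sketch; the bucket name and the date prefix are assumptions (Firehose writes objects under a YYYY/MM/DD/HH-style prefix by default):

```python
# Sketch: listing and reading the JSON files Firehose delivered to S3.
# Bucket name and date prefix are assumptions for illustration.
import boto3

s3 = boto3.client("s3")

bucket = "example-anomalies-bucket"
resp = s3.list_objects_v2(Bucket=bucket, Prefix="2024/")
for obj in resp.get("Contents", []):
    body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
    print(obj["Key"], body[:200])  # first bytes of each delivered file
```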
You detected fraudulent events using Amazon Kinesis!