Now that the batch analytics portion is set up, let's move on to configuring the real-time portion of our pipeline so we can see data populating a graph as it arrives. First, let's configure a Kinesis Data Analytics application.
First, we need to start sending streaming data again. Go back to the Kinesis Data Generator and hit Send data. Keep the data flowing this time, since we want to look at data as it comes in; do not stop the Kinesis Data Generator. If you need a refresher on how to do this, check here.
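To make the streaming input concrete, here is a small Python sketch of the kind of JSON record the Kinesis Data Generator template produces for this lab. The field names match the schema used in the SQL query later in this section; the specific value ranges and choices below are illustrative assumptions, not the lab's exact template.

```python
import json
import random
import time

def make_event():
    """Build one sample event record; field names match the lab's schema,
    values are illustrative assumptions."""
    return {
        "event_id": random.randint(1, 1_000_000),
        "event_type": random.randint(1, 5),
        "event_name": random.choice(["login", "purchase", "level_up"]),
        "event_timestamp": int(time.time()),
        "event_version": 1,
        "item_id": random.randint(1, 100),
        "item_name": random.choice(["sword", "shield", "potion"]),
        "item_amount": random.randint(1, 10),
        "real_value": random.randint(0, 100),
        "virtual_value": random.randint(0, 1000),
        "currency_type": random.choice(["USD", "coins"]),
        "country_id": random.choice(["US", "DE", "JP"]),
        "platform": random.choice(["iPhone", "Android", "Web"]),
    }

# Each record reaches the stream as a JSON blob; the Kinesis Data Generator
# (or a boto3 put_record call) handles the actual delivery, which is omitted
# here so this sketch runs offline.
payload = json.dumps(make_event())
print(payload)
```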
Go to the AWS Management Console, select Services, and then choose Kinesis, or use this quick link. Under Data Analytics, choose Create application. Choose an application name, for example peculiar-analytics-stream, and leave the runtime as SQL, which should be selected by default.
We need to connect streaming data as a source. The source will be the Kinesis Data Stream we configured previously. We are already sending data from the Kinesis Data Stream to Kinesis Data Firehose to store directly in S3, which preserves our data for historical purposes. Now, we are also going to send it to a Kinesis Analytics application so we can run SQL queries on that data in real time.
Choose Connect streaming data and choose Kinesis data stream to add our Kinesis Data Stream as the streaming source. Choose the data stream you created previously; this lab uses Peculiar-KDS as the name.
Choose Discover schema to discover the schema of the data in your stream. It should look similar to below:
If you are unable to discover the schema, it is probably because you are not sending data with the Kinesis Data Generator. If you are sending data, you might need to log out and log back in, because the session might have timed out. Another potential issue is low data throughput; if that is the case, increase the number of records being sent from the Kinesis Data Generator.
Choose Save and continue.
Now, configure your real-time analytics by clicking Go to SQL editor. If you are asked whether you would like to start running your application, choose Yes, start application.
In the SQL editor, paste the following SQL query and then choose Save and run SQL:
-- ** Continuous Filter **
CREATE OR REPLACE STREAM "data_stream" (
    "event_id" INTEGER,
    "event_type" INTEGER,
    "event_name" VARCHAR(50),
    "event_timestamp" INTEGER,
    "event_version" INTEGER,
    "item_id" INTEGER,
    "item_name" VARCHAR(50),
    "item_amount" INTEGER,
    "real_value" INTEGER,
    "virtual_value" INTEGER,
    "currency_type" VARCHAR(50),
    "country_id" VARCHAR(50),
    "platform" VARCHAR(50));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "data_stream"
SELECT STREAM
    "event_id", "event_type", "event_name", "event_timestamp",
    "event_version", "item_id", "item_name", "item_amount",
    "real_value", "virtual_value", "currency_type", "country_id",
    "platform"
FROM "SOURCE_SQL_STREAM_001"
WHERE "platform" = 'iPhone';
This is a continuous filter query that filters the data in the stream. The query creates an output stream, which can be used to send results to a destination; creates a pump to insert data into the output stream; selects the columns from the source stream that we want to filter through; and then filters based on a WHERE clause. In this case, the SQL query keeps only records where platform equals iPhone.
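To make the filtering logic concrete, here is a small Python sketch that applies the same WHERE-clause logic to a batch of records locally. The sample records are illustrative; in the lab this filtering happens continuously inside Kinesis Data Analytics, not in your own code.

```python
# Sample records shaped like the stream's schema (abbreviated for clarity).
records = [
    {"event_name": "purchase", "platform": "iPhone"},
    {"event_name": "login", "platform": "Android"},
    {"event_name": "level_up", "platform": "iPhone"},
]

# Equivalent of: SELECT STREAM ... FROM "SOURCE_SQL_STREAM_001"
#                WHERE "platform" = 'iPhone'
filtered = [r for r in records if r["platform"] == "iPhone"]

for r in filtered:
    print(r["event_name"])  # only iPhone events remain
```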
For more information on how to create SQL queries for Kinesis Data Analytics, check out the SQL reference guide in the documentation.
You should see results arriving in real time, as shown below. If you look at the platform column, all of the results should be iPhone due to the continuous filter SQL query.
The last step for setting up the Kinesis Data Analytics stream is configuring a destination for the stream. The destination will be a Lambda function that will be configured next.
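As a preview of that destination, a Lambda function invoked by a Kinesis Data Analytics application receives batches of output records with base64-encoded data, and must return a status for every record ID it was given. A minimal handler sketch follows; the processing step inside the loop is a placeholder assumption.

```python
import base64
import json

def lambda_handler(event, context):
    """Sketch of a Kinesis Data Analytics destination Lambda.

    The application invokes this function with a batch of output records;
    each record's data is base64-encoded JSON. The function must return a
    result ('Ok' or 'DeliveryFailed') for every recordId it received.
    """
    results = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Placeholder processing step; a real handler might write the
        # payload to a datastore or push it to a dashboard.
        print(payload.get("event_name"))
        results.append({"recordId": record["recordId"], "result": "Ok"})
    return {"records": results}
```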