Filter data with Amazon Kinesis Data Analytics

Now that the batch analytics portion is set up, let’s move on to configuring the real-time portion of our pipeline so we can watch data populate a graph as it arrives. First, let’s configure a Kinesis Data Analytics application.

  • First, we need to start sending streaming data again. Go back to the Kinesis Data Generator and hit Send data. This time, keep the data flowing; since we want to look at data as it comes in, do not stop the Kinesis Data Generator. If you need a refresher on how to do this, check here.

  • Go to the AWS Management Console, select Services, and then choose Kinesis or use this quick link.

  • Under Data Analytics, choose Create application.

  • Choose an application name, for example peculiar-analytics-stream, and leave the runtime as SQL, which should be selected by default.

  • Click Create application.

  • We need to connect streaming data as a source. The source will be the Kinesis Data Stream we configured previously. We are already sending data from the Kinesis Data Stream to Kinesis Data Firehose, which stores it directly in S3 for historical purposes. Now we are also going to send it to a Kinesis Data Analytics application so we can run SQL queries on that data in real time.

  • Click Connect streaming data and choose Kinesis data stream to add our Kinesis Data Stream as the streaming source. Choose the data stream you created previously - this lab uses Peculiar-KDS as the name.

  • Scroll down and click Discover schema to discover the schema of the data in your stream. It should look similar to the following:

If you are unable to discover the schema, it is probably because the Kinesis Data Generator is not sending data. If it is sending data, you might need to log out and log back in, because the session might have timed out. Low data throughput can also cause discovery to fail; in that case, increase the number of records being sent from the Kinesis Data Generator.

  • Click Save and continue.

  • Now, configure your real-time analytics by clicking Go to SQL editor. If you are asked whether you would like to start running your application, choose Yes, start application.

  • In the SQL editor, paste the following SQL query and then click Save and run SQL:

-- ** Continuous Filter **
CREATE OR REPLACE STREAM "data_stream" (
        "event_id" INTEGER, "event_type" INTEGER, "event_name" VARCHAR(50),
        "event_timestamp" INTEGER, "event_version" INTEGER, "item_id" INTEGER, "item_name" VARCHAR(50),
        "item_amount" INTEGER, "real_value" INTEGER, "virtual_value" INTEGER, "currency_type" VARCHAR(50),
        "country_id" VARCHAR(50), "platform" VARCHAR(50));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "data_stream"
SELECT STREAM "event_id", "event_type", "event_name", "event_timestamp", "event_version", "item_id", "item_name",
        "item_amount", "real_value", "virtual_value", "currency_type", "country_id",
        "platform"
FROM "SOURCE_SQL_STREAM_001"
WHERE "platform" = 'iPhone';

This is a continuous filter query that filters the data in the stream. The query creates an output stream, which can later be connected to a destination; it creates a pump that inserts data into that output stream; it selects the columns from the source stream that we want to pass through; and it filters rows with a WHERE clause. In this case, the query keeps only the records where platform equals iPhone.
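The WHERE clause is easy to adapt if you want to experiment. As a hypothetical variation (the column names come from the schema above, but the stream name, pump name, and filter values below are examples only, not part of this lab), the following keeps events from two platforms that have a positive real-money value:

-- ** Hypothetical variation: filter on multiple conditions **
-- 'mobile_stream', 'MOBILE_PUMP', and the filter values are example names/values.
CREATE OR REPLACE STREAM "mobile_stream" ("event_name" VARCHAR(50), "platform" VARCHAR(50), "real_value" INTEGER);
CREATE OR REPLACE PUMP "MOBILE_PUMP" AS INSERT INTO "mobile_stream"
SELECT STREAM "event_name", "platform", "real_value"
FROM "SOURCE_SQL_STREAM_001"
WHERE "platform" IN ('iPhone', 'Android')
  AND "real_value" > 0;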

For more information on how to create SQL queries for Kinesis Data Analytics, check out the SQL reference guide in the documentation.
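To give a flavor of what else is possible, here is a sketch of a tumbling-window aggregation based on the windowed query patterns in that reference guide (the stream and pump names are hypothetical, and this query is not part of this lab). It counts events per platform over ten-second windows:

-- ** Sketch: tumbling-window aggregation (illustration only) **
CREATE OR REPLACE STREAM "platform_counts" ("platform" VARCHAR(50), "event_count" INTEGER);
CREATE OR REPLACE PUMP "COUNT_PUMP" AS INSERT INTO "platform_counts"
SELECT STREAM "platform", COUNT(*) AS "event_count"
FROM "SOURCE_SQL_STREAM_001"
-- STEP rounds ROWTIME down to the nearest 10-second boundary, producing tumbling windows
GROUP BY "platform", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '10' SECOND);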

You should see results arriving in real time, as shown below. If you look at the platform column, all of the results should be iPhone because of the continuous filter SQL query.

The last step in setting up the Kinesis Data Analytics application is configuring a destination for the stream. The destination will be a Lambda function, which we will configure next.