Streaming ETL with Lambda

Now let’s set up a Lambda function that the Kinesis Data Firehose stream invokes to do some basic pre-processing before the data is stored in the S3 bucket. The Game Analytics Pipeline uses a Lambda transform for similar ETL, converting data from JSON to Parquet before storing it in S3. We will do something similar but keep it simple for the purposes of this lab.

  • In the AWS Management Console, select Services and choose Lambda

  • Click Create function in the top right corner

  • Choose Author from scratch and give the function a name; this lab uses the name PeculiarProcessing

  • Set the runtime to Python 3.8 (or a newer Python 3 runtime, since Lambda has deprecated Python 3.8) and choose Create function

  • Replace the contents of the function code editor with the following:
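import json
import base64


def lambda_handler(event, context):
    # Kinesis Data Firehose invokes this function with a batch of records;
    # each record's payload is base64-encoded.
    output = []

    for record in event['records']:
        # Decode the base64 payload and parse it as JSON
        data = base64.b64decode(record['data'])
        json_data = json.loads(data)

        # Flatten nested objects and lists into a single-level dict
        new_data = flatten_json(json_data)

        # Firehose expects every input recordId back with a result status and
        # base64-encoded data. The trailing newline keeps records separated in S3.
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(
                json.dumps(new_data, separators=(',', ':')).encode('utf-8') + b'\n'
            ).decode('utf-8')
        }

        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}


# Flatten nested JSON, e.g. {"a": {"b": 1}, "c": [2, 3]} -> {"a_b": 1, "c_0": 2, "c_1": 3}
def flatten_json(y):
    out = {}

    def flatten(x, name=''):
        if isinstance(x, dict):
            # Nested object: extend the key prefix with each child key
            for key in x:
                flatten(x[key], name + key + '_')
        elif isinstance(x, list):
            # Nested list: extend the key prefix with each element's index
            for i, item in enumerate(x):
                flatten(item, name + str(i) + '_')
        else:
            # Leaf value: store it under the accumulated prefix,
            # dropping the trailing underscore
            out[name[:-1]] = x

    flatten(y)
    return out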

This code flattens nested JSON into a single level of key-value pairs (for example, {"player": {"id": 7}} becomes {"player_id": 7}) so that it is easier to query during analytics later. This is a simple example, but you can extend this custom code to do whatever ETL you need, such as converting the data to a different format before storing it in your data lake. Pre-processing data before it is stored means it lands in S3 already in a shape suited to storage and analytics, which can improve query performance and reduce cost.
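As one example of making the transform more robust, the sketch below handles each record separately: a payload that is not valid base64 or JSON is returned with the result ProcessingFailed instead of failing the whole batch, and, purely as an illustrative filtering rule (an assumption, not part of this lab), payloads that are not JSON objects are returned as Dropped. Ok, Dropped and ProcessingFailed are the result values Firehose accepts from a transform function. The helper names transform_record and make_test_event are hypothetical, and flatten_json is repeated so the block runs on its own. Running the file locally invokes the handler with a hand-built test event, with no AWS account needed.

```python
import base64
import json


def flatten_json(y):
    """Flatten nested dicts/lists into one level, joining keys with '_'."""
    out = {}

    def flatten(x, name=''):
        if isinstance(x, dict):
            for key in x:
                flatten(x[key], name + key + '_')
        elif isinstance(x, list):
            for i, item in enumerate(x):
                flatten(item, name + str(i) + '_')
        else:
            out[name[:-1]] = x

    flatten(y)
    return out


def transform_record(record):
    """Build the Firehose output record for one input record (hypothetical helper)."""
    try:
        # b64decode raises binascii.Error and json.loads raises JSONDecodeError;
        # both are subclasses of ValueError
        payload = json.loads(base64.b64decode(record['data']))
    except ValueError:
        # Return the original data unchanged and flag the record as failed
        return {'recordId': record['recordId'], 'result': 'ProcessingFailed',
                'data': record['data']}

    if not isinstance(payload, dict):
        # Illustrative rule (assumption): skip events that are not JSON objects
        return {'recordId': record['recordId'], 'result': 'Dropped',
                'data': record['data']}

    flat = json.dumps(flatten_json(payload), separators=(',', ':')) + '\n'
    return {'recordId': record['recordId'], 'result': 'Ok',
            'data': base64.b64encode(flat.encode('utf-8')).decode('utf-8')}


def lambda_handler(event, context):
    # Every input recordId must appear exactly once in the response
    return {'records': [transform_record(r) for r in event['records']]}


def make_test_event(*payloads):
    """Build a minimal Firehose-style test event from raw byte payloads (hypothetical)."""
    return {'records': [
        {'recordId': str(i), 'data': base64.b64encode(p).decode('utf-8')}
        for i, p in enumerate(payloads)
    ]}


if __name__ == '__main__':
    event = make_test_event(b'{"player": {"id": 7}, "scores": [10, 20]}',
                            b'not json', b'[1, 2]')
    for rec in lambda_handler(event, None)['records']:
        print(rec['recordId'], rec['result'], base64.b64decode(rec['data']))
```

With this test event the first record comes back as Ok with the payload {"player_id":7,"scores_0":10,"scores_1":20}, the second as ProcessingFailed, and the third as Dropped. Returning ProcessingFailed per record, rather than raising an exception, lets Firehose deliver the good records and write only the bad ones to its error output in S3.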


  • Scroll down to Basic settings and choose Edit

  • Change the Timeout from 3 seconds to 3 minutes and save

  • Hit Deploy to save and deploy your updated Lambda function

  • Next, add the function to the Kinesis Data Firehose stream so that it can pre-process the incoming streaming data.

  • In the AWS Management Console, go to Kinesis and find the Kinesis Data Firehose stream you created previously (this lab uses a stream named Peculiar-KDF)

  • Click into the stream to view the details and choose Edit

  • Find the section Transform source records with AWS Lambda and set it to Enabled

  • Find and select the Lambda function you just created

  • Leave everything else as default and hit Save
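If you prefer to script these console steps, the following is a minimal sketch using the AWS SDK for Python (boto3). It assumes boto3 and AWS credentials are already configured and that Peculiar-KDF has a single extended S3 destination, which is what a Firehose-to-S3 stream typically uses. The helper names (set_lambda_timeout, processing_configuration, update_destination_kwargs, attach_transform) are hypothetical; update_function_configuration, describe_delivery_stream and update_destination are existing boto3 client methods. The request dictionaries are built by plain functions so they can be checked without calling AWS.

```python
def set_lambda_timeout(lambda_client, function_name, seconds=180):
    """Raise the function timeout (console: Basic settings > Timeout); 180 s = 3 min."""
    return lambda_client.update_function_configuration(
        FunctionName=function_name, Timeout=seconds)


def processing_configuration(lambda_arn):
    """Equivalent of 'Transform source records with AWS Lambda' set to Enabled."""
    return {
        'Enabled': True,
        'Processors': [{
            'Type': 'Lambda',
            'Parameters': [{'ParameterName': 'LambdaArn',
                            'ParameterValue': lambda_arn}],
        }],
    }


def update_destination_kwargs(stream_description, lambda_arn):
    """Build update_destination arguments from a DeliveryStreamDescription.

    Assumption: the stream has one destination, and it is extended S3.
    """
    return {
        'DeliveryStreamName': stream_description['DeliveryStreamName'],
        # Firehose rejects the update if this version is stale
        'CurrentDeliveryStreamVersionId': stream_description['VersionId'],
        'DestinationId': stream_description['Destinations'][0]['DestinationId'],
        'ExtendedS3DestinationUpdate': {
            'ProcessingConfiguration': processing_configuration(lambda_arn),
        },
    }


def attach_transform(firehose_client, stream_name, lambda_arn):
    """Look up the stream's current version and enable the Lambda transform."""
    desc = firehose_client.describe_delivery_stream(
        DeliveryStreamName=stream_name)['DeliveryStreamDescription']
    return firehose_client.update_destination(
        **update_destination_kwargs(desc, lambda_arn))
```

For example, call set_lambda_timeout(boto3.client('lambda'), 'PeculiarProcessing') and then attach_transform(boto3.client('firehose'), 'Peculiar-KDF', function_arn), where function_arn is the ARN shown on the function's page in the Lambda console. The sketch does not modify IAM, so the stream's role must already be allowed to invoke the function (lambda:InvokeFunction).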

You’ve successfully created a transform Lambda function to pre-process streaming data with Kinesis! In your own data analytics pipeline, you can customize the logic in this Lambda function to suit your analytics needs.