Process small JSON blob files using Firehose and Lambda with a batch setup - Part 2

Rajesh · Published in AWS Tip · Sep 18, 2023

In our previous blog post, we explored how to use Lambda and Firehose for handling small blob files. If you haven’t had a chance to read it yet, we encourage you to take a look at it below.

Process small json blob files using firehose and lambda

The solution mentioned above works well when blobs or small JSON files arrive in real time and the Lambda function is triggered by the S3 put event notification. However, there could be situations where the Lambda parser isn’t ready yet. By the time the parser is ready, there might be an accumulation of billions of these small JSON files, and attempting to trigger a separate Lambda invocation for each of them could lead to Lambda and Firehose throttling issues. (Lambda throttling is governed by the maximum number of concurrent Lambda executions allowed for an AWS account, usually capped at 1,000 concurrent executions per account. In parallel, Firehose operates with its own throttling limits, including restrictions on how many records it can ingest at a time; these limits are separate from the Lambda throttling parameters.) This makes it a cumbersome process to handle such a large volume of files.

To overcome this, we can use any of the three services below, which enable us to process the accumulated blobs in a batch:

  • S3 Batch Operations
  • AWS Batch
  • AWS Step Functions

S3 Batch Operations: With this feature, you can execute Lambda functions in batch mode on a substantial collection of S3 objects. Amazon S3 tracks the progress of these batch operations, delivers notifications, and maintains a completion report detailing the status of each action performed.

To initiate a batch operation, you create an Amazon S3 batch operations job. During the job creation process, you specify a manifest containing the list of objects and configure the action that needs to be executed on these objects.

AWS Batch: AWS Batch provides a solution where you can execute your Lambda parsing code within a separate Docker container. This approach helps you address concerns related to Lambda throttling and timeouts effectively.

AWS Step Functions: AWS Step Functions is a serverless orchestration service. Using it, we can schedule jobs and even run Lambda functions.

In this blog post, we will delve into the initial approach, which involves utilizing S3 Batch Operations, and we will explore the necessary modifications required in our Lambda function to integrate it seamlessly into the S3 batch processing workflow.
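Before diving into the code, it helps to see the shape of the event that S3 Batch Operations passes to the Lambda function for each task; the handler below reads taskId, s3BucketArn, and s3Key from it. The identifiers in this sample are placeholders, not real values from the demo.

{
  "invocationSchemaVersion": "1.0",
  "invocationId": "example-invocation-id",
  "job": {
    "id": "example-job-id"
  },
  "tasks": [
    {
      "taskId": "example-task-id",
      "s3Key": "path/to/blob-0001.json",
      "s3VersionId": "1",
      "s3BucketArn": "arn:aws:s3:::json-blob-demo-bucket"
    }
  ]
}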

import json
import time
from datetime import datetime

import boto3

firehose_client = boto3.client("firehose")


def process_record(bucket_name, file_key):
    try:
        objWithKey = get_object_with_key(bucket_name, file_key)
        file_content = objWithKey['obj']['Body'].read().decode('utf-8')
        json_content = json.loads(file_content)

        result_list = []

        # Keep only the events above the leakage threshold and convert the
        # epoch-millisecond timestamps into human-readable strings.
        for item in json_content['events']:
            if int(item['maskLeakage']) > 40:
                start_time = datetime.fromtimestamp(int(item['startTime']) / 1000)
                end_time = datetime.fromtimestamp(int(item['endTime']) / 1000)
                item['startTime'] = start_time.strftime("%m-%d-%Y, %H:%M:%S.%f")
                item['endTime'] = end_time.strftime("%m-%d-%Y, %H:%M:%S.%f")
                result_list.append(item)

        return result_list

    except Exception as e:
        print(e)
        raise e


def send_messages_to_firehose_with_backoff(stream_name, records):
    max_retries = 5
    base_delay = 0.1  # Initial delay in seconds, adjust as needed
    retries = 0

    while retries < max_retries:
        try:
            response = firehose_client.put_record_batch(
                DeliveryStreamName=stream_name,
                Records=[
                    {"Data": json.dumps(record) + "\n"} for record in records
                ]
            )
            return response
        except Exception as e:
            print(e)
            retries += 1
            delay = base_delay * (2 ** retries)  # Exponential backoff formula
            print(f"Retrying in {delay} seconds...")
            time.sleep(delay)

    print(f"Max retries reached. Failed to send messages to {stream_name}")
    return None


def lambda_handler(event, context):
    # Initialise the fields used in the finally block up front so a failure
    # while reading the task does not leave them undefined.
    results = []
    result_code = "Succeeded"
    result_string = ''
    task_id = None
    try:
        print("---- Debug Print ----")
        print(event)
        task_id = event["tasks"][0]["taskId"]
        s3_bucket_arn = event['tasks'][0]['s3BucketArn']
        bucketName = s3_bucket_arn.split(':::')[-1]
        key = event['tasks'][0]['s3Key']
        print(f"Calling json parser with bucket: {bucketName} and key: {key}")

        if bucketName:
            resultList = run_throttled(bucketName, key)
            response = send_messages_to_firehose_with_backoff(
                "demo-json-blob-ingestion-firehose", resultList)
            print(response)
        else:
            raise Exception(f"Bucket name not found in task: {json.dumps(event['tasks'][0])}")
    except Exception as e:
        result_code = "PermanentFailure"
        result_string = str(e)
        print(e)
    finally:
        # S3 Batch expects one result entry per task, with a result code and string.
        results.append(
            {
                "taskId": task_id,
                "resultCode": result_code,
                "resultString": result_string,
            }
        )
    return {
        "invocationSchemaVersion": event['invocationSchemaVersion'],
        "treatMissingKeysAs": "PermanentFailure",
        'invocationId': event['invocationId'],
        "results": results,
    }


def get_object_with_key(bucket, key):
    s3 = boto3.client('s3')
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
        return {'obj': obj, 'key': key}
    except Exception as err:
        print("key not found", key)
        raise err


def run_throttled(bucketName, key):
    # A small fixed pause per task to avoid hammering Firehose when S3 Batch
    # fans out many concurrent invocations.
    time.sleep(1)
    return process_record(bucketName, key)

The provided code processes the blob files synchronously rather than following the typical asynchronous, event-triggered behavior. In the code snippet, you can observe that we return a JSON object with the following fields for S3 Batch, whereas a normal Lambda invocation has no such requirement.

{
    "invocationSchemaVersion": "1.0",
    "treatMissingKeysAs": "PermanentFailure",
    "invocationId": event['invocationId'],
    "results": [
        {
            "taskId": "sometask",
            "resultCode": "Succeeded",
            "resultString": "[\"result1\",\"result2\"]"
        }
    ]
}

If we look at the above code carefully, we change the result status depending on whether an exception occurred, so that the completion report captures the exact error.

The send_messages_to_firehose_with_backoff function takes care of any Firehose throttling caused by the number of records. We handle it with retries and an exponential backoff delay.
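One thing to keep in mind is that put_record_batch accepts at most 500 records per call and can also report partial failures in its response (FailedPutCount) without raising an exception, which the backoff loop above does not catch. Below is a minimal sketch of how chunking and a partial-failure retry could be layered on top of the function above; the helper name and chunk size are illustrative, not part of the original code.

def send_in_chunks(stream_name, records, chunk_size=500):
    # Firehose PutRecordBatch accepts at most 500 records per call,
    # so split the list before handing it to the backoff helper.
    for i in range(0, len(records), chunk_size):
        chunk = records[i:i + chunk_size]
        response = send_messages_to_firehose_with_backoff(stream_name, chunk)
        if response and response.get("FailedPutCount", 0) > 0:
            # PutRecordBatch can partially fail without raising; collect the
            # entries that carry an ErrorCode and retry just those.
            failed = [
                record
                for record, result in zip(chunk, response["RequestResponses"])
                if "ErrorCode" in result
            ]
            send_messages_to_firehose_with_backoff(stream_name, failed)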

Now, let’s create the manifest CSV, which is essentially a collection of key-value pairs. In this context, the key represents the bucket name, while the value corresponds to the path of the blob file. There are different ways to create the manifest CSV file; I have used the command below to create it.

aws s3 ls s3://json-blob-demo-bucket --recursive |  awk '{gsub("", "");print "json-blob-demo-bucket,"$4}' > samplemanifest_demo.csv

A sample of this manifest structure is demonstrated below.

Sample manifest csv
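If you prefer to stay in Python, the same manifest can be generated with boto3. A minimal sketch, assuming the bucket and output file names used above:

import csv

import boto3

def write_manifest(bucket_name, output_file):
    # List every object in the bucket and write "bucket,key" rows,
    # which is the CSV layout S3 Batch Operations expects in a manifest.
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    with open(output_file, "w", newline="") as f:
        writer = csv.writer(f)
        for page in paginator.paginate(Bucket=bucket_name):
            for obj in page.get("Contents", []):
                writer.writerow([bucket_name, obj["Key"]])

write_manifest("json-blob-demo-bucket", "samplemanifest_demo.csv")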

After completing the preceding steps, we can proceed to create an S3 Batch Job that references the previously mentioned Lambda function and utilizes the provided sample manifest to execute the Lambda in batch mode. Once the job is successfully created, we can review the job details, which will include information about the number of objects specified in our manifest.
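The job can be created from the S3 console or programmatically. Below is a minimal boto3 sketch of the same job creation; the account ID, role ARN, Lambda function ARN, and manifest ETag are placeholders you would replace with your own values.

import boto3

s3control = boto3.client("s3control")

response = s3control.create_job(
    AccountId="111122223333",                      # placeholder account ID
    ConfirmationRequired=True,                     # job waits until we confirm it
    Operation={
        "LambdaInvoke": {
            "FunctionArn": "arn:aws:lambda:us-east-1:111122223333:function:json-blob-parser"
        }
    },
    Manifest={
        "Spec": {
            "Format": "S3BatchOperations_CSV_20180820",
            "Fields": ["Bucket", "Key"],
        },
        "Location": {
            "ObjectArn": "arn:aws:s3:::json-blob-demo-bucket/samplemanifest_demo.csv",
            "ETag": "<manifest-object-etag>",       # ETag of the uploaded manifest
        },
    },
    Report={
        "Bucket": "arn:aws:s3:::json-blob-demo-bucket",
        "Format": "Report_CSV_20180820",
        "Enabled": True,
        "Prefix": "batch-reports",
        "ReportScope": "AllTasks",
    },
    Priority=10,
    RoleArn="arn:aws:iam::111122223333:role/s3-batch-operations-role",
    Description="Invoke the JSON blob parser Lambda for every object in the manifest",
)
print(response["JobId"])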

Now, we can initiate the job we just created, and upon completion, it will display a report indicating if there were any failures. It’s that straightforward! That concludes this blog post. For access to the code and Terraform setup used in this demonstration, you can find the GitHub link provided here.
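If the job was created with confirmation required, it can also be started and monitored programmatically. A short sketch, reusing the JobId returned by the create_job call above:

s3control.update_job_status(
    AccountId="111122223333",
    JobId=response["JobId"],
    RequestedJobStatus="Ready",   # moves the job out of "Awaiting confirmation"
)

status = s3control.describe_job(AccountId="111122223333", JobId=response["JobId"])
print(status["Job"]["Status"], status["Job"].get("ProgressSummary"))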

Ref: https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
