Building a Serverless Data Pipeline with Python: S3 to Lambda to DynamoDB
A step-by-step tutorial on creating a classic serverless data pipeline where files dropped into an S3 bucket automatically trigger a Python Lambda function to process the data and store the results in DynamoDB.
One of the most common and powerful patterns in the serverless world is the automated data pipeline. The ability to drop a file into an S3 bucket and have it automatically processed and stored without managing any servers is a game-changer for many applications.
This guide provides a step-by-step tutorial on how to build a classic serverless data pipeline using S3, Lambda, and DynamoDB with Python.
The Architecture
Our pipeline will be simple but effective:
- S3 Bucket: A new CSV file containing product data is uploaded to an S3 bucket.
- S3 Event Notification: The upload event triggers an AWS Lambda function.
- Lambda Function: The Python function reads the CSV file, processes each row, and writes the data to a DynamoDB table.
- DynamoDB Table: The processed data is stored in a DynamoDB table for fast querying.
This architecture is fully serverless: it's cost-effective (you only pay while the function runs), it scales automatically with the volume of uploads, and it requires no server maintenance.
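To make the trigger concrete, here is a trimmed sketch of the S3 event notification payload Lambda receives on each upload. Only the fields this tutorial actually reads are shown; real events carry more metadata, and the key and size values below are illustrative:
{
  "Records": [
    {
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "my-product-data-pipeline" },
        "object": { "key": "products.csv", "size": 73 }
      }
    }
  ]
}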
Step 1: Set Up the AWS Resources
First, we need to create our S3 bucket and DynamoDB table.
- Create the S3 Bucket: Go to the S3 console and create a new bucket. Give it a unique name (e.g., my-product-data-pipeline).
- Create the DynamoDB Table: Go to the DynamoDB console and create a table named Products with a primary key of id (string).
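If you prefer to script this setup rather than click through the console, a minimal boto3 sketch looks like the following. The bucket name is an example and must be globally unique, and PAY_PER_REQUEST billing is an assumption that simply avoids capacity planning:
import boto3

s3 = boto3.client('s3')
dynamodb = boto3.client('dynamodb')

# Create the bucket. Outside us-east-1 you must also pass
# CreateBucketConfiguration={'LocationConstraint': '<your-region>'}.
s3.create_bucket(Bucket='my-product-data-pipeline')

# Create the Products table with a string partition key named "id".
dynamodb.create_table(
    TableName='Products',
    AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST',
)

# Wait until the table is ready before using it.
dynamodb.get_waiter('table_exists').wait(TableName='Products')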
Step 2: Write the Python Lambda Function
Now, let's write the core logic of our pipeline. This Lambda function will be responsible for reading the file from S3 and writing to DynamoDB.
import boto3
import csv
import io
from urllib.parse import unquote_plus

s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')


def handler(event, context):
    """Processes a CSV file from S3 and saves the data to DynamoDB."""
    # Get the bucket and key from the S3 event notification.
    # Object keys arrive URL-encoded, so decode them before calling S3.
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    file_key = unquote_plus(event['Records'][0]['s3']['object']['key'])

    print(f"Processing file: {file_key} from bucket: {bucket_name}")

    # Get the CSV file from S3
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)

    # Read the CSV file line by line
    # Use io.StringIO to treat the decoded string as a file
    csv_file = io.StringIO(response['Body'].read().decode('utf-8'))
    csv_reader = csv.DictReader(csv_file)

    table = dynamodb.Table('Products')

    # batch_writer buffers items and sends them in efficient batches
    with table.batch_writer() as batch:
        for row in csv_reader:
            print(f"Writing item: {row}")
            batch.put_item(Item=row)

    return {
        'statusCode': 200,
        'body': f'Successfully processed {file_key}.'
    }
Key Points:
- Event Parsing: The function gets the bucket name and file key from the event object passed to it by S3. Because object keys arrive URL-encoded in the event, the code decodes them with unquote_plus before calling S3.
- Streaming: It reads the file content directly from the S3 response body. For very large files, you would use the streaming capabilities of the body object instead of loading the whole file into memory.
- Batch Writer: Instead of calling put_item for every row (which would be slow and inefficient), we use DynamoDB's batch_writer. This automatically handles buffering the items and writing them to the table in efficient batches. One refinement to this step is sketched right after this list.
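One refinement worth noting: csv.DictReader yields every value as a string, so the function above stores price as a string attribute. If you want DynamoDB to store it as a number instead, convert it before writing; boto3's resource layer requires Decimal rather than float for numeric attributes. A minimal sketch of that tweak, as a drop-in replacement for the loop in the handler (it assumes the price column is always present and numeric):
from decimal import Decimal

with table.batch_writer() as batch:
    for row in csv_reader:
        # Convert the price column so DynamoDB stores it as a Number.
        # boto3 rejects Python floats; Decimal is required.
        row['price'] = Decimal(row['price'])
        batch.put_item(Item=row)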
Step 3: Create the Lambda Function and Configure the Trigger
Create the Lambda Function: Go to the AWS Lambda console and create a new Python function. Paste the code from Step 2 into the function's code editor.
Configure IAM Permissions: This is the most critical step. Your Lambda function needs permission to read from the S3 bucket and write to the DynamoDB table. Go to the function's configuration and attach an IAM policy like the following to its execution role, replacing the region and account ID in the DynamoDB ARN with your own. Keep the basic CloudWatch Logs permissions on the role as well (for example, the AWSLambdaBasicExecutionRole managed policy) so you can see the function's logs in Step 4:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::my-product-data-pipeline/*" }, { "Effect": "Allow", "Action": [ "dynamodb:BatchWriteItem", "dynamodb:PutItem" ], "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/Products" } ] }
Add the S3 Trigger: In the function's designer, click "Add trigger." Select S3, choose your my-product-data-pipeline bucket, and set the event type to "All object create events". You can also specify a prefix (e.g., uploads/) if you only want to trigger the function for files in a specific folder.
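The console hides two pieces of wiring that you need if you ever script this step: S3 must be granted permission to invoke the function, and the bucket needs a notification configuration pointing at it. A hedged boto3 sketch follows; the function name, ARN, and statement ID are placeholders for your own values:
import boto3

lambda_client = boto3.client('lambda')
s3 = boto3.client('s3')

# Placeholder ARN for your Lambda function.
function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:process-products'

# Allow the S3 bucket to invoke the Lambda function.
lambda_client.add_permission(
    FunctionName='process-products',       # placeholder function name
    StatementId='s3-invoke-permission',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::my-product-data-pipeline',
)

# Point the bucket's "object created" notifications at the function,
# optionally limited to keys under the uploads/ prefix.
s3.put_bucket_notification_configuration(
    Bucket='my-product-data-pipeline',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': function_arn,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': 'uploads/'}]}},
        }]
    },
)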
Step 4: Test the Pipeline
Your pipeline is now live! To test it, create a simple CSV file named products.csv:
id,name,price
prod-001,Laptop,1200
prod-002,Mouse,25
prod-003,Keyboard,75
Upload this file to your S3 bucket. Within seconds, the S3 event will trigger your Lambda function. You can check the function's CloudWatch logs to see the print statements, and you will find the new items in your Products DynamoDB table.
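If you would rather drive the test from a script than the console, a small boto3 sketch looks like this; the sleep is a crude stand-in for waiting on the asynchronous invocation, so adjust it as needed:
import time
import boto3

s3 = boto3.client('s3')
table = boto3.resource('dynamodb').Table('Products')

# Upload the test file; this is what fires the S3 event.
s3.upload_file('products.csv', 'my-product-data-pipeline', 'products.csv')

# Give the asynchronous Lambda invocation a moment to run.
time.sleep(10)

# Verify that one of the rows landed in DynamoDB.
item = table.get_item(Key={'id': 'prod-001'}).get('Item')
print(item)  # expected: {'id': 'prod-001', 'name': 'Laptop', 'price': '1200'}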
Conclusion
You've just built a powerful, scalable, and fully serverless data pipeline with just a few lines of Python and some simple AWS configuration. This S3 -> Lambda -> DynamoDB pattern is a fundamental building block of many cloud-native applications. It can be adapted for a huge variety of use cases, from processing IoT data to ingesting user activity logs.
By mastering this pattern, you unlock a core competency of modern cloud development.