Today I'm going to share my experience building a data engineering project that I take great pride in. You'll learn why I chose the tools and AWS components I did, and how I designed the architecture.
Disclaimer: The content of this post is inspired by my experience with an unnamed entity. Certain commercially sensitive details have intentionally been replaced with fictional data/code or omitted in order to maintain confidentiality and privacy. Therefore, the full and accurate extent of the actual commercial interests involved is withheld.
Prerequisites
- Knowledge of Python
- Understanding of AWS components such as DynamoDB, Lambda, SQS, and CloudWatch
- Comfort working with YAML and the AWS SAM CLI
Background
Let's say you are a data engineer who needs to keep the data in the warehouse continuously updated. For example, you are responsible for syncing the sales records of Dunder Mifflin Paper Co. on a regular basis. (I understand this is not a realistic scenario, but have fun 🙂 !) The data is exposed to you through a vendor's API, and you are accountable for keeping the information about branches, employees (actually, only salespersons are considered), and sales up to date. The provided API has the following 3 paths:
- /branches, accepting a branch name as a query parameter and returning the metadata of the specified branch;
- /employees, accepting a branch ID as a query parameter and returning the information of all employees of that branch; the response includes a key-value pair indicating each employee's occupation;
- /sales, accepting an employee ID as a query parameter and returning the all-time sales records of that salesperson; the response includes a key-value pair indicating when each transaction was completed.
So, generally speaking, the API responses look like this.
The /branches path:
{
  "result": [
    {
      "id": 1,
      "branch_name": "Scranton",
      "employees": 50,
      "location": "Scranton, PA",
      ...
    }
  ]
}
The /employees path:
{
  "result": {
    "branch_id": 1,
    "employees": [
      {
        "id": 1234,
        "occupation": "data engineer",
        "name": "John Doe",
        ...
      },
      {
        "id": 1235,
        "occupation": "salesperson",
        "name": "Jim Doe",
        ...
      },
      ...
    ],
    ...
  }
}
The /sales path:
{
  "result": {
    "employee_id": 1235,
    "sales": [
      {
        "id": 3972,
        "transaction_timestamp": "2023-01-01 23:43:23",
        ...
      },
      {
        "id": 4002,
        "transaction_timestamp": "2023-01-05 12:23:31",
        ...
      },
      ...
    ],
    ...
  }
}
Your final deliverable should enable the data analysts to retrieve data for their analyses with nothing but SQL queries.
Ideas
It's an easy call that, in the end, we will have 3 separate places to store the data for branches, salespersons, and sales, each populated from its corresponding API path. However, because the identifiers of all these entities are mostly auto-generated, a practitioner is unlikely to have the IDs beforehand. Branch names, on the other hand, are normally available to us, so it's plausible to start with the /branches path to grab each branch's metadata, including its ID. With the branch IDs we can hit the /employees path to collect the employee IDs, and with the employee IDs we can hit the /sales path. That's exactly why I call this pipeline cascading.
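To make the idea concrete before jumping into the AWS pieces, here is a plain, synchronous sketch of the cascade. The base URL and the query-parameter names for /employees and /sales are my assumptions; in the actual pipeline each stage is a separate Lambda function connected by SQS queues rather than nested loops:
import requests

BASE = "https://www.dundermifflinpaper.com"  # hypothetical vendor API host


def cascade(branch_names):
    """Walk the three API paths in order: /branches -> /employees -> /sales."""
    for name in branch_names:
        # /branches: branch name -> branch metadata (including its ID)
        branch = requests.get(f"{BASE}/branches", params={"branch": name}).json()["result"][0]

        # /employees: branch ID -> all employees of that branch
        employees = requests.get(
            f"{BASE}/employees", params={"branch_id": branch["id"]}
        ).json()["result"]["employees"]

        for employee in employees:
            if employee["occupation"] != "salesperson":
                continue  # only salespersons are considered

            # /sales: employee ID -> all-time sales records of that salesperson
            sales = requests.get(
                f"{BASE}/sales", params={"employee_id": employee["id"]}
            ).json()["result"]["sales"]
            yield branch, employee, sales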
To keep our database up to date most of the time, these operations need to run frequently enough. On the other hand, we are also obligated to take the cost and the potential API visit quotas into consideration. Hence, running the pipeline once an hour is a proper choice, though arguably not yet optimal.
Last but not least, let's discuss AWS. First of all, the code executing these operations will run on AWS Lambda, because Lambda can be triggered by 200+ AWS services and applications, including SQS and EventBridge. The data will be delivered via SQS, one of the most established messaging services AWS offers. Finally, the information scraped from the API will be stored in DynamoDB. To some experienced readers, it may sound odd to use DynamoDB as the data warehousing tool, since it is a NoSQL database service and data warehouses are generally not built on NoSQL databases. I'm well aware of that: the DynamoDB tables here are only staging tables, which let me exploit the flexibility of the key-value/document data model before eventually converting the JSON-formatted API retrievals into data warehouse records. Check out this article if you are interested in the details of my DynamoDB-to-S3 loading.
Implementation
Here is the structure of my final work.
Cascading-ETL-pipeline
├── LICENSE
├── README.md
├── branches
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── lambda_function.py
│   ├── requirements.txt
│   └── service
│       ├── config.py
│       └── service.py
├── sales
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── lambda_function.py
│   ├── requirements.txt
│   └── service
│       ├── config.py
│       └── service.py
├── salespersons
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── lambda_function.py
│   ├── requirements.txt
│   └── service
│       ├── config.py
│       └── service.py
├── template.yml
└── utils.py
There are 3 folders (/branches, /salespersons, /sales), each containing the code of one Lambda function. utils.py is a Swiss-army-knife module whose functions, variables, and classes are shared by all of them. And template.yml is the AWS CloudFormation (SAM) template that we will use to declare and deploy the AWS resources that make up our data pipeline.
lambda_function.py in each folder is the entry point of the code execution:
import json
import logging

from pythonjsonlogger import jsonlogger

from service import service, config

# Load environment
ENV = config.load_env()

LOGGER = logging.getLogger()
# Replace the default Lambda logger handler's formatter with a JSON formatter
LOGGER.handlers[0].setFormatter(jsonlogger.JsonFormatter())
# Set default logging level
LOGGING_LEVEL = getattr(logging, ENV["LOGGING_LEVEL"])
LOGGER.setLevel(LOGGING_LEVEL)


def _lambda_context(context):
    """
    Extract relevant information from the context object.

    Args:
        context: The context object provided by the Lambda runtime.

    Returns:
        dict: A dictionary containing relevant information from the context object.
    """
    return {
        "function_name": context.function_name,
        "function_version": context.function_version,
    }


# @datadog_lambda_wrapper
def lambda_handler(event, context):
    """
    Handle the Lambda event.

    Args:
        event (dict): The event object containing input data for the Lambda function.
        context: The context object provided by the Lambda runtime.

    Returns:
        dict: A dictionary containing the response for the Lambda function.
    """
    LOGGER.info("Starting lambda execution.", extra=_lambda_context(context))
    service.main(event, ENV)
    LOGGER.info("Successful lambda execution.", extra=_lambda_context(context))
    return {"statusCode": 200}
service/config.py loads the environment variables set in template.yml:
import logging
import os
import sys

LOGGER = logging.getLogger(__name__)


def load_env():
    """Load environment variables.

    Returns:
        dict: A dictionary containing the loaded environment variables.

    Raises:
        KeyError: If any required environment variable is missing.

    Notes:
        - The function attempts to load the required environment variables:
          LOGGING_LEVEL, APP_ENV, SQS, and DB.
        - If any of them is missing, a KeyError is raised.
        - The function logs an exception message indicating the missing environment
          variable and exits the program with a status code of 1.
    """
    try:
        return {
            "LOGGING_LEVEL": os.environ["LOGGING_LEVEL"],
            "APP_ENV": os.environ["APP_ENV"],
            "SQS": os.environ["SQS"],
            "DB": os.environ["DB"],
        }
    except KeyError as error:
        LOGGER.exception("Environment variable %s is required.", error)
        sys.exit(1)
service/service.py is where we actually wrangle the data. Generally speaking, the function is invoked by its trigger(s) (time schedulers), retrieves data from a data source (the API or an SQS queue), packages the data as a set of key-value pairs, upserts it into the corresponding DynamoDB table if needed, and then passes the identifiers of the child entities (i.e., all employees of a branch, or all sales records of a salesperson) downstream.
Take /branches/service/service.py as an example. Its functionality includes:
- acquiring all the data from the /branches API path as soon as the function wakes up;
- checking the existence and accuracy of each branch's record in the DynamoDB table, and upserting the record if it's not up to date;
- delivering each branch's ID as a message to the tailing function (/salespersons) via an SQS queue, so that it can fetch the branch's employees.
In practice, the implementation looks like this:
import logging
import sys

import requests
from boto3.dynamodb.conditions import Key

from utils import *

LOGGER = logging.getLogger(__name__)


def main(event, environment):
    """Process the invoking event and update the DynamoDB table for the specified branches.

    Args:
        event (dict): A JSON-formatted document that contains data for the Lambda function to process.
        environment (dict): The environment variables loaded by config.load_env().

    Returns:
        None

    Raises:
        SystemExit: If an exception occurs during the execution.

    Notes:
        - If `event` does not contain the 'branches' key, the function defaults to processing all branches.
        - The function retrieves branch-specific information from the API and updates the DynamoDB table accordingly.
        - The branch IDs are then delivered to an SQS queue for further processing.
    """
    LOGGER.info(event)
    if not event.get("branches"):
        # default to looking up all branches if the value is missing or an empty list
        branches = [
            "Scranton",
            "Akron",
            "Buffalo",
            "Rochester",
            "Syracuse",
            "Utica",
            "Binghamton",
            "Albany",
            "Nashua",
            "Pittsfield",
            "Stamford",
            "Yonkers",
            "New York",
        ]
    else:
        branches = event["branches"]  # should be an array

    queue = environment["SQS"]
    table = environment["DB"]

    try:
        for branch in branches:
            # hit the path that retrieves all information of the specified branch by name
            response = requests.get(
                url=f"https://www.dundermifflinpaper.com/branches/?branch={branch}"
            )
            results = response.json().get("result")
            for result in results:
                if not upToDate(
                    table,
                    Key("branch_id").eq(str(result["id"])),
                    result,
                    "branch_",
                ):
                    # only update the DynamoDB table when the record is NOT fully ingested yet
                    update_info(table, result)
                deliver_message(queue, str({"branch": result["id"]}))
                LOGGER.info(f"sending branch {result['id']} for the next stage")
    except Exception as e:
        LOGGER.error(str(e), exc_info=True)
        sys.exit(1)
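service.py leans on helpers from utils.py: update_info, deliver_message, and upToDate (the latter is shown in the Q&A section). utils.py itself isn't reproduced in this post, but a minimal sketch of what the first two could look like, assuming a boto3 DynamoDB resource and SQS client, would be:
import boto3

dynamodb = boto3.resource("dynamodb")
sqs = boto3.client("sqs")


def update_info(table_name, result):
    """Upsert one API record into the given DynamoDB staging table."""
    dynamodb.Table(table_name).put_item(Item=result)


def deliver_message(queue_url, body):
    """Send a message to the SQS queue feeding the next stage of the cascade."""
    sqs.send_message(QueueUrl=queue_url, MessageBody=body)
In the real project these helpers also take care of details the sketch leaves out, such as prefixing attribute names that collide with DynamoDB reserved words (hence the "branch_" prefix seen above) and converting numeric types.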
In the end, we need to prepare for the build and deployment:
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Parameters:
Environment:
Type: String
Resources:
# =========================================================================================
# IAM ROLES, POLICIES, PERMISSIONS
# =========================================================================================
LambdaRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub '${AWS::StackName}-lambda-role'
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
- events.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AWSLambdaExecute
- arn:aws:iam::aws:policy/AmazonSQSFullAccess
- arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
Path: '/'
LambdaPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyName: !Sub '${AWS::StackName}-lambda-policy'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: EventBusAccess
Effect: Allow
Action:
- events:PutEvents
Resource: '*'
- Sid: LambdaInvokeAccess
Effect: Allow
Action:
- lambda:InvokeFunction
Resource: "*"
- Sid: LogAccess
Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: arn:aws:logs:*:*:*
Roles:
- !Ref LambdaRole
# =========================================================================================
# AWS LAMBDA FUNCTIONS
# =========================================================================================
BranchCollector:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub branch-collector-${Environment}
Handler: lambda_function.lambda_handler
Runtime: python3.9
CodeUri: branches/
Description: updating branch info in our DynamoDB table
MemorySize: 128
Timeout: 900
Role: !GetAtt LambdaRole.Arn
Environment:
Variables:
LOGGING_LEVEL: INFO
APP_ENV: !Ref Environment
SQS: !Ref EmployeeQueue
DB: !Ref BranchDynamoDBTable
DeadLetterQueue:
Type: SQS
TargetArn:
Fn::GetAtt: BranchFunctionDeadLetterQueue.Arn
Events:
StartScheduledEvent:
Type: Schedule
Properties:
Schedule: rate(1 hour)
SalespersonCollector:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub salesperson-collector-${Environment}
Handler: lambda_function.lambda_handler
Runtime: python3.9
CodeUri: salespersons/
Description: updating salesperson info in our DynamoDB table
MemorySize: 128
Timeout: 900
Role: !GetAtt LambdaRole.Arn
ReservedConcurrentExecutions: 5
Environment:
Variables:
LOGGING_LEVEL: INFO
APP_ENV: !Ref Environment
SOURCE_SQS: !Ref EmployeeQueue
TARGET_SQS: !Ref SaleQueue
DB: !Ref SalespersonDynamoDBTable
DeadLetterQueue:
Type: SQS
TargetArn:
Fn::GetAtt: EmployeeFunctionDeadLetterQueue.Arn
Events:
StartScheduledEvent:
Type: Schedule
Properties:
# every minute
Schedule: rate(1 minute)
SaleCollector:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub sale-collector-${Environment}
Handler: lambda_function.lambda_handler
Runtime: python3.9
CodeUri: sales/
Description: updating sales info in our DynamoDB table
MemorySize: 128
Timeout: 900
ReservedConcurrentExecutions: 3
Role:
Fn::GetAtt:
- LambdaRole
- Arn
Environment:
Variables:
LOGGING_LEVEL: INFO
APP_ENV: !Ref Environment
SQS: !Ref SaleQueue
DB: !Ref SaleDynamoDBTable
DeadLetterQueue:
Type: SQS
TargetArn:
Fn::GetAtt: SaleFunctionDeadLetterQueue.Arn
Events:
StartScheduledEvent:
Type: Schedule
Properties:
# every minute
Schedule: rate(1 minute)
# =========================================================================================
# AWS DynamoDB TABLES
# =========================================================================================
BranchDynamoDBTable:
Type: AWS::DynamoDB::Table
DeletionPolicy: Delete
Properties:
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
-
AttributeName: "branch_id"
AttributeType: "S"
KeySchema:
-
AttributeName: "branch_id"
KeyType: "HASH"
StreamSpecification:
StreamViewType: NEW_IMAGE
TableName: !Sub branch-${Environment}
SalespersonDynamoDBTable:
Type: AWS::DynamoDB::Table
DeletionPolicy: Delete
Properties:
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
-
AttributeName: "employee_id"
AttributeType: "S"
-
AttributeName: "branch_id"
AttributeType: "S"
KeySchema:
-
AttributeName: "employee_id"
KeyType: "HASH"
-
AttributeName: "branch_id"
KeyType: "RANGE"
StreamSpecification:
StreamViewType: NEW_IMAGE
TableName: !Sub salesperson-${Environment}
SaleDynamoDBTable:
Type: AWS::DynamoDB::Table
DeletionPolicy: Delete
Properties:
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
-
AttributeName: "sale_id"
AttributeType: "S"
-
AttributeName: "employee_id"
AttributeType: "S"
KeySchema:
-
AttributeName: "sale_id"
KeyType: "HASH"
-
AttributeName: "employee_id"
KeyType: "RANGE"
StreamSpecification:
StreamViewType: NEW_IMAGE
TableName: !Sub sale-${Environment}
# =========================================================================================
# AWS SQS QUEUES
# =========================================================================================
EmployeeQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub employee-queue-${Environment}
VisibilityTimeout: 900
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt: EmployeeWorkloadDeadLetterQueue.Arn
maxReceiveCount: 10
EmployeeWorkloadDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub employee-workload-dead-letter-queue-${Environment}
MessageRetentionPeriod: 1209600
BranchFunctionDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub branch-function-dead-letter-queue-${Environment}
MessageRetentionPeriod: 1209600
SaleQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub sale-queue-${Environment}
VisibilityTimeout: 900
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt: SaleWorkloadDeadLetterQueue.Arn
maxReceiveCount: 10
SaleWorkloadDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub sale-workload-dead-letter-queue-${Environment}
MessageRetentionPeriod: 1209600
EmployeeFunctionDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub employee-function-dead-letter-queue-${Environment}
MessageRetentionPeriod: 1209600
SaleFunctionDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub sale-function-dead-letter-queue-${Environment}
MessageRetentionPeriod: 1209600
# =========================================================================================
# AWS CLOUDWATCH ALARMS
# =========================================================================================
BranchErrorAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: FunctionName
Value: !Ref BranchCollector
EvaluationPeriods: 1
MetricName: Errors
Namespace: AWS/Lambda
Period: 300
Statistic: Sum
Threshold: '1'
AlarmActions:
- arn:aws:sns:us-east-1:{id}:{alarm-action-name}
BranchDurationAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: FunctionName
Value: !Ref BranchCollector
EvaluationPeriods: 1
MetricName: Duration
Namespace: AWS/Lambda
Period: 60
Statistic: Maximum
Threshold: '750000'
AlarmActions:
- arn:aws:sns:us-east-1:{id}:{alarm-action-name}
BranchThrottleAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: FunctionName
Value: !Ref BranchCollector
EvaluationPeriods: 1
MetricName: Throttles
Namespace: AWS/Lambda
Period: 300
Statistic: Sum
Threshold: '1'
AlarmActions:
- arn:aws:sns:us-east-1:{id}:{alarm-action-name}
SalespersonErrorAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: FunctionName
Value: !Ref SalespersonCollector
EvaluationPeriods: 1
MetricName: Errors
Namespace: AWS/Lambda
Period: 300
Statistic: Sum
Threshold: '1'
AlarmActions:
- arn:aws:sns:us-east-1:{id}:{alarm-action-name}
SalespersonDurationAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: FunctionName
Value: !Ref SalespersonCollector
EvaluationPeriods: 1
MetricName: Duration
Namespace: AWS/Lambda
Period: 60
Statistic: Maximum
Threshold: '750000'
AlarmActions:
- arn:aws:sns:us-east-1:{id}:{alarm-action-name}
SalespersonThrottleAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: FunctionName
Value: !Ref SalespersonCollector
EvaluationPeriods: 1
MetricName: Throttles
Namespace: AWS/Lambda
Period: 300
Statistic: Sum
Threshold: '1'
AlarmActions:
- arn:aws:sns:us-east-1:{id}:{alarm-action-name}
SaleErrorAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: FunctionName
Value: !Ref SaleCollector
EvaluationPeriods: 1
MetricName: Errors
Namespace: AWS/Lambda
Period: 300
Statistic: Sum
Threshold: '1'
AlarmActions:
- arn:aws:sns:us-east-1:{id}:{alarm-action-name}
SaleDurationAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: FunctionName
Value: !Ref SaleCollector
EvaluationPeriods: 1
MetricName: Duration
Namespace: AWS/Lambda
Period: 60
Statistic: Maximum
Threshold: '750000'
AlarmActions:
- arn:aws:sns:us-east-1:{id}:{alarm-action-name}
SaleThrottleAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: FunctionName
Value: !Ref SaleCollector
EvaluationPeriods: 1
MetricName: Throttles
Namespace: AWS/Lambda
Period: 300
Statistic: Sum
Threshold: '1'
AlarmActions:
- arn:aws:sns:us-east-1:{id}:{alarm-action-name}
Q&A
Yes, I'm doing a Q&A session with myself. It's usually helpful to challenge myself with "Why?" or "How?" while coding; doing so gives me more confidence in each decision I make and helps me justify every tool I use.
a. How do I monitor the functions?
I use CloudWatch alarms. A CloudWatch alarm watches a single CloudWatch metric, or a math expression based on metrics, and performs one or more actions when the value crosses a given threshold for a specified number of periods.
To me, it's most important to learn about and resolve an error as soon as it happens. So I set up alarms on all 3 functions with 1 error as the threshold; that is to say, an alarm fires whenever there is an error. I want to be notified of errors without constantly keeping an eye on a CloudWatch dashboard, so the alarms' action is to publish a notification to an SNS topic, which forwards the alert to my email inbox.
If you are working in a collaborative environment, I suggest you extend the visibility by sending the alert to a Slack channel, distributing it to all the addresses on a distribution list, or routing it into a shared mailbox.
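The AlarmActions in the template point at an SNS topic ARN (redacted here). Hooking an email inbox, or any other endpoint, to that topic is a single API call; the topic ARN and address below are placeholders:
import boto3

sns = boto3.client("sns")

# Subscribe an email address to the alarm topic; the recipient must confirm
# the subscription via the confirmation email AWS sends out.
sns.subscribe(
    TopicArn="arn:aws:sns:us-east-1:123456789012:lambda-alarm-topic",
    Protocol="email",
    Endpoint="data-eng-alerts@example.com",
)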
b. Why do I define the table keys the way I do?
It's based on reality. Branches are clearly distinct from one another, so their IDs are sufficient to serve as the sole hash key of the branches table under the 1NF constraint. By contrast, the salespersons and sales tables need extra keys to be normalized.
In reality, a branch is likely to have multiple employees on the books, while an employee is allowed to move from one branch to another, so from the perspective of the data schema the relationship between branches and employees is many-to-many. For the same reason, only the combination of sale record ID + salesperson ID + branch ID (the branch where the transaction took place) points to an exact record in the sales table. The constraint is that DynamoDB allows at most 2 attributes in a primary key (a partition key and a sort key), so I picked the salesperson ID over the branch ID as the sort key, in favor of the certainty of the sale-to-salesperson relationship. The relationship between sales and branches is explained in the next question.
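To make the composite key concrete: with employee_id as the hash key and branch_id as the range key, I can address one salesperson-at-a-branch record directly, or pull every branch a salesperson has ever belonged to with a single key-condition query. A quick sketch, where the table name and IDs are placeholders:
import boto3
from boto3.dynamodb.conditions import Key

salespersons = boto3.resource("dynamodb").Table("salesperson-dev")

# One specific salesperson-at-a-branch record (hash key + range key):
item = salespersons.get_item(Key={"employee_id": "1235", "branch_id": "1"}).get("Item")

# All rows for one salesperson, i.e. every branch they have belonged to:
rows = salespersons.query(KeyConditionExpression=Key("employee_id").eq("1235"))["Items"]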
c. How do I establish the linkage between the sales and branches? And why?
The data vendor falls short of including branch information in the sales records. The straightforward way to handle this is to pass the branch ID down from the very top (the branch collector function) to the end of the cascade. This approach, nevertheless, misses some edge cases. For instance, Jim Halpert closed a sale on his last day at the Scranton branch; owing to some technical issues, that record wasn't appended to his sales record list or published on the API until the second business day, by which time his status had already been switched to a Stamford employee.
It's hard to detect this kind of mislabeling without any context, especially when the root cause lies with the vendor. From my experience, debugging at this stage relies heavily on downstream feedback. This is why I keep the branch ID in the sales table as a loose (non-key) attribute; otherwise, it would take extra effort to remove and rewrite the item.
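Since branch_id is an ordinary attribute in the sales table rather than part of its key, correcting a mislabeled record is a single in-place update instead of a delete-and-rewrite. A sketch of such a backfill, with the table name and IDs as placeholders:
import boto3

sales = boto3.resource("dynamodb").Table("sale-dev")

# Re-point one mislabeled sale to the branch where the transaction actually took place.
sales.update_item(
    Key={"sale_id": "3972", "employee_id": "1235"},
    UpdateExpression="SET branch_id = :b",
    ExpressionAttributeValues={":b": "1"},  # e.g. Scranton's branch ID
)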
d. How do I trigger the salespersons and sales collector functions?
An SQS queue is one of the event sources that can officially invoke a Lambda function, and hence the natural choice for waking up these 2 functions, since they already listen to queues. However, I took a detour to work around the maximum visit cap imposed by the API owner. If I let the functions pick up messages and hit the API as soon as the messages arrive, multiple invocations would process messages nearly at the same time, which would break the pipelining architecture because it could easily exceed the API quota. With time schedulers firing every minute (I created 2 schedulers for each function), the processing frequency drops from the millisecond level to the second level, and the message traffic in the data pipeline is mitigated.
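In other words, instead of an SQS event-source mapping that fires on every incoming message, the downstream collectors are woken up by a once-a-minute schedule and drain their queue themselves. A rough sketch of what that polling loop could look like inside the salesperson/sale collectors; the batch size and long-poll settings are my assumptions:
import boto3

sqs = boto3.client("sqs")


def drain_queue(queue_url, handle_message, max_batches=10):
    """Pull a bounded number of message batches per scheduled run, then stop."""
    for _ in range(max_batches):
        response = sqs.receive_message(
            QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=1
        )
        messages = response.get("Messages", [])
        if not messages:
            break  # queue is empty; wait for the next scheduled run
        for message in messages:
            handle_message(message["Body"])  # e.g. call the API for this branch/salesperson
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])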
e. How do I avoid repetitive operations?
It is almost impossible to tell whether the collected data is up to date without actually visiting the API, the source of truth. So we can't reduce the number of API visits; what we can do, as described in the previous question, is lower the chance that the API visit quota is exceeded.
If DynamoDB were the final destination of the data flow, it would be fine to fully upsert each record every single time we receive it from the API. The trouble is that our Firehose stream from DynamoDB to S3 is short on bandwidth, which occasionally brings the transport to a halt. In light of that, I insert a sanity check before each upsert. The check compares every attribute of the stored record with the values freshly retrieved from the API, and the existing record is only overwritten if something has changed.
Attached is the sanity check function:
def upToDate(table_name, condition, result, prefix):
    """
    Check whether a record in the specified DynamoDB table is up-to-date, i.e. no different from the API retrieval.

    Args:
        table_name (str): The name of the DynamoDB table to check.
        condition (boto3.dynamodb.conditions.Key): The key condition expression for querying the table.
        result (dict): The freshly retrieved API record to compare against.
        prefix (str): The prefix used for attributes whose names collide with DynamoDB reserved words.

    Returns:
        bool: True if the record exists and is fully up-to-date, False otherwise.

    Notes:
        - The function queries the specified DynamoDB table using the provided condition.
        - It compares the key-value pairs of the API result with the stored item, accounting for the prefix where applicable.
        - If every key-value pair matches, the ingestion is considered complete and True is returned.
    """
    # `dynamodb` (a boto3 DynamoDB resource) and `reserved_words` are module-level globals in utils.py
    table = dynamodb.Table(table_name)
    retrieval = table.query(KeyConditionExpression=condition)["Items"]
    existing_items = 0
    if len(retrieval) > 0:
        for key in result.keys():
            if key.upper() not in reserved_words:
                if result[key] == retrieval[0].get(key):
                    existing_items += 1
            elif result[key] == retrieval[0].get(prefix + key):
                existing_items += 1
    completed = len(retrieval) > 0 and existing_items == len(result.items())
    # len(retrieval) == 0: the item doesn't exist in DynamoDB at all
    # existing_items == len(result.items()): the item exists and all its key-value pairs
    # are synced up with the API
    return completed
Deployment
In each folder, do
pip install -r requirements.txt
And go back to the parent folder:
# copy utils.py to each folder
for d in */; do cp utils.py "$d"; done
# build the cloudformation
sam build -u
# invoke the functions locally for testing
# event.json should be like:
# {
# "branches": ["Scranton"]
# }
# env.json should be like:
# {
# "Parameters": {
# "Environment": "local"
# }
# }
sam local invoke "BranchCollector" -e branch.json --env-vars env.json
sam local invoke "SalespersonCollector" -e branch.json --env-vars env.json
sam local invoke "SalesCollector" -e branch.json --env-vars env.json
# deploy it onto AWS
sam deploy --parameter-overrides Environment=dev
Final Work
Credits
- Diagrams as Code for all the tools that support concise visualizations
- The Office for a brilliant show, the laughter it brings, and the inspiration