
Automating ETL to SFTP Server Using Python and SQL

Learn how to automate a daily data transfer process on Windows, from a PostgreSQL database to a remote server

Photo by Shubham Dhage on Unsplash

The process of transferring files from one location to another is a perfect candidate for automation. Doing it repetitively can be daunting, especially when you have to perform the entire ETL (Extract, Transform, Load) process for several groups of data.

Imagine your company keeps its data in a data warehouse and then decides to contract out part of its analytics to an external data analytics supplier. This supplier offers bespoke analytics software that will display dashboards and reports for your company's core production team.

The implication is that you, as the data engineer, will have to transfer data to this supplier daily, hourly, every 30 minutes, or at whatever frequency the outsourcing contract specifies.

This article explains this ETL process in detail, including an upload over Secure File Transfer Protocol (SFTP), a secure means of transferring files between two remote servers that encrypts the traffic using the Secure Shell (SSH) protocol.

We will learn the process of obtaining and transferring such data files using the following detailed steps:

  1. Data Extraction: Write SQL scripts to extract specific data, in this case, sales, donations and labor data, from a PostgreSQL database in a data warehouse. We will then embed these SQL scripts into our Python code using the psycopg2 library.
  2. Data Transformation: Transform the data, based on the expectations of the external supplier, using Python.
  3. Data Loading: Upload the data into the remote server of the external supplier, still using Python.
  4. Schedule the daily run using Windows Task Scheduler.

First of all, let us look at the nature of the files that the external supplier’s system is expecting to receive.

Data specifications as requested by the external analytics supplier

In my current example, here are the file requirements:

  • Separate CSV files for sales, store donation counts, 'ADC' donation counts, and labor hours, to be transferred once daily at 6:00 am. Each file should cover the complete data generated for the previous day.
  • Templates for these CSV files were provided to ensure the right columns are extracted and the files are encoded in UTF-8.
  • The name of each file should include a certain string plus the previous day's date, in the format shown below.
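Based on the file-naming code further down in the article, the four names follow these patterns (with the previous day's date in YYYYMMDD form):

WESA_SALE_NEW_<YYYYMMDD>.csv
WESA_DONR_NEW_<YYYYMMDD>.csv
WESA_ADC_<YYYYMMDD>.csv
Labor_Payroll.<YYYYMMDDHHMMSS>.csv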

1. Data Extraction

Photo by Luke Porter on Unsplash

The first step is to go into your PostgreSQL database and write SQL scripts that will be saved in the project folder. In the current case, let's name the scripts wesa_sales.sql, wesa_donors_stores.sql, wesa_donors_adc.sql and wesa_labor.sql. These scripts will extract the required sales, store donations, 'ADC' donations and labor data respectively. 'Store' and 'ADC' are the two location types for the business in this example.

The complexity of your SQL scripts will depend on the kind of data being extracted, how it is stored in the database, and the requirements of the expected CSV files. In my current example, here are my SQL scripts:

-- Extract sales data for previous day
DROP TABLE IF EXISTS public.sales1;
DROP TABLE IF EXISTS public.sales2;

SELECT 
   a.balance_start_dt AS "Date",
 CAST(a.location_id AS TEXT) AS "StoreNo",
 'All' as "Cashier",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Accessories' THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "AccessoriesUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Accessories'::text THEN a.value ELSE 0::numeric END) AS "AccessoriesSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Jewellery'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "JewelryUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Jewellery'::text THEN a.value ELSE 0::numeric END) AS "JewelrySales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Books'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "BookUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Books'::text THEN a.value ELSE 0::numeric END) AS "BookSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'AV'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "AVUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'AV'::text THEN a.value ELSE 0::numeric END) AS "AVSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Electrical'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ElectricalUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Electrical'::text THEN a.value ELSE 0::numeric END) AS "ElectricalSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Furniture'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "FurnitureUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Furniture'::text THEN a.value ELSE 0::numeric END) AS "FurnitureSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Outlet Furniture'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "OutletFurnitureUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Outlet Furniture'::text THEN a.value ELSE 0::numeric END) AS "OutletFurnitureSales",
 sum(CASE WHEN a.pos_dept_desc = 'Outlet'::text AND a.pos_sub_dept_desc != 'Outlet Furniture'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "OutletUnits",
    sum(CASE WHEN a.pos_dept_desc = 'Outlet'::text AND a.pos_sub_dept_desc != 'Outlet Furniture'::text THEN a.value ELSE 0::numeric END) AS "OutletSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Footwear'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ShoeUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Footwear'::text THEN a.value ELSE 0::numeric END) AS "ShoeSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Women'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "WomenUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Women'::text THEN a.value ELSE 0::numeric END) AS "WomenSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Men'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "MenUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Men'::text THEN a.value ELSE 0::numeric END) AS "MenSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Children'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ChildrenUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Children'::text THEN a.value ELSE 0::numeric END) AS "ChildrenSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Boutique'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "BoutiqueUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Boutique'::text THEN a.value ELSE 0::numeric END) AS "BoutiqueSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Linens'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "LinenUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Linens'::text THEN a.value ELSE 0::numeric END) AS "LinenSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Collectibles'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "CollectiblesUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Collectibles'::text THEN a.value ELSE 0::numeric END) AS "CollectiblesSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Sporting Goods'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "SportingGoodsUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Sporting Goods'::text THEN a.value ELSE 0::numeric END) AS "SportingGoodsSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Toys'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ToysUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Toys'::text THEN a.value ELSE 0::numeric END) AS "ToysSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Wares'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "WaresUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Wares'::text THEN a.value ELSE 0::numeric END) AS "WaresSales",
 sum(CASE WHEN a.pos_sub_dept_desc = ANY (ARRAY['Seasonal'::text, 'Christmas'::text, 'Halloween%'::text, 'Back to School'::text]) THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "SeasonalUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = ANY (ARRAY['Seasonal'::text, 'Christmas'::text, 'Halloween%'::text, 'Back to School'::text]) THEN a.value ELSE 0::numeric END) AS "SeasonalSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Share The Good'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "ShareTheGoodUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Share The Good'::text THEN a.value ELSE 0::numeric END) AS "ShareTheGoodSales",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Events'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "EventsUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Events'::text THEN a.value ELSE 0::numeric END) AS "EventsSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Commercial Services'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "Commercial-ICUnits",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Commercial Services'::text THEN a.value ELSE 0::numeric END) AS "Commercial-ICSales",
 sum(CASE WHEN a.pos_sub_dept_desc = 'Gift Card'::text THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "NoGiftCardsIssued",
    sum(CASE WHEN a.pos_sub_dept_desc = 'Gift Card'::text THEN a.value ELSE 0::numeric END) AS "TotalGiftCardsIssued",
 sum(CASE WHEN a.pos_dept_desc = 'Donation'::text THEN CAST(a.qty AS INTEGER) ELSE 0 END) AS "ChangeRoundupUnits",
    sum(CASE WHEN a.pos_dept_desc = 'Donation'::text THEN a.value ELSE 0::numeric END) AS "ChangeRoundup",
 sum(CASE WHEN a.pos_sub_dept_desc ~~ ANY (ARRAY['20LB%'::text, 'Dept%'::text, 'Mask%'::text]) OR a.pos_dept_desc IS NULL THEN CAST(a.qty AS INTEGER)ELSE 0 END) AS "OtherUnits",
    sum(CASE WHEN a.pos_sub_dept_desc ~~ ANY (ARRAY['20LB%'::text, 'Dept%'::text, 'Mask%'::text]) OR a.pos_dept_desc IS NULL THEN a.value ELSE 0::numeric END) AS "OtherSales",
 0::numeric AS "TotalTax",
 sum(CASE WHEN CAST(a.location_id AS TEXT) != '' THEN a.value ELSE 0::numeric END) AS "DebugTotalSales"
 INTO sales1
   FROM dw.pos_upc_sales_bal_v  AS a
  WHERE a.balance_start_dt = (CURRENT_DATE - '1 day'::interval) 
  GROUP BY a.location_id, a.balance_start_dt;

SELECT 
 sum(CASE WHEN c.tally_id = '3102'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "TransactionCount",
 sum(CASE WHEN c.tally_id = '130'::text THEN CAST(c.qty AS INTEGER) ELSE 0 END) AS "NoVISATransactions",
    sum(CASE WHEN c.tally_id = '130'::text THEN c.value ELSE 0::numeric END) AS "TotalVISACharges",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['131'::text, '104'::text]) THEN CAST(c.qty AS INTEGER) ELSE 0 END) AS "NoMastercardTransactions",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['131'::text, '104'::text]) THEN c.value ELSE 0::numeric END) AS "TotalMastercardCharges",
 sum(CASE WHEN c.tally_id = '132'::text THEN CAST(c.qty AS INTEGER) ELSE 0 END) AS "NoAMEXTransactions",
    sum(CASE WHEN c.tally_id = '132'::text THEN c.value ELSE 0::numeric END) AS "TotalAMEXCharges",
 sum(CASE WHEN c.tally_id = '133'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoDiscoverTransactions",
    sum(CASE WHEN c.tally_id = '133'::text THEN c.value ELSE 0::numeric END) AS "TotalDiscoverCharges",
 sum(CASE WHEN c.tally_id = '103'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoDebitTransactions",
    sum(CASE WHEN c.tally_id = '103'::text THEN c.value ELSE 0::numeric END) AS "TotalDebitCharges",
 0 AS "NoE-CheckTransactions",
 0 AS "TotalofE-ChecksCollected",
 sum(CASE WHEN c.tally_id = '102'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoPaperCheckTransactions",
    sum(CASE WHEN c.tally_id = '102'::text THEN c.value ELSE 0::numeric END) AS "TotalPaperChecksCollected",
 sum(CASE WHEN c.tally_id = '135'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoStoreCreditsRedeemed",
    sum(CASE WHEN c.tally_id = '135'::text THEN c.value ELSE 0::numeric END) AS "TotalStoreCreditsRedeemed",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['124'::text, '134'::text]) THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoGiftCardsRedeemed",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['124'::text, '134'::text]) THEN c.value ELSE 0::numeric END) AS "TotalGiftCardsRedeemed",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['117'::text, '129'::text]) THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoGiftCertificatesRedeemed",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['117'::text, '129'::text]) THEN c.value ELSE 0::numeric END) AS "TotalGiftCertificatesRedeemed",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['153'::text, '125'::text]) THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoVouchersRedeemed",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['153'::text, '125'::text]) THEN c.value ELSE 0::numeric END) AS "TotalVouchersRedeemed",
 sum(CASE WHEN c.tally_id = '3303'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoOtherCardTransactions", --same as GoodRewardsDiscount, but this is a tender
    sum(CASE WHEN c.tally_id = '3303'::text THEN c.value ELSE 0::numeric END) AS "TotalOtherCardCharges",
 sum(CASE WHEN c.tally_id = '101'::text THEN c.value ELSE 0::numeric END) AS "ExpectedCash",
 sum(CASE WHEN c.tally_id = '101'::text THEN c.value ELSE 0::numeric END) AS "TotalCashDeposit", --hand-counted cash
 0 AS "TotalChecksDeposit",
 sum(CASE WHEN c.tally_id = '1007'::text THEN c.value ELSE 0::numeric END) AS "PettyCash",
 sum(CASE WHEN c.tally_id = '3303'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoGoodRewardsDiscount",
    sum(CASE WHEN c.tally_id = '3303'::text THEN c.value*-1 ELSE 0::numeric END) AS "TotalGoodRewardsDiscount",
 sum(CASE WHEN c.tally_id = '3343'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoTMDiscount",
    sum(CASE WHEN c.tally_id = '3343'::text THEN c.value*-1 ELSE 0::numeric END) AS "TotalTMDiscount",
 sum(CASE WHEN c.tally_id = '3328'::text THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoSeniorsDiscount",
    sum(CASE WHEN c.tally_id = '3328'::text THEN c.value*-1 ELSE 0::numeric END) AS "TotalSeniorsDiscount",
 sum(CASE WHEN c.tally_id = ANY (ARRAY['3301'::text, '3302'::text]) THEN CAST(c.qty AS INTEGER)ELSE 0 END) AS "NoOtherDiscount",
    sum(CASE WHEN c.tally_id = ANY (ARRAY['3301'::text, '3302'::text]) THEN c.value*-1 ELSE 0::numeric END) AS "TotalOtherDiscount",
 c.location_id AS "StoreNo2",
    c.balance_start_dt AS "Date2"
 INTO sales2
   FROM dw.pos_clk_sales_bal AS c
  WHERE c.balance_start_dt = (CURRENT_DATE - '1 day'::interval) 
  GROUP BY c.location_id, c.balance_start_dt;

SELECT *
FROM sales1
JOIN sales2
  ON sales1."StoreNo" = sales2."StoreNo2" AND sales1."Date" = sales2."Date2";
-- Extract store donations data for previous day
SELECT DISTINCT gia_locations.location_name AS "StoreName",
    don_counts.don_dt AS "Date",
    sum(don_counts.count) AS "GGCDonors",
    0 AS "TextilesHung",
    0 AS "TextilesRotated"
   FROM dw.don_counts
     LEFT JOIN dw.gia_locations ON don_counts.location_id = gia_locations.location_id
  WHERE don_counts.dedupe_flg = 'Y'::text AND (don_counts.count_type <> ALL (ARRAY['Total'::text, 'Hr Sum'::text, 'Vol. Hrs'::text])) 
  AND gia_locations.location_type = 'R'::text 
  AND don_counts.don_dt = (CURRENT_DATE - '1 day'::interval)
  AND gia_locations.current_version_flg = 'Y'::text
  GROUP BY gia_locations.location_name, don_counts.don_dt;
-- Extract ADC donations data for previous day
SELECT DISTINCT 
 replace(replace(don_counts.location_id, '010', '1000'),'090','9000') AS "Store_no",
    don_counts.don_dt AS "PostingDate",
    sum(don_counts.count) AS "No_of_ADC_Donors"
FROM dw.don_counts
LEFT JOIN dw.gia_locations ON don_counts.location_id = gia_locations.location_id
WHERE don_counts.dedupe_flg = 'Y'::text 
 AND (don_counts.count_type <> ALL (ARRAY['Total'::text, 'Hr Sum'::text, 'Vol. Hrs'::text])) 
 AND (gia_locations.location_type = ANY (ARRAY['D'::text, 'W'::text, 'I'::text, 'B'::text])) 
 AND don_counts.don_dt = (CURRENT_DATE - '1 day'::interval)
 AND gia_locations.current_version_flg = 'Y'::text
GROUP BY don_counts.location_id, don_counts.don_dt;
-- Extract labor data for previous day
SELECT gia_locations.location_name AS "Department Description",
  EXTRACT(month FROM emp_hou_bal.balance_start_dt::date) ||
  '/' ||
  EXTRACT(day FROM emp_hou_bal.balance_start_dt::date) ||
  '/' ||
  EXTRACT(year FROM emp_hou_bal.balance_start_dt::date)
  AS "Date",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['REG'::text, 'TRNG'::text]) THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "Regular Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['REG'::text, 'TRNG'::text]) THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "Regular Earnings",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = 'OT'::text THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "Overtime Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = 'OT'::text THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "Overtime Earnings",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = 'BRVMT'::text THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "Bereav Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = 'BRVMT'::text THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "Bereav Earnings",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['STATH'::text, 'STATS'::text, 'STATW'::text]) THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "Holiday Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['STATH'::text, 'STATS'::text, 'STATW'::text]) THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "Holiday Earnings",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['VACH'::text, 'VAC'::text, 'WELL'::text, 'WELL-Covid'::text, 'PDSUS'::text, 'EBKTK'::text, 'PBKTK'::text,'PDLOA'::text]) THEN emp_hou_bal.hours
            ELSE 0::numeric
        END) AS "PTO Hours",
    sum(
        CASE
            WHEN emp_hou_bal.paycode_name = ANY (ARRAY['VACH'::text, 'VAC'::text, 'WELL'::text, 'WELL-Covid'::text, 'PDSUS'::text, 'EBKTK'::text, 'PBKTK'::text,'PDLOA'::text]) THEN emp_hou_bal.earnings
            ELSE 0::numeric
        END) AS "PTO Earnings",
    0 AS "FMLA Hours",
    0 AS "FMLA Earnings"
   FROM dw.emp_hou_bal
     LEFT JOIN dw.gia_locations ON emp_hou_bal.location_id = gia_locations.location_id
  WHERE emp_hou_bal.balance_start_dt = (CURRENT_DATE - '1 day'::interval) AND gia_locations.current_version_flg = 'Y'::text
  AND emp_hou_bal.paycode_name !~~ 'Analytics%'::text AND emp_hou_bal.paycode_name != 'Worked Hours'::text 
  AND (emp_hou_bal.paycode_name <> ALL (ARRAY['$WKND'::text, 'LATE'::text, 'NPD S'::text, 'NPD V'::text, 'NPDLV'::text, 'NOSHW'::text, 'NPWCB'::text, 'NPSUS'::text, 
            'FLEX'::text, 'FLEX TK'::text, 'Un-Approved OT'::text, 'EDUC'::text, 'VACPO'::text, 'VACPT'::text, 'RETRO'::text, 'Worked Hours'::text]))
  AND emp_hou_bal.fin_dept_id = ANY (ARRAY['500'::text, '528'::text, '501'::text])
  AND emp_hou_bal.volunteer = 'N'
  GROUP BY gia_locations.location_name, emp_hou_bal.balance_start_dt;
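Before embedding these scripts in Python, you can sanity-check each one from the command line with psql (the connection details below are placeholders for your own):

psql -h your_server -d your_database -U your_username -f sql_queries/wesa_sales.sql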

Once the SQL scripts are completed, create a project folder. In this folder, create a sub-folder called sql_queries and save the SQL scripts inside it.

2. Data Transformation

Photo by Chris Lawton on Unsplash

Let’s jump into Python to carry out the rest of the ETL process.

Open the project folder with any code editor of your choice. I am using VS Code. Here, create 4 more files: main.py, emailing.py, .env, requirements.txt.

Your project folder structure should now look like this:

Project-Folder/
├── sql_queries/
│   ├── wesa_sales.sql
│   ├── wesa_donors_stores.sql
│   ├── wesa_donors_adc.sql
│   └── wesa_labor.sql
├── .env
├── emailing.py
├── main.py
└── requirements.txt

Installing dependencies

To isolate this project's libraries from the rest of your system and avoid dependency conflicts, it is advisable to create a virtual environment where all the project's dependencies will be installed.

In your code editor, select your Python interpreter, then create and activate a virtual environment by running the two lines below in the terminal. My environment name is wesa_env and I'm using the Windows Command Prompt.

python -m venv wesa_env

wesa_env\Scripts\activate

Enter the following libraries into your requirements.txt file:

pandas==2.2.1
psycopg2==2.9.9
paramiko==3.4.0
python-dotenv

Then go back to your terminal (Command Prompt) and install them into the virtual environment by running:

python -m pip install -r requirements.txt

Helper function

In emailing.py, write code that sends you an alert email whenever the SFTP upload fails. The code below builds an email message containing the error details and sends it to you through Gmail's SMTP server using your credentials. Guidance on accessing Gmail's SMTP server is provided further down in the article.

# import libraries
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
import os
from dotenv import load_dotenv

load_dotenv() # this loads all the environment variables
#define the send email function
def send_alert_email(error_message):
    # Email configuration
    sender_email = os.getenv("SENDER")
    receiver_email = os.getenv("RECEIVER")
    password = os.getenv("PWD_EMAIL")
    # Email content
    message = MIMEMultipart()
    message['From'] = sender_email
    message['To'] = receiver_email
    message['Subject'] = "SFTP Upload Failed"
    body = f"Error occurred during SFTP upload:\n{error_message}"
    message.attach(MIMEText(body, 'plain'))
    # Connect to SMTP server and send email
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login(sender_email, password)
        server.sendmail(sender_email, receiver_email, message.as_string())
    print("Alert email sent")
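To confirm the setup before relying on it, you can call the helper once from a Python shell in the project folder (note that this sends a real email to your RECEIVER address):

import emailing
emailing.send_alert_email("This is a test of the alert pipeline")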

Importing the libraries to your main Python script

In your main.py file, import the required libraries with the code block below:

import csv
from datetime import datetime, date, timedelta
import os
import emailing # the helper module created above
import psycopg2
import paramiko
import pandas as pd
from dotenv import load_dotenv

I will explain the purpose of three of the above libraries:

  • Psycopg2 is a PostgreSQL database adapter for Python. It allows the Python program to connect to the database and execute SQL queries.
  • Paramiko is used to connect to the remote server through SSH.
  • dotenv is used to load environment variables stored in the .env file.

Authenticating the servers

Since we require access to the PostgreSQL server, the supplier's remote SFTP server, and Gmail's SMTP server (for error email alerts), we'll keep all credentials out of the code by storing them as environment variables in our .env file as follows:

# Postgres database credentials
SERVER="enter server name"
DATABASE="enter database name"
UID_DB="enter your username"
PWD_DB="enter your password"

# Supplier's SFTP server credentials. These will be provided to you by the supplier
HOST_FTP ="enter host name"
USER_FTP="enter your username"
PWD_FTP="enter your password"
PORT_FTP="enter the port number"

# Your credentials for Gmail's SMTP server
SENDER="enter your gmail address: [email protected]"
RECEIVER="enter the email address that will receive the alert"
PWD_EMAIL="enter your generated app password from your google account"

Google's help documentation on app passwords explains how to generate one for use with Gmail's SMTP server.

Next, in your main.py, load the environment variables and fetch the database credentials and the remote server’s SSH credentials.

load_dotenv() # this loads all the environment variables

# Get your database credentials from environment variables
SERVER_NAME=os.getenv("SERVER")
DATABASE_NAME=os.getenv("DATABASE")
UID=os.getenv("UID_DB")
PWD=os.getenv("PWD_DB")
PORT="5432"

# Get remote server SSH credentials from environment variables
HostName = os.getenv("HOST_FTP")
UserName = os.getenv("USER_FTP")
PassWord = os.getenv("PWD_FTP")
Port = os.getenv("PORT_FTP")

The following code block will create a connection to the PostgreSQL database, execute each of the SQL queries in the sql_queries folder, save the results to a CSV file, and load that CSV file into a pandas DataFrame for cleaning.

After cleaning, the dataframe will be written back into the CSV file. Each of the resulting CSV files will be named as required by the supplier and saved in a temporary folder – temp.

It will then close the connection to the database.

Please note the comments attached to the code.

# Define the path to the local directory and make sure it exists
WORK_SPACE = r"temp"
os.makedirs(WORK_SPACE, exist_ok=True)

# Connect to the PostgreSQL database
    try:
        conn = psycopg2.connect(database=DATABASE_NAME, host=SERVER_NAME, user=UID, password=PWD, port=PORT)
        print("Database connected successfully")
    except Exception as e:
        print("Database not connected:", e)
        raise  # re-raise so the outer try-except (added later) can send the alert email

    # Create a cursor
    cur = conn.cursor()

    # Fetch the SQL queries from the sub-folder
    for filename in os.listdir("sql_queries"):
        sql_query_path = os.path.join("sql_queries", filename)
        # Read the SQL query from the file
        with open(sql_query_path, 'r') as file:
            sql_query = file.read()

        # Execute the SQL query (psycopg2 runs every statement in the file;
        # the rows fetched below come from the file's final SELECT)
        cur.execute(sql_query)

        # Fetch all rows from the query result
        rows = cur.fetchall()

        # Specify the filename for the sales CSV file and dump the rows in it
        if "sales" in sql_query_path:
            csv_file_path = os.path.join(WORK_SPACE, ("WESA_SALE_NEW_" + str(((date.today())-timedelta(days = 1)).strftime("%Y%m%d")) + ".csv"))
            # Write the rows to a CSV file
            with open(csv_file_path, 'w', newline='') as f:
                # Create a CSV writer object
                writer = csv.writer(f)
                # Write the header (column names)
                writer.writerow([desc[0] for desc in cur.description])
                # Write the data rows
                writer.writerows(rows)

            # Clean the sales CSV file: drop the last 2 columns (the StoreNo2 and Date2 join keys from the SQL join), add leading zeros to the 'StoreNo' column and create the 'Credits' column
            data = pd.read_csv(csv_file_path)
            data = data.iloc[:, :-2]
            data['StoreNo'] = data['StoreNo'].astype(str).apply(lambda x: x.zfill(3))
            data['Credits'] = -(data['ShareTheGoodSales'] + data['TotalGiftCardsIssued'] + data['ChangeRoundup'] + data['TotalStoreCreditsRedeemed']).round(2)
            data.to_csv(csv_file_path, sep=',', encoding='utf-8', index=False)

        # Specify the filename for the store donations CSV file and dump the rows in it
        if "stores" in sql_query_path:
            csv_file_path2 = os.path.join(WORK_SPACE, ("WESA_DONR_NEW_" + str(((date.today())-timedelta(days = 1)).strftime("%Y%m%d")) + ".csv"))
            # Write the rows to a CSV file
            with open(csv_file_path2, 'w', newline='') as f:
                # Create a CSV writer object
                writer2 = csv.writer(f)
                # Write the header (column names)
                writer2.writerow([desc[0] for desc in cur.description])
                # Write the data rows
                writer2.writerows(rows)

            # Clean the store donations csv file: round the GGCDonors column
            data = pd.read_csv(csv_file_path2)
            data['GGCDonors'] = data['GGCDonors'].round(0).astype(int)
            data.to_csv(csv_file_path2, sep=',', encoding='utf-8', index=False)   

        # Specify the filename for the ADC donations CSV file and dump the rows in it
        if "donors_adc" in sql_query_path:
            csv_file_path3 = os.path.join(WORK_SPACE, ("WESA_ADC_" + str(((date.today())-timedelta(days = 1)).strftime("%Y%m%d")) + ".csv"))
            # Write the rows to a CSV file
            with open(csv_file_path3, 'w', newline='') as f:
                # Create a CSV writer object
                writer3 = csv.writer(f)
                # Write the header (column names)
                writer3.writerow([desc[0] for desc in cur.description])
                # Write the data rows
                writer3.writerows(rows)

            # Clean the ADC donations CSV file: round the No_of_ADC_Donors column
            data = pd.read_csv(csv_file_path3)
            data['No_of_ADC_Donors'] = data['No_of_ADC_Donors'].round(0).astype(int)
            data.to_csv(csv_file_path3, sep=',', encoding='utf-8', index=False)   

        # Specify the filename for the labor CSV file and dump the rows in it
        if "labor" in sql_query_path:
            csv_file_path4 = os.path.join(WORK_SPACE, ("Labor_Payroll." + str(((date.today())-timedelta(days = 1)).strftime("%Y%m%d%H%M%S")) + ".csv"))
            # Write the rows to a CSV file
            with open(csv_file_path4, 'w', newline='') as f:
                # Create a CSV writer object
                writer4 = csv.writer(f)
                # Write the header (column names)
                writer4.writerow([desc[0] for desc in cur.description])
                # Write the data rows
                writer4.writerows(rows)

            # Clean the labor CSV file: Rename some values in the Department Description column
            data = pd.read_csv(csv_file_path4)
            data['Department Description'] = data['Department Description'].replace({'Impact Centre Eic':'Edmonton Outlet Store', 'Impact Centre Cic':'Calgary Outlet Store'})
            data.to_csv(csv_file_path4, sep=',', encoding='utf-8', index=False)

    print("All 4 files created")

    # Close the cursor and connection
    cur.close()
    conn.close()
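As an aside, the four file-writing branches above repeat the same CSV-dumping pattern. A small helper function like the sketch below could remove that duplication (the function name and signature are my own, not part of the original script):

def dump_rows_to_csv(cursor, rows, folder, prefix, date_fmt="%Y%m%d"):
    # Build the file name from the supplier's prefix and the previous day's date
    stamp = (date.today() - timedelta(days=1)).strftime(date_fmt)
    csv_path = os.path.join(folder, prefix + stamp + ".csv")
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([desc[0] for desc in cursor.description])  # header row
        writer.writerows(rows)                                     # data rows
    return csv_path

Each branch would then reduce to one call, e.g. csv_file_path = dump_rows_to_csv(cur, rows, WORK_SPACE, "WESA_SALE_NEW_"), followed by its file-specific pandas cleaning.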

3. Data Loading

Photo by Claudio Schwarz on Unsplash

In the same main.py, continue the code by transferring the files to the supplier's remote server as below.

The code will specify the destination directory, make a Secure Shell (SSH) connection to the remote server, open an SFTP client over that connection, loop through all the files in the temp folder, transfer them to the remote directory and then close the connection.

# Data transfer from temp folder to WESA remote Server

    # Specify the remote directory
    remote_directory = "/RemoteFTP-NEW"

    # create ssh client and connect to remote server
    SSH_Client= paramiko.SSHClient()
    SSH_Client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    SSH_Client.connect( hostname=HostName, port=Port, username=UserName,
                    password= PassWord)

    # create an SFTP client object for the connection
    sftp_Client = SSH_Client.open_sftp()

    # transfer files to the remote server
    #loop through all files in the local directory and upload only files to the remote directory
    for file_name in os.listdir(WORK_SPACE):
        local_file_path = os.path.join(WORK_SPACE, file_name)
        if os.path.isfile(local_file_path):
            # Build the remote path with a forward slash: os.path.join would use
            # backslashes on Windows, which SFTP servers do not expect
            remote_file_path = remote_directory + "/" + file_name
            sftp_Client.put(local_file_path, remote_file_path)

    print("All 4 files transferred")

    # close the connection
    sftp_Client.close()
    SSH_Client.close()
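Optionally, you can confirm the files landed by listing the remote directory just before the sftp_Client.close() call:

    # List the remote directory contents as a quick sanity check
    print(sftp_Client.listdir(remote_directory))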

Error Handling

The last two code blocks could be placed in a try-except block so that, in case of any error, an exception report will be sent to you as an email alert. Sending this alert is handled by the send_alert_email function in the emailing helper module created earlier in the article.

try:
  # insert the code above, from connecting to the PostgreSQL database
  # through closing the SSH client, indented one level under this try

except Exception as e:
    emailing.send_alert_email(str(e))  # Sending an email alert
    print("Upload failed:", str(e))

Finally, in the same main.py, add the following lines to empty the temp folder so that it will be ready to receive new files the next day.

#Empty the temporary local folder
for file_name in os.listdir(WORK_SPACE):
    local_file_path = os.path.join(WORK_SPACE, file_name)
    if os.path.isfile(local_file_path):
        os.remove(local_file_path)

4. Task Scheduling

Photo by Homa Appliances on Unsplash

Using Windows Task Scheduler, schedule the main.py Python file to run at 6:00 am daily.
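For example, you can register the task from a Command Prompt with schtasks (the paths below are placeholders; point them at your own virtual environment and project folder):

schtasks /Create /TN "WESA Daily ETL" /SC DAILY /ST 06:00 /TR "C:\path\to\Project-Folder\wesa_env\Scripts\python.exe C:\path\to\Project-Folder\main.py"

Because main.py uses relative paths (sql_queries and temp), also set the task's 'Start in' directory to the project folder; schtasks has no flag for this, but you can set it in the Task Scheduler UI.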

When necessary, you can use WinSCP to view the remote server and confirm that the transferred files are actually there. However, this is optional and not part of the ETL process itself.

Caution

Please note that parts of the process above, the virtual-environment activation command and the scheduling step in particular, apply to Windows users only. If you are a macOS or Linux user, you will need alternatives such as source wesa_env/bin/activate and cron.
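For instance, a crontab entry along the lines of the sketch below (the paths are placeholders) would run the script at 6:00 am daily:

# minute hour day-of-month month day-of-week  command
0 6 * * * cd /path/to/Project-Folder && ./wesa_env/bin/python main.py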

Conclusion

To conclude, this article demonstrates the power of automation in streamlining data transfer processes by leveraging SQL and Python libraries.

From extracting data from a PostgreSQL database and cleaning the data to securely transferring it through SFTP, the automated system significantly reduces both manual effort and human error.

The integration of a reliable email alert system through Gmail’s SMTP server further ensures that any issues in the file transfer process are promptly addressed.

Scheduling the script to run daily ensures consistent data updates, making this solution a dependable data pipeline.


Before I forget

If you like what you just read, could you please click my ‘Follow’ button here on Medium or on LinkedIn, push in some claps, highlight what catches your attention or, better still, throw in some comments? Any or all of those will be greatly appreciated.

