
"An ounce of prevention is worth a pound of cure."
― Benjamin Franklin
Introduction
The Monitor tab in Azure Data Factory shows a lot of information about the status of all your executions. It's an easy, graphical way to check whether something has failed and at which point it failed.
The problem comes when, apparently, nothing is wrong with the pipelines but something is wrong with the data (e.g. a delta load with less data than usual, a huge structure change…). This kind of information cannot be obtained by monitoring your pipelines alone, which is why it is worth building a specific monitoring system that stores detailed information about each of these executions.
Why do you need custom logs for your data pipelines?
- Troubleshooting. You can track when the problems with a table started and replicate the environment to study the problem.
- Statistics. Being able to answer questions about your meta-store with simple SQL queries: how many rows do you process per day? What is the size of your tables? How many jobs do you execute each hour?
- Monitoring. You can easily see the status of all your executions, or you can implement cron jobs that constantly monitor several metrics in order to raise alerts.
- Data Quality detection with simple metrics (table timestamp, number of rows, structure change…).
Example structure of a logging table:
CREATE TABLE logs.EXAMPLE_LOG_TABLE (
ID bigint IDENTITY(1,1) NOT NULL,
ID_TRACK_PROCESSING bigint NOT NULL,
TABLE_NAME nvarchar(MAX) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,
SCHEMA_NAME nvarchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,
PRIMARY_KEYS nvarchar(MAX) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
STATUS nvarchar(10) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,
TIME_TAKEN bigint NOT NULL,
RUN_DT datetime NOT NULL,
CREATED_BY_ID nvarchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,
CREATED_TS datetime NOT NULL,
DATABRICKS_JOB_URL nvarchar(MAX) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
DATAFACTORY_PIPELINE_URL nvarchar(MAX) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
LAST_DSTS nvarchar(20) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
LIVE_ROWS nvarchar(100) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
REPLICATION_ROWS nvarchar(20) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
DELTA_VERSION nvarchar(10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
COLUMNS nvarchar(MAX) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
CONSTRAINT PK__EX_LOG__YYY432 PRIMARY KEY (ID)
)
- ID: Id of the log. This is an auto-generated value (see the IDENTITY definition).
- ID_TRACK_PROCESSING: Id (in track_processing table) of the table to ingest that triggered the execution of the job.
- SCHEMA_NAME & TABLE_NAME: Schema and table name of the table being inserted/processed.
- PRIMARY_KEYS: In case the table has primary keys and these are used to perform the merge.
- STATUS: Process status (Success or Failed).
- RUN_DT: Timestamp of when the job was started.
- TIME_TAKEN: Time needed by the job to finish.
- CREATED_BY_ID: To identify the tool that created the log (Azure Data Factory in our example).
- CREATED_TS: Timestamp of when the log was created.
- DATABRICKS_JOB_URL: URL in which the code and stages of every step of the execution can be found.
- DATAFACTORY_PIPELINE_URL: URL of the ADF pipeline that identified the job as finished.
- LAST_DSTS: Latest timestamp of the table.
- LIVE_ROWS: Number of rows of the table after the execution of the job.
- REPLICATION_ROWS: Number of rows inserted/processed in the latest execution (if it is a FULL LOAD, it will be equal to LIVE_ROWS).
- DELTA_VERSION: Databricks Delta version of the table after the ingestion job.
- COLUMNS: Structure (column names and types) of the table after the ingestion job.
You can use whichever fields you think could be useful for your use case. The fields I selected were the ones needed in my context.
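As an illustration of the Statistics and Monitoring points above, here is a minimal sketch of the kind of queries such a log table can answer (based on logs.EXAMPLE_LOG_TABLE as defined above; the thresholds and the 'FAILED' status value are assumptions):
-- Rows processed per day during the last week.
-- REPLICATION_ROWS is stored as nvarchar, so it is cast here.
SELECT CAST(RUN_DT AS date) AS run_day,
       SUM(CAST(REPLICATION_ROWS AS bigint)) AS rows_processed,
       COUNT(*) AS executions
FROM logs.EXAMPLE_LOG_TABLE
WHERE RUN_DT >= DATEADD(DAY, -7, CURRENT_TIMESTAMP)
GROUP BY CAST(RUN_DT AS date)
ORDER BY run_day;
-- Failed executions in the last hour (useful for a scheduled alerting job).
SELECT SCHEMA_NAME, TABLE_NAME, RUN_DT, DATAFACTORY_PIPELINE_URL
FROM logs.EXAMPLE_LOG_TABLE
WHERE STATUS = 'FAILED'
  AND CREATED_TS >= DATEADD(HOUR, -1, CURRENT_TIMESTAMP);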
How to generate logs
Every data job should generate metadata about the process that it just completed. Following the example described in one of my other articles:
How to orchestrate Databricks jobs from Azure Data Factory using Databricks REST API

Once the job is identified as finished (successful or failed), an activity is executed that collects data from the execution and writes it to the corresponding log table (log generation):

Query used:
INSERT INTO logs.EXAMPLE_LOG_TABLE
(ID_TRACK_PROCESSING, TABLE_NAME, SCHEMA_NAME, PRIMARY_KEYS, STATUS, TIME_TAKEN, RUN_DT, CREATED_BY_ID, CREATED_TS, DATABRICKS_JOB_URL, DATAFACTORY_PIPELINE_URL, LAST_DSTS, LIVE_ROWS, REPLICATION_ROWS, DELTA_VERSION, COLUMNS)
VALUES(
-- Data from the table that triggers the execution.
@{item().ID},
'@{item().TABLE_NAME}',
'@{item().SCHEMA_NAME}',
'@{item().PRIMARY_KEY}',
'SUCCESS',
-- Statistics and metadata of the execution.
@{activity('Check job status').output.metadata.execution_duration}/1000,
DATEADD(SECOND , -@{activity('Check job status').output.metadata.execution_duration}/1000,CURRENT_TIMESTAMP),
'AZURE DATA FACTORY',
CURRENT_TIMESTAMP,
'@{activity('Check job status').output.metadata.run_page_url}',
'https://adf.azure.com/monitoring/pipelineruns/@{pipeline().RunId}...',
-- Output from the execution.
'@{split(activity('Check job status').output.notebook_output.result,'|')[0]}', --Table timestamp
'@{split(activity('Check job status').output.notebook_output.result,'|')[1]}', --Table nº of rows
'@{split(activity('Check job status').output.notebook_output.result,'|')[2]}', --Rows ingested/updated in the job
'@{split(activity('Check job status').output.notebook_output.result,'|')[3]}', --Databricks Delta version.
'@{split(activity('Check job status').output.notebook_output.result,'|')[4]}' --Schema of the table.
); SELECT 1+1 -- Dummy SELECT so that the activity running this script (e.g. a Lookup activity, which must return a result set) gets a row back.
Logs are generated with:
- Data from the table that triggers the execution.
- Statistics and metadata of the execution.
- Output from the execution.
To extract an output from the executions (Databricks is being used as the core processing tool here), the last command executed in the data job will be:
dbutils.notebook.exit(string)
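As a minimal sketch of how that output string could be assembled (the variable names, the UPDATE_TS column and the exact metric computation are assumptions for illustration, not the author's actual notebook), the final cell could build the pipe-delimited value that the split() expressions in the ADF query above consume:
# Hypothetical final cell of a Databricks ingestion notebook (PySpark).
target_table = "schema.table"  # assumed: fully qualified name of the Delta table just loaded
replication_rows = 0  # placeholder: rows ingested/updated in this run, counted earlier in the job

last_dsts = spark.sql(f"SELECT MAX(UPDATE_TS) AS ts FROM {target_table}").collect()[0]["ts"]  # latest row timestamp
live_rows = spark.table(target_table).count()  # total rows after the load
delta_version = spark.sql(f"DESCRIBE HISTORY {target_table} LIMIT 1").collect()[0]["version"]  # current Delta table version
columns = str(spark.table(target_table).dtypes)  # column names and types

# The order must match the split() indexes used in the ADF INSERT above.
dbutils.notebook.exit(f"{last_dsts}|{live_rows}|{replication_rows}|{delta_version}|{columns}")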
Not all processes have to write their entries to the same log table: different processes can have different needs and may have to store different information in their log tables. If you do want to write all your logs to the same table, a good option is to add a new field that identifies the process that generated each entry, as sketched below.
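A minimal sketch of that option (the column name PROCESS_NAME and the example values are illustrative, not part of the original design):
ALTER TABLE logs.EXAMPLE_LOG_TABLE
ADD PROCESS_NAME nvarchar(100) NULL; -- e.g. 'SAP_INGESTION', 'MYSQL_INGESTION', 'ORACLE_INGESTION'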
Data Catalog creation using logs
"Simplicity is a great virtue but it requires hard work to achieve it and education to appreciate it. And to make matters worse: complexity sells better."
― Edsger Wybe Dijkstra
The Data Catalog built here is a simple solution that is not meant to replace a more complete one (e.g. LinkedIn DataHub), but it could be enough in a high percentage of cases.
Once we have enough entries in the log tables, we can create a view that uses that information to generate a "Data Catalog" with a complete snapshot of your meta-store.
Benefits:
- No reinventing the wheel: something that is already used for other purposes (troubleshooting, statistics…) is reused to create a data catalog.
- As a view is used, the "Data Catalog" will always be as up to date as possible and no other pipelines/processes are needed.
CREATE VIEW logs.all_tables_data_catalog AS
SELECT id,
schema_name,
table_name,
status ,
time_taken ,
created_ts ,
databricks_job_url ,
datafactory_pipeline_url,
'SAP_INGESTION' AS type,
last_dsts ,
columns,
Cast(live_rows AS DECIMAL) AS "LIVE_ROWS",
delta_version
FROM (
SELECT *,
Row_number() OVER (partition BY table_name, schema_name ORDER BY created_ts DESC ) AS row_no
FROM [logs].[SAP_INGESTION_LOG_TABLE]
WHERE status ='SUCCESS') logs_SAP
WHERE row_no=1
UNION
SELECT id,
schema_name,
table_name ,
status ,
time_taken_copy+time_taken_merge AS "TIME_TAKEN" ,
created_ts ,
databricks_job_url ,
datafactory_pipeline_url,
'MYSQL_INGESTION' AS type,
last_dsts,
columns,
Cast(live_rows AS DECIMAL) AS "LIVE_ROWS",
delta_version
FROM (
SELECT *,
Row_number() OVER (partition BY table_name, schema_name ORDER BY created_ts DESC ) AS row_no
FROM [logs].[MYSQL_INGESTION_LOG_TABLE]
WHERE status ='SUCCESS') logs_MYSQL
WHERE row_no=1
UNION
SELECT id,
schema_name,
table_name ,
status ,
time_taken ,
created_ts ,
databricks_job_url ,
datafactory_pipeline_url,
'ORACLE_INGESTION' AS type,
last_dsts,
columns,
Cast(live_rows AS DECIMAL) AS "LIVE_ROWS",
delta_version
FROM (
SELECT *,
Row_number() OVER (partition BY table_name, schema_name ORDER BY created_ts DESC ) AS row_no
FROM [logs].[ORACLE_INGESTION_LOG_TABLE]
WHERE status ='SUCCESS') logs_ORACLE
WHERE row_no=1
Only the common fields are selected from every log table (a UNION requires the same number of columns in each branch), only successful ingestions are considered, and the most recent log for every table (identified by its schema and name) is extracted using a ROW_NUMBER window function.
This gives users a way to check, for every table in the meta-store, among other things:
- Structure of the table (Column names and types).
- Last time in which it was updated.
- Latest timestamp of the table rows (Maybe the table is being loaded successfully but with outdated data).
- Number of rows.
It could also be used to easily detect data quality problems in the whole meta-store.
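For example, a hedged sketch of such a check against the view (the 30-day threshold is arbitrary, and it assumes LAST_DSTS is stored in a format that TRY_CAST can convert to datetime):
-- Tables that keep loading successfully but whose data looks stale or empty.
SELECT schema_name, table_name, last_dsts, created_ts, LIVE_ROWS
FROM logs.all_tables_data_catalog
WHERE TRY_CAST(last_dsts AS datetime) < DATEADD(DAY, -30, CURRENT_TIMESTAMP)
   OR LIVE_ROWS = 0;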
This view could also be used in different dashboards that show the same information to users in a prettier way. This simple "Data Catalog" could cover most of the needs of a high percentage of the teams that are considering purchasing a third-party data catalog solution.
What if your log tables are in different DB instances?
In case the data you want to display in your "Data Catalog" lives in different systems (e.g. SQL Server, Azure SQL and HANA), you can use SQL Server Linked Servers to query the other systems as if their tables belonged to the first one.
Benefits:
- Avoid unnecessary data movements, as data is queried directly from the source systems.
- Data is always as up to date as possible (no snapshot of the source tables is used; the live tables are queried).
- In case of network limitations or a large number of external sources to query, something similar to a materialized view could be implemented (see the sketch after this list), but the data displayed may not be as up to date as possible.
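A minimal sketch of that "materialized" approach, assuming an illustrative snapshot table logs.all_tables_data_catalog_snapshot refreshed on a schedule (e.g. by a SQL Agent job); SQL Server indexed views cannot reference linked servers, so a periodic reload is used instead:
-- Scheduled refresh of a snapshot of the catalog view.
TRUNCATE TABLE logs.all_tables_data_catalog_snapshot;
INSERT INTO logs.all_tables_data_catalog_snapshot
SELECT * FROM logs.all_tables_data_catalog;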
SQL Server’s connection to HANA:
First download the necessary ODBC drivers from the official SAP website and then follow the steps from the previous article. After completing the installation on the machine where the SQL Server instance is running (the one that will host the linked server), the ODBC data source will look like this:

The linked server is created using the following commands in SQL Server:
EXEC sp_addlinkedserver
@server='HANA_PRO', -- Just a descriptive name; it will be referenced later.
@srvproduct='HANA1.0', -- Just a descriptive product name.
@provider='MSDASQL', -- Fixed value: the standard OLE DB provider for ODBC connections.
@datasrc='HANAPRO'; -- The ODBC Data Source Name (DSN) created before.
EXEC sp_addlinkedsrvlogin
@useself = 'FALSE',
@rmtsrvname = 'HANA_PRO', -- Must match the @server name used in sp_addlinkedserver.
@locallogin = NULL,
@rmtuser = 'YOUR HANA USER',
@rmtpassword = 'YOUR HANA PASSWORD';
And, after completing all the previous steps, a connectivity check between the systems can be executed by running the following query in SQL Server:
SELECT * FROM
(SELECT * FROM OPENQUERY([HANA_PRO],'SELECT * FROM LOGS.HANA_LOG_TABLE;')) a;
SQL Server’s connection to Azure SQL:
It's an easier process, as no ODBC driver is needed:
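A minimal sketch of how that linked server could be created (the server name, database name and credentials below are placeholders, chosen to match the [AZURE SQL (PROD)].[BD] reference used in the view further down):
EXEC sp_addlinkedserver
@server='AZURE SQL (PROD)', -- Descriptive name referenced later in the view.
@srvproduct='',
@provider='SQLNCLI', -- Or MSOLEDBSQL, depending on the installed OLE DB driver.
@datasrc='yourserver.database.windows.net', -- Azure SQL logical server (placeholder).
@catalog='BD'; -- Database that contains the log table.
EXEC sp_addlinkedsrvlogin
@useself = 'FALSE',
@rmtsrvname = 'AZURE SQL (PROD)',
@locallogin = NULL,
@rmtuser = 'YOUR AZURE SQL USER',
@rmtpassword = 'YOUR AZURE SQL PASSWORD';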
Finally, since using linked servers is like adding the other DB systems as new databases in SQL Server, the code of the view is very similar to the previous one (the view that only read from the same DB instance):
CREATE VIEW logs.all_tables_data_catalog
AS
SELECT id,
schema_name,
table_name,
status,
time_taken,
created_ts,
databricks_job_url,
datafactory_pipeline_url,
'SAP_INGESTION' AS TYPE,
last_dsts,
columns,
Cast(live_rows AS DECIMAL) AS "LIVE_ROWS",
delta_version
FROM (SELECT *,
Row_number()
OVER (
partition BY table_name, schema_name
ORDER BY created_ts DESC ) AS row_no
FROM [logs].[sap_ingestion_log_table] -- Query to an internal table
WHERE status = 'SUCCESS') sql_server_row_no
WHERE row_no = 1
UNION
SELECT id,
schema_name,
table_name,
status,
time_taken_copy + time_taken_merge AS "TIME_TAKEN",
created_ts,
databricks_job_url,
datafactory_pipeline_url,
'MYSQL_INGESTION' AS TYPE,
last_dsts,
columns,
Cast(live_rows AS DECIMAL) AS "LIVE_ROWS",
delta_version
FROM (SELECT *,
Row_number()
OVER (
partition BY table_name, schema_name
ORDER BY created_ts DESC ) AS row_no
FROM [AZURE SQL (PROD)].[BD].[logs].[azure_sql_ingestion_log] -- Query to an external table in AZURE SQL.
WHERE status = 'SUCCESS') azure_sql_row_no
WHERE row_no = 1
UNION
SELECT id,
schema_name,
table_name,
status,
time_taken,
created_ts,
databricks_job_url,
datafactory_pipeline_url,
type,
last_dsts,
columns,
Cast(live_rows AS DECIMAL) AS "LIVE_ROWS",
delta_version
FROM (SELECT *,
Row_number()
OVER (
partition BY table_name, schema_name
ORDER BY created_ts DESC ) AS row_no
FROM (SELECT *
FROM Openquery([hana_pro], 'SELECT ID, SCHEMA_NAME, TABLE_NAME, STATUS, TIME_TAKEN, CREATED_TS, DATABRICKS_JOB_URL, DATAFACTORY_PIPELINE_URL, ''HANA_INGESTION'' AS TYPE, LAST_DSTS, COLUMNS, CAST(LIVE_ROWS AS DECIMAL) AS "LIVE_ROWS", DELTA_VERSION FROM LOGS.HANA_INGESTION_LOG
WHERE STATUS = ''SUCCESS'';') ) hana1) hana_row_no -- Query to an external table in HANA.
WHERE row_no = 1
-- Note that the ROW_NUMBER function could be implemented directly in the source system being queried, but it has been left as is for code clarity.
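As a hedged sketch of that push-down (assuming the same LOGS.HANA_INGESTION_LOG table and that the HANA user can run window functions), the pass-through query itself could keep only the latest row per table:
SELECT *
FROM OPENQUERY([hana_pro],
'SELECT * FROM (
SELECT SCHEMA_NAME, TABLE_NAME, STATUS, CREATED_TS, LAST_DSTS, LIVE_ROWS, DELTA_VERSION,
ROW_NUMBER() OVER (PARTITION BY SCHEMA_NAME, TABLE_NAME ORDER BY CREATED_TS DESC) AS ROW_NO
FROM LOGS.HANA_INGESTION_LOG
WHERE STATUS = ''SUCCESS''
) t WHERE ROW_NO = 1;') hana_latest;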
Conclusion
During this article, we have seen:
- Why custom logs are a good addition to data pipelines.
- How to generate custom logs just before finishing a data pipeline execution.
- Benefits of creating a data catalog.
- How to create a data catalog using custom logs.
- How to combine logs from different source systems without ETLs.
Although concrete examples are given on how to start creating custom logs with certain tools, the concept can be applied to other orchestrators and processing tools.
And that’s all. I hope that this has been useful for you and that you didn’t have to invest the time that I did.
Thanks for reading. You can find me on LinkedIn.
"The success formula: solve your own problems and freely share the solutions."
― Naval Ravikant