
Most of our data is produced organically in our organization's internal systems, but we also source a very large portion of data from (and send data to) third-party systems for analytics or operational purposes. These needs span finance, logistics, marketing, and other business departments. I will provide some examples below from the marketing front, as I have recently worked on creating and standardizing many third-party marketing data assets.
- Email/Push marketing: One of our most significant API integrations is with Braze, our email and push marketing vendor. We gather diner behavioral metrics through the Braze SDK, marry them with other internal and external data, and send deeply personalized communications/promotions to our diners. This is effectively a feedback loop, continuously refined using our data and the third-party API. With Braze we use asynchronous calls, direct API calls, and event data collection, where the vendor streams events to an S3 bucket in real time.
- Search Engine Marketing (SEM): We partner with several search engines where our product is ranked higher and shown as a paid advertisement for certain keyword searches. We collect our ad performance and other third-party data for SEM analytics. Some of the APIs we use for this purpose are ad campaign performance (Apple, Bing, Google, Yahoo …) and ad analytics (Google, Facebook, Snapchat …).
- Search Engine Optimization (SEO): We need to do everything we can to increase our product's visibility in relevant searches to make the best use of our marketing dollars. Some of our API interactions include Ahrefs (backlink/inbound link checker) and keyword performance reports (Google AdWords, Google AdWords PPA, keyword building …).
- App Store Optimization (ASO): We gather ratings, reviews, mobile attributions, competitor data, etc. from the Android and iOS app stores to optimize our app installation conversions and app ratings. Some examples are AppFollow (ASO data) and AppsFlyer (mobile attribution data).
- Affiliate Marketing: We collect affiliate marketing data from affiliate marketing marketplaces, e.g. Buttons, Impact Radius, Friends Buy, etc.
- Prospects: We collect data from partners who bring us prospects; they get paid if those prospects convert to customers.
- Others: A slew of other third-party systems from which we source data, or to which we send data, using SFTP/S3 or event streams.
Problems:
We use Python and Spark on EMR and store the resulting datasets in Hive over S3. Over time, many data scientists and engineers from different teams have worked on these third-party integrations. The problems we faced are:
- Lack of abstractions: The major problem was that the different data collection patterns were not identified and abstracted. This led to longer development cycles, code repetition, missed best practices, and other problems that could have been hidden away behind abstractions or factories.
- Performance: Many developers did not use parallel or distributed processing and instead ran a single-threaded Python process. In these cases performance suffered severely.
- Scalability: Single-threaded Python processes are not scalable. In certain cases distributed processing was used, but in a non-scalable way, e.g. converting to a pandas DataFrame or collecting all the data to the master node.
- Retry mechanism: A call to an API can fail for many reasons. A blind retry can be implemented, but it can backfire: the API may block the client (identified by API key) for quota exhaustion or too many attempts, which results in the same error code again. In these cases we should use an 'adaptive retry' mechanism.
- Logging: The existing process did not have a mechanism to store API response details, so even when a job completed there was no guarantee that all data had been extracted. A reusable logging mechanism quantifies failures with reason codes and helps retry failed calls later.
In the ETL pipeline I will leave out the 'T' part, as transformation is specific to each process, but we can identify and abstract major patterns in data collection, i.e. Extract (E), and of course in Load (L).
Data Collection Patterns (E)
We identified the following third-party data collection patterns:
- API – Synchronous Pull: Here we call a SOAP or REST API endpoint and pass a payload (JSON, for example). The API works on that request and, when done, returns a response with data in a structured format (JSON, CSV, XML, etc.). This data is saved by the caller, and the process moves on to the next request.
- API – Asynchronous Pull: Sometimes the API accepts a request but does not provide the requested data synchronously; instead it returns an acknowledgment and works in the background to prepare the requested data. The calling process therefore does not wait and moves on to the next task. In the payload we can optionally pass a 'callback URL' which the API hits to signal that the data has been prepared and saved to a previously agreed-upon cloud storage location.
- API Client Libraries: If available, vendor-specific client libraries can be used. They can pull canned reports directly in JSON, CSV, and other formats and act as a higher level of abstraction over the underlying raw API endpoints.
- Event Stream: In certain cases third-party vendors stream event data to cloud storage, which can be ingested in batches or using streaming technologies (e.g. Spark Streaming).
- SFTP/Cloud Storage: Data is uploaded to an SFTP server or a cloud storage location on a particular schedule or upon request, and needs to be pulled and ingested into the data warehouse.
- Web Scraping: We don’t do much of this.
Now that we have laid out the patterns, let's see, for each of them, what the reusable parts are and how we can standardize them.
API Synchronous Pull
The main components here are the API endpoint and the payload. But as each API can structure and serve data differently, collections can be nested and convoluted. For example, to collect time series data for all my campaigns in the last month, I first have to hit the campaigns/list? endpoint to collect the campaign_ids, then pass those ids to campaigns/details? to collect the corresponding details (either to apply specific filters or to get fields), which I can then pass as the final payload to the campaigns/data_series? endpoint. In each response, data can be provided in pages or as a limited number of rows with a 'next' URL marker signifying more data; all of these pages or chunks need to be navigated until we have consumed them all. So this can run many levels deep or have other API-specific implementations that need to be considered. Still, there is a lot of scope for standardization here:
a. Retry – It's easy to create a wrapper using the 'retry' module so that every API call retries a number of times before failing. But that is too generic and blind; instead we can build an adaptive retry module that customizes the retry behavior based on the response received from the API. For example, for the Facebook analytics API a response status code of 400 means 'Rate Limit Exceeded', and the response text advises how many minutes to wait before the next attempt, so the thread should sleep for that long before retrying. Similarly, a 500 response means 'Too much data being asked for, reduce the request size', so we can reduce the page limit and retry. As these examples show, a blind retry is actually harmful: it will not return any data, yet the calls still count against the API rate limit and push the wait time further out. This adaptive retry module can be rule based, governing the retry behavior per API and error code, with the details taken from the corresponding API documentation (a minimal sketch follows this list).
b. Batch – APIs often accept 'batch' payloads, i.e. an API will take a batch of requests and return a batch of responses. Sending 50 requests in one batch is far more efficient than sending them one by one. The corresponding API documentation needs to be consulted for usage and limitations, but regardless, this batching functionality can be abstracted.
c. Log – Before the API response is parsed and processed, it can go through a logging wrapper that persists the response details. With this we can quantify errors with reasons and, based on that, retry later if needed.
d. Structured Parse – Responses from most business APIs are structured and well documented. Structured data should be parsed using a well-defined schema, e.g. by creating a Spark schema and parsing the response JSON with it. While parsing we can leave out some data elements or even transform them. This is easy to manage, handles complex data types easily, and is far more performant than treating the response as text and using plain Python to glean data elements from it. A schema repository should be created in the code base and used for parsing the different API responses (a sketch of schema-based parsing also follows this list).
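To make the retry idea concrete, here is a minimal sketch of a rule-based adaptive retry, assuming the requests library. The rule table, default wait time, and 'limit' parameter are hypothetical placeholders; the real status codes and advised wait times must come from each vendor's API documentation and response text.

```python
import time
import requests

# Illustrative rule set keyed by (api_name, status_code); entries are placeholders,
# not the vendors' actual contracts.
RETRY_RULES = {
    ("facebook", 400): {"action": "wait", "default_wait_secs": 60},   # rate limit exceeded
    ("facebook", 500): {"action": "shrink_page", "factor": 0.5},      # asking for too much data
}


def adaptive_get(api_name, url, params, max_attempts=5):
    """GET with rule-based (adaptive) retries instead of blind retries."""
    params = dict(params)
    for attempt in range(1, max_attempts + 1):
        resp = requests.get(url, params=params, timeout=60)
        if resp.ok:
            return resp

        rule = RETRY_RULES.get((api_name, resp.status_code))
        if rule is None or attempt == max_attempts:
            resp.raise_for_status()                   # no rule for this error, or retries exhausted

        if rule["action"] == "wait":
            # Real code would parse the advised wait from the response text;
            # here we fall back to a default.
            time.sleep(rule["default_wait_secs"])
        elif rule["action"] == "shrink_page":
            # Ask for fewer rows per page and retry immediately.
            params["limit"] = max(1, int(params.get("limit", 100) * rule["factor"]))
    raise RuntimeError(f"{api_name}: exhausted {max_attempts} attempts for {url}")
```

Similarly, schema-driven parsing of the responses (item d) can be standardized with PySpark's from_json. The schema below is a hypothetical example of what one entry in the schema repository might look like:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema for a campaign time-series response.
campaign_series_schema = StructType([
    StructField("campaign_id", StringType()),
    StructField("date", StringType()),
    StructField("impressions", LongType()),
    StructField("spend", DoubleType()),
])

# 'raw' holds one API response JSON string per row; in the real pipeline this
# would come from the logged responses.
raw = spark.createDataFrame(
    [('{"campaign_id": "c1", "date": "2020-01-01", "impressions": 1000, "spend": 12.5}',)],
    ["response"],
)

parsed = (raw
          .withColumn("data", from_json(col("response"), campaign_series_schema))
          .select("data.*"))   # keep only the fields defined in the schema
parsed.show()
```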
API Asynchronous Pull
For asynchronous pulls, one particular thing that can be streamlined is the 'callback URL' and how the next step in the data pipeline is triggered. Usually we pass an AWS API Gateway URL as the callback URL along with our payload; when the API finishes processing the request asynchronously, it hits that URL to let us know that the requested task is complete and the data is available in a prearranged location, e.g. an S3 path. We can integrate a Lambda function with the API Gateway and either trigger an EMR step function or any other custom process to run the subsequent steps in the pipeline. This can be abstracted into one single process, reusable by any job that pulls API data asynchronously. Only one API Gateway is created, in the format https://xxxxxxx.execute-api.us-east-1.amazonaws.com/{environment}/{unique identifier}. When using this as a callback URL, the extraction code passes an environment (dev, prod, etc.) and a unique identifier. The Lambda function integrated with the API Gateway either creates a unique empty 'signal' file in S3 to signal readiness of the next step (which the calling job can poll for) or takes other actions as defined in a rule set.
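A minimal sketch of that Lambda handler, assuming an API Gateway proxy integration and a hypothetical signal bucket and key layout, could look like this:

```python
import boto3

s3 = boto3.client("s3")

# Hypothetical bucket; the callback URL passed to the vendor is expected to look like
# https://xxxxxxx.execute-api.us-east-1.amazonaws.com/{environment}/{unique identifier}
SIGNAL_BUCKET = "third-party-pipeline-signals"


def handler(event, context):
    """Lambda behind the API Gateway callback URL.

    Writes an empty 'signal' object to S3 so the extraction job, which polls for
    this key, knows the vendor has finished preparing the requested data.
    """
    # With a proxy integration, the path parameters carry the environment and the
    # unique identifier embedded in the callback URL.
    path_params = event.get("pathParameters") or {}
    environment = path_params.get("environment", "dev")
    job_id = path_params.get("identifier", "unknown")

    key = f"signals/{environment}/{job_id}/_SUCCESS"
    s3.put_object(Bucket=SIGNAL_BUCKET, Key=key, Body=b"")

    return {"statusCode": 200, "body": f"signal written to s3://{SIGNAL_BUCKET}/{key}"}
```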
Once the data is extracted, logging and structured parsing can also be used during ingestion, just as in the synchronous case.
API Client Libraries
API client libraries give programmatic access to the underlying API. In most cases this reduces code volume, makes the code more robust, and simplifies setting up authentication and authorization. When available, this is preferred over calling the API directly. The client library's documentation needs to be consulted for installation and usage guidance. As these libraries already provide a higher-level abstraction over the raw API, there is not much specific scope for improvement here beyond what is already followed in the other cases.
Event Stream
Sometimes third-party vendors stream events to cloud storage, which we ingest either in batch mode or using streaming technologies, e.g. Spark Streaming. In batch mode we can checkpoint the last file processed and start from there in the next run. For streaming, we can create a streaming factory where checkpointing (metadata and/or data), discretizing of streams, compacting smaller files into larger ones for Hive performance, partition refreshes, error handling, etc. are abstracted. Both processes can be made highly configurable, since the major moving parts are only the table name, schema, data location, and the like.
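As one possible shape of such a streaming factory, here is a minimal Spark Structured Streaming sketch. The event schema and S3 paths are hypothetical and stand in for the configurable parts; checkpointing and small-file compaction are handled in one place:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Hypothetical event schema and S3 locations; in practice these are the few
# configurable inputs to the factory.
event_schema = StructType([
    StructField("event_name", StringType()),
    StructField("user_id", StringType()),
    StructField("event_time", TimestampType()),
])
SOURCE_PATH = "s3://vendor-events/incoming/"
TARGET_PATH = "s3://warehouse/events/vendor/"
CHECKPOINT_PATH = "s3://warehouse/checkpoints/vendor_events/"


def write_batch(batch_df, batch_id):
    # Compact small files into larger ones for Hive/Presto performance,
    # then append the micro-batch to the table location.
    (batch_df.coalesce(4)
             .write.mode("append")
             .partitionBy("event_name")
             .parquet(TARGET_PATH))


query = (spark.readStream
              .schema(event_schema)
              .json(SOURCE_PATH)                              # pick up files as they arrive
              .writeStream
              .foreachBatch(write_batch)
              .option("checkpointLocation", CHECKPOINT_PATH)  # metadata checkpointing
              .start())
```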
SFTP/Cloud Storage
Cloud storage or SFTP is a popular and secure way of transferring data, and many smaller vendors prefer it when they cannot offer an API, event stream, or other technologies. It is obviously not the preferred way to integrate third-party data, because there is no data contract (unlike an API, event stream, or client library) and no seamless integration. Nevertheless, if cloud storage (e.g. S3) is used, a Lambda function can trigger an ingestion job when data arrives; for SFTP we have to keep polling and checking for data arrival, as in the sketch below.
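A minimal polling sketch for the SFTP case, assuming the paramiko library and hypothetical connection details and directories, might look like this:

```python
import time
import paramiko

# Hypothetical connection details and landing directories.
HOST, PORT = "sftp.vendor.example.com", 22
USERNAME, PASSWORD = "ingest_user", "********"
REMOTE_DIR = "/outbound/daily/"
LOCAL_DIR = "/tmp/vendor_drop/"


def poll_sftp(poll_interval_secs=300, max_polls=12):
    """Poll an SFTP drop folder and download any new files that arrive."""
    seen = set()
    for _ in range(max_polls):
        transport = paramiko.Transport((HOST, PORT))
        transport.connect(username=USERNAME, password=PASSWORD)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            for name in sftp.listdir(REMOTE_DIR):
                if name not in seen:
                    sftp.get(REMOTE_DIR + name, LOCAL_DIR + name)  # hand off to the ingestion job
                    seen.add(name)
        finally:
            sftp.close()
            transport.close()
        time.sleep(poll_interval_secs)
```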
Here is a pictorial representation of the different data collection processes at a high level.

Data Storage (L)
After extracting and processing, we need to store the processed data so that it is readily consumable. In our ecosystem we use Hive with S3 as the external persistent storage, and Presto on EMR on top of Hive to minimize SQL read latency. Since S3 objects are immutable, when writing processed data we can either overwrite all existing data, append to it, or write the data to a whole new path and repoint the table there. We can handle our different use cases as below:
a. Insert: This is the easiest case: the processed data is appended to the existing path for a non-partitioned table, or a new partition is created and written.
b. Update/Delete: As S3 objects are immutable, the final dataset as a whole, with rows updated or deleted, needs to be computed during data processing and written back to S3 in the same way as an insert.
c. Retention/Purging: We can retain previous states of a table by wrapping the data in a _history Hive table and partitioning it by run_date or run_id to facilitate time series/trend analysis. Older partitions and the corresponding data in S3 can be purged automatically by passing a number of retention days or run ids (a minimal sketch follows this list).
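Here is a minimal sketch of such a purge, assuming a hypothetical _history table partitioned by an ISO-formatted run_date string and a matching S3 layout:

```python
import datetime
import boto3
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
s3 = boto3.resource("s3")

# Hypothetical _history table and S3 layout: data lives under
# s3://warehouse/campaigns_history/run_date=YYYY-MM-DD/...
TABLE = "marketing.campaigns_history"
BUCKET, PREFIX = "warehouse", "campaigns_history/"


def purge_old_partitions(retention_days=90):
    cutoff = (datetime.date.today() - datetime.timedelta(days=retention_days)).isoformat()

    # Drop Hive partition metadata for anything older than the cutoff.
    for row in spark.sql(f"SHOW PARTITIONS {TABLE}").collect():
        run_date = row[0].split("run_date=")[-1]     # rows look like 'run_date=2020-01-01'
        if run_date < cutoff:
            spark.sql(f"ALTER TABLE {TABLE} DROP IF EXISTS PARTITION (run_date='{run_date}')")

    # Dropping an external table's partition does not delete the files, so clean up S3 too.
    for obj in s3.Bucket(BUCKET).objects.filter(Prefix=PREFIX):
        if "run_date=" not in obj.key:
            continue
        run_date = obj.key.split("run_date=")[-1].split("/")[0]
        if run_date < cutoff:
            obj.delete()
```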
All of the above common tasks can be abstracted into reusable processes. I am not going into the technical details, as the goal here is to identify and discuss the commonalities that can be abstracted.
Final Thought
Ingesting various third-party data in many different formats is part of the daily life of a data engineer/scientist. Understanding the different patterns and abstracting them into reusable tools and processes pays off in a shorter development lifecycle, a cleaner code base, easier maintenance, and better performance and scalability, to name a few benefits. In many cases we also send data or messages back to external systems, where similar processes can be followed: we continuously send user profile, customer journey, and ordering behavior events from various internal systems and processes to our CRM and email/push marketing vendors over their APIs. We are running a POC of an intermediate messaging system to make this process more reliable and robust.