
Utilization of Twitter’s API with R – Advanced Techniques.

The maddening adventures of extracting millions of tweets.


Connectivity Map of Twitter Activity. Source: Justin Cocco

Twitter’s API is free to use and is overall a very useful resource for data analysis. Extracting tweets for one, or several, users takes little more work than loading an R package. Yet if you are interested in harvesting millions of tweets from tens of thousands of users, you will have to sacrifice some additional tears. Here, the general strategy for doing just that is outlined so that you don’t also lose your mind.

The goal is to outline the progressive steps taken to produce a robust, functioning script that extracts millions of tweets from a list of thousands of users.

The general flow of this write-up will be:

1 – Introduction

2 – Explanation of requirements in code

3 – Examples of code

4 – Putting it all together


Introduction

There are two main strategies for getting the tweets from a Twitter user’s timeline: you can either webscrape the data or use Twitter’s API. Twitter has two types of API: the free version and the paid version. The paid version is very expensive and not ideal unless you’re Bill Gates.

The free version can be accessed by anyone with a developer account. Setting up the developer account is easy enough. From there, you grab your keys and head over to R.

The walkthrough will focus on the various and sundry ways that Twitter’s API can throw you for a loop (ha!) and how to prevent it. There are two main issues at hand. The first is that each request type is rate-limited on a time basis; the one most utilized here is the get-timeline GET request, which is limited to 900 requests per 15 minutes. The second is that, given a list of user names to pull tweets from, some users may be protected, and some may only return small fragments of their timelines. As such, the program will have to know how to handle those two errors, along with the mysterious error of never-completing user timelines (more on that maddening experience later).

In doing so, the script below utilizes several techniques: tryCatch() calls, appending dataframes, hard-writing to disk, a dynamic restart function, automatic shut-down functions, and lastly a manner by which the program can monitor its progress and react accordingly.

Quick Note: Aside from outlining the program, my other goal in writing this is to help consolidate a lot of commonly asked programming questions that are spread far and wide throughout the internet. Here, these questions are answered in a pragmatic and concise manner to help aspiring R programmers (hence the length).


Code Requirements

Thus, we need code that can do the following things:

1 – Iterate through a list of users

2 – Make the request for a specific user’s timeline

3 – Keep track of the responses already gathered, so as to not get duplicates

4 – Export to disk

5 – Handle errors due to rate-limit responses

6 – Handle errors due to unavailable users

7 – Handle never-ending loops of requests

8 – Shut the program down periodically to allow for system reboots

9 – Give periodic updates on the status of the program

10 – Have a dynamic reboot modality

Overall, this is a simple for loop in R that feeds into a tryCatch() function housing a get_timeline() request to Twitter’s API. It iterates through a pre-established list of user names, feeding each user into the get_timeline() request. There are two possible outcomes: an error, or a dataframe with the requested information. As such, the request is wrapped in a tryCatch() handler whose output can then be parsed into either an error response or the requested data.

If the output is the requested data, the program extracts the smallest status_id (each Twitter status has an associated ID), which will serve as the ceiling for the next request. It updates the array to signify that the response was good, as well as other key components. Then it appends the dataframe with the new information and proceeds to the next loop.

If the output of the GET request is an error, the program updates the array with the appropriate error that was generated and reacts accordingly. If it is a rate-limit error, the program saves the data that has been retrieved thus far, exports it, then waits the appropriate amount of time.

If the output is an unauthorized-user response (the user is private, or there is some other excluding issue), then the program saves that user’s user ID in a dataframe and moves on to the next user.

There are also instances wherein the response will be minimal: maybe only one or two tweets per response. This will cause you to reach your rate limit quickly, producing lengthy delays without sufficient return. For instance: at only 2 tweets per request, 3,200 tweets requires 1,600 requests, which at 900 requests per 15 minutes means roughly 30 minutes of waiting. Now multiply that by approximately 5% of your database. Thus, if the output is minimal, the program closes that user and proceeds to the next.
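To make that arithmetic concrete, here is a quick back-of-the-envelope check in R (the 2-tweets-per-request figure is just a hypothetical worst case):

tweets.per.request <- 2
requests.needed <- 3200 / tweets.per.request       # 1,600 requests
windows.needed <- ceiling(requests.needed / 900)   # 15-minute rate windows needed
windows.needed * 15                                # roughly 30 minutes of waiting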

Furthermore, the program periodically recycles its memory, requiring a dynamic restart after either a scheduled shut-down or an unexpected loss of function.

Lastly, throughout all of this the script should update the user on key items, most importantly being the number of users completed and the estimated time of completion.

Once all of the users have been iterated through, the script will simply export as a csv and be done!

This tutorial will utilize the following packages:

library(tidyverse)
library(rtweet)
library(dplyr)
library(lubridate)
library(readxl)

Let’s look at the steps individually:


Retweeting Behavior of Users on Twitter. Source: Justin Cocco

Let the Madness Begin

Setting up

Be sure to establish your developer account and have your keys. From there, use rtweet’s function create_token() to create a token for the API calls. Here, my token is simply: token.
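A minimal sketch of that setup, using rtweet’s create_token() with placeholder credentials (substitute the values from your developer dashboard):

library(rtweet)

token <- create_token(
  app             = "my_twitter_app",
  consumer_key    = "YOUR_CONSUMER_KEY",
  consumer_secret = "YOUR_CONSUMER_SECRET",
  access_token    = "YOUR_ACCESS_TOKEN",
  access_secret   = "YOUR_ACCESS_SECRET"
)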

Iterating through a list of users

Supposing you have some dataframe, say users.df, that has a list of users whose timelines you want to retrieve, then the command is a simple for loop:

for(users in users.df$user_id){<expression>}

Request Timeline

With these users, we want to request the timeline. For this the rtweet package will be used. It houses a function called get_timeline() that will call on Twitter’s API.

get_timeline(user, n = 200, max_id = NULL, token = token, check = FALSE) -> output

Quick Note: Twitter’s free API has two distinct rate limits at play here. GET requests for a user’s timeline can only be performed 900 times per 15 minutes. GET requests that check how many requests remain available can only occur 150 times per 15 minutes.

The get_timeline() function from rtweet inherently performs a GET request for the available request limits. Setting check = FALSE eliminates this; otherwise, instead of being able to perform 900 requests before having to wait 15 minutes, you could only perform 150.
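If you still want visibility into the quota, a lighter pattern is to call rate_limit() by hand, once, rather than on every request. A sketch, assuming the timeline resource appears as statuses/user_timeline in the rate-limit table:

library(dplyr)

rate_limit(token) %>%
  filter(query == "statuses/user_timeline") %>%
  select(limit, remaining, reset)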


Keeping track

To keep track of the many values required, we will use an array called master.array. Thus we can update the get_timeline() call as follows:

get_timeline(user = master.array[["user"]], n = 200, max_id = NULL, token = token, check = FALSE) -> output

Quick Note: Setting max_id to NA rather than NULL will return an empty dataset that will not be caught in the error-catching mechanisms discussed below.

Ideally, with this call, a table is returned with approximately 200 entries. Each entry is a tweeting activity by that user, associated with 90 columns of data. These data include items such as is_retweet, which specifies whether the activity is a retweet, and other markers useful for investigations.

But the response may also be an error or an empty table. These potentials will have to be addressed. But – before diving into the errors, let’s look at the desired outcomes:

Since we want each GET request to solicit a different set of tweets, we have to tell Twitter’s API which tweets we have already obtained. This is done via the (slightly confusing) max_id argument. The max_id sets the ceiling from which the next iteration of tweets will be pulled: the max_id of the next set is the minimum status_id of the previous set.
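To make the pagination concrete, here is a minimal two-request sketch (some_user is a hypothetical screen name):

first.batch <- get_timeline("some_user", n = 200, token = token, check = FALSE)
as.character(min(first.batch$status_id)) -> floor.id    # smallest status_id of batch one
second.batch <- get_timeline("some_user", n = 200, max_id = floor.id,
                             token = token, check = FALSE)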

So our first function for propagation becomes:

update.max.id <- function(df, master.array, new = FALSE){
  if(new == TRUE){
    master.array[["max.id"]] <- list(NULL)
  } else {
    master.array[["max.id"]] <- as.character(min(df$status_id))
  }
  return(master.array)
}

Quick Note: When storing a NULL value in an array, you have to use the list(NULL) command, otherwise you will simply delete the entry. Retrieving the value is also different from other calls: it must be unlisted, as we will see below.
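A toy demonstration of both halves of that note:

arr <- data.frame(max.id = NA)
arr[["max.id"]] <- list(NULL)   # a bare NULL assignment would delete the entry
unlist(arr[["max.id"]])         # retrieves NULL, ready to pass as max_id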

As can be seen, the function nicely handles a new user being called: the new argument will either insert a NULL if it is a new user or produce the necessary transfers if it is the same user.

A good time to introduce the master.array creation function:

make.array <- function(){
  max.id.df <- tibble("max.id" = NA)
  k.df <- tibble("k" = 0)
  user.df <- tibble("user" = NA)
  number.df <- tibble("number.users" = 0)
  master.array <- data.frame(max.id.df, k.df, user.df, number.df)
  return(master.array)
}

Now we can just update our get_timeline() call to the following:

get_timeline(master.array[["user"]], n = 200, max_id = unlist(master.array[["max.id"]]), token = token, check = FALSE) -> output

Since each user can only return 3,200 tweets, a method for monitoring how many responses have been obtained should be added. Here, a while() loop will be utilized, so a required function is the creation and modification of a counter k.

update.k <- function(df, master.array, new = FALSE) {
  if(new == TRUE){
    master.array[["k"]] <- 0
  } else {
    master.array[["k"]] <- master.array[["k"]] + nrow(df)
  }
  return(master.array)
}

Where the introduced df will be the dataframe that houses all of the responses obtained thus far from a user.

The last step before we have the outline of our master function is deciding what to do after each request. The output will be a dataframe that needs to be appended to the pre-existing dataframes. Subsequently, after each user, it is a good idea to append to another dataframe so that the user-specific dataframe can be deleted and remade, thereby reducing memory load.

The appending is a simple rbind() where the output is appended with a previously made dataframe, say – rolling.dataframe.

user.dataframe <- data.frame()
rbind(user.dataframe, rolling.dataframe) -> user.dataframe

Quick Note: rbind() is considered one of the circles of R hell. I know this. We all know this. Until I learn how to append the vectorization of a large dataframe onto another, this is where we all live now.
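For what it’s worth, one common escape hatch is to collect the pieces in a list and bind once at the end. A sketch with stand-in data, not what the script below actually does:

library(dplyr)

n.batches <- 5
pieces <- vector("list", n.batches)
for(i in seq_len(n.batches)){
  # stand-in for one get_timeline() response
  pieces[[i]] <- data.frame(user_id = "123", status_id = as.character(i))
}
bind_rows(pieces) -> user.dataframe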

And lastly, since there are going to be quite a lot of updates to the array, we might as well create a master-update function:

update.function <- function(df, master.array, new){
  update.k(df, master.array, new) -> master.array
  update.max.id(df, master.array, new) -> master.array
  return(master.array)
}

Lastly – it would be prudent to occasionally do two things: shuffle the working memory somewhere else, and export your work.

To improve working times, every 1,000,000 tweets obtained is transferred to an overflow.dataframe and the user.dataframe is deleted. Concurrently, every 100 users the user.dataframe is exported for insurance.

Quick Note: A condition will have to be added to ensure saving any remaining data if the difference between the last overflow save and the current appending dataframe is less than 1,000,000.

An export function could resemble:

export.function <- function(df, name, master.array){
 if(as.numeric(master.array[["number.users"]]) %% 100 == 0 & name == "combined"){
  write_csv(df, path = paste("<path>", name, "csv", sep = "."))
 } else if (name == "overflow"){
  write_csv(df, path = paste("<path>", name, "csv", sep = "."))
 }
}

Thus, we have the makings of the master.function:

master.function <- function(users.df){
  make.array() -> master.array
  rolling.dataframe <- data.frame()
  user.dataframe <- data.frame()
  overflow.dataframe <- data.frame()
  for(user in users.df$user_id) {
    update.function(NULL, master.array, new = TRUE) -> master.array
    as.character(user) -> master.array[["user"]]
    while(master.array[["k"]] <= 3200){
      get_timeline(master.array[["user"]], n = 200,
                   max_id = unlist(master.array[["max.id"]]),
                   token = token, check = FALSE) -> output
      update.function(output, master.array, new = FALSE) -> master.array
      rbind(rolling.dataframe, output) -> rolling.dataframe
    }
    rbind(user.dataframe, rolling.dataframe) -> user.dataframe
    rolling.dataframe <- data.frame()
    export.function(user.dataframe, "combined", master.array)
    if(nrow(user.dataframe) >= 1000000){
      rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
      export.function(overflow.dataframe, "overflow", master.array)
      user.dataframe <- data.frame()
    }
  }
  if(nrow(user.dataframe) > 0){
    rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
  }
  return(overflow.dataframe)
}

Bidirectional Associations of Twitter Accounts. Source: Justin Cocco

My Bad

But, as mentioned before, there will be errors (and not just from my coding). This script is nowhere near robust enough to handle the variety of issues that might arise.

Setting the foundation, let’s keep track of how many tweets have been harvested, whether an error has been produced, and which error it is:

make.array <- function(){
  max.id.df <- tibble("max.id" = NA)
  k.df <- tibble("k" = 0)
  user.df <- tibble("user" = NA)
  number.df <- tibble("number.users" = 0)   # retained for export.function()
  number.completed.df <- tibble("total.amount" = 0)
  error.df <- tibble("error" = FALSE)
  reason.df <- tibble("reason" = NA)
  master.array <- data.frame(max.id.df, k.df, user.df, number.df, number.completed.df, error.df, reason.df)
  return(master.array)
}

Then updating this value by creating a function to keep track of the total number of tweets extracted:

total.number.update <- function(master.array, df) {
 master.array[["total.amount"]] <- master.array[["total.amount"]] + nrow(df)
 return(master.array)
}

Rate-Limit Errors

The first step in identifying errors and bugs is to convince yourself that there was no way you could have possibly foreseen them coming and any suggestion to the contrary is baseless.

Now that that’s done, the second step is wrapping the function that may produce an error in a tryCatch() environment. Thus, a new function is required:

get.timeline <- function(master.array, token, user.dataframe) {
  tryCatch(
   expr = {
    get_timeline(master.array[["user"]], n = 200,
                 max_id = unlist(master.array[["max.id"]]),
                 token = token, check = FALSE) -> output
    return(output)
   },
   warning = function(w){
    message("Warning thrown!")
    print(w)
    master.array[["error"]] <- TRUE
    master.array[["reason"]] <- "wait"
    wait.function(user.dataframe, amount = 901, master.array) -> master.array
    return(master.array)
   }) -> output
  return(output)
}

Quick Note: Keep in mind your environments when utilizing tryCatch(). Nesting multiple tryCatch() creates a difficult environment to transfer various components between sections.

Here, two things occur when a rate-limit warning is thrown: first, the master.array is updated (an error is indicated, and the reason for the error is a rate limit causing a wait); second, the wait.function() is called.

When the program goes to wait, in case of any failure during the 15-minute wait (power, internet, server issues, etc.), it is a good idea to save your work, perform a countdown so that you know how much longer remains (I’m impatient), update the user on key items of information, and then update the array when the wait is done.

That equates to the following functions:

To write the data to disk an export function is crafted:

export.function <- function(df, name, master.array){
 if(as.numeric(master.array[["number.users"]]) %% 100 == 0 & name == "combined"){
  df[, -which(sapply(df, class) == "list")] -> df
  write_csv(df, path = paste("<path>", name, "csv", sep = "."))
 } else if (name == "overflow"){
  df[, -which(sapply(df, class) == "list")] -> df
  write_csv(df, path = paste("<path>", name, "csv", sep = "."))
 } else if(master.array[["error"]] == TRUE){
  if(master.array[["reason"]] == "wait"){
   df[, -which(sapply(df, class) == "list")] -> df
   write_csv(df, path = paste("<path>", name, "csv", sep = "."))
  }
 }
}

Quick Note: Some of the columns returned from the GET request will be lists. Lists cannot be stored as a vector in a csv, so they must be removed. This is performed via the df[, -which(…)] command.
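An equivalent tidyverse idiom, if you prefer it (assumes dplyr >= 1.0 for where()):

library(dplyr)

df <- tibble::tibble(user_id = "123", hashtags = list(c("a", "b")))
df %>% select(where(~ !is.list(.x)))   # drops the hashtags list-column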

The count function will count down every 10 seconds in the console:

count.function <- function(amount){
 print(amount)
 while(amount != 0){
  Sys.sleep(1)
  amount <- amount - 1
  if(amount %% 10 == 0){
   print(amount)
  }
 }
}

A quick update is provided:

user.update.wait <- function(amount, master.array){
 print(paste("Waiting for", amount/60, "minutes due to", master.array[["reason"]]))
 print(master.array)
}

Then the wait.function() created:

wait.function <- function(df, amount, master.array){
 export.function(df, "wait", master.array)
 user.update.wait(amount, master.array)
 count.function(amount)
 return(master.array)
}

So then the master function becomes:

master.function <- function(users.df){
  make.array() -> master.array
  rolling.dataframe <- data.frame()
  user.dataframe <- data.frame()
  overflow.dataframe <- data.frame()
  for(user in users.df$user_id) {
    update.function(NULL, master.array, new = TRUE) -> master.array
    as.character(user) -> master.array[["user"]]
    while(master.array[["k"]] <= 3200){
      get.timeline(master.array, token, user.dataframe) -> output
      update.function(output, master.array, new = FALSE) -> master.array
      rbind(rolling.dataframe, output) -> rolling.dataframe
    }
    rbind(user.dataframe, rolling.dataframe) -> user.dataframe
    rolling.dataframe <- data.frame()
    export.function(user.dataframe, "combined", master.array)
    if(nrow(user.dataframe) >= 1000000){
      rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
      export.function(overflow.dataframe, "overflow", master.array)
      user.dataframe <- data.frame()
    }
  }
  if(nrow(user.dataframe) > 0){
    rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
  }
  return(overflow.dataframe)
}

Yet a keen observer will note that the output can come in two varieties: in the successful case it will be a tibble of the GET response, and in the error case it will be an updated master.array. Thus, we have to add a condition:

master.function <- function(users.df){
  make.array() -> master.array
  rolling.dataframe <- data.frame()
  user.dataframe <- data.frame()
  overflow.dataframe <- data.frame()
  for(user in users.df$user_id) {
    update.function(NULL, master.array, new = TRUE) -> master.array
    as.character(user) -> master.array[["user"]]
    while(master.array[["k"]] <= 3200){
      get.timeline(master.array, token, user.dataframe) -> output
      if(!inherits(output, "tbl")){
        output -> master.array
      } else {
        print("Response received")
        update.function(output, master.array, new = FALSE) -> master.array
        rbind(rolling.dataframe, output) -> rolling.dataframe
      }
    }
    rbind(user.dataframe, rolling.dataframe) -> user.dataframe
    rolling.dataframe <- data.frame()
    export.function(user.dataframe, "combined", master.array)
    if(nrow(user.dataframe) >= 1000000){
      rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
      export.function(overflow.dataframe, "overflow", master.array)
      user.dataframe <- data.frame()
    }
  }
  if(nrow(user.dataframe) > 0){
    rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
  }
  return(overflow.dataframe)
}

The if statement checks whether the output is a tibble (the GET response) or not. If it is not a tibble, then it must be the master.array, so the program sets it appropriately, skips the remaining statements, and retries the get.timeline() call.
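A quick sanity check of why inherits() can tell the two apart:

inherits(tibble::tibble(x = 1), "tbl")   # TRUE:  a GET response is a tibble
inherits(data.frame(x = 1), "tbl")       # FALSE: the master.array is a plain data.frame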


Unauthorized User Error

Sometimes you’ll get users whose timelines cannot be gathered. This presents itself as a warning, just like the rate limits do. So, without protections, every such user (sometimes they come 100s in a row) will cause a 15-minute wait. This can be taken care of by updating the tryCatch():

get.timeline <- function(master.array, token, user.dataframe) {
  tryCatch(
   expr = {
    get_timeline(master.array[["user"]], n = 200,
                 max_id = unlist(master.array[["max.id"]]),
                 token = token, check = FALSE) -> output
    return(output)
   },
   warning = function(w){
    message("Warning thrown!")
    print(w)
    master.array[["error"]] <- TRUE
    if(!str_detect(conditionMessage(w), "Not Authorized")){
      master.array[["reason"]] <- "wait"
      wait.function(user.dataframe, amount = 901, master.array) -> master.array
      return(master.array)
    } else {
      message("Unauthorized User")
      master.array[["reason"]] <- "blocked"
      print(master.array)
      return(master.array)
    }
   }) -> output
  return(output)
}

It would be nice if, in the event of meeting an unauthorized user, the script would: update a list containing the user_ids that cannot be requested; export that list as a csv; and then skip the user.

So the master function becomes:

master.function <- function(users.df){
  make.array() -> master.array
  rolling.dataframe <- data.frame()
  user.dataframe <- data.frame()
  overflow.dataframe <- data.frame()
  removed.users <- data.frame(user_id = character())
  for(user in users.df$user_id) {
    update.function(NULL, master.array, new = TRUE) -> master.array
    as.character(user) -> master.array[["user"]]
    while(master.array[["k"]] <= 3200){
      get.timeline(master.array, token, user.dataframe) -> output
      if(!inherits(output, "tbl")){
        output -> master.array
        if(master.array[["error"]] == TRUE & master.array[["reason"]] == "blocked"){
          print(paste("Removed user", user))
          add.removed.function(removed.users, user) -> removed.users
          update.removed(removed.users, master.array) -> master.array
          export.function(removed.users, "removed", master.array)
          break
        }
      } else {
        print("Response received")
        update.function(output, master.array, new = FALSE) -> master.array
        rbind(rolling.dataframe, output) -> rolling.dataframe
      }
    }
    if(nrow(rolling.dataframe) != 0){
      rbind(user.dataframe, rolling.dataframe) -> user.dataframe
      rolling.dataframe <- data.frame()
      export.function(user.dataframe, "combined", master.array)
    }
    if(nrow(user.dataframe) >= 1000000){
      rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
      export.function(overflow.dataframe, "overflow", master.array)
      user.dataframe <- data.frame()
    }
  }
  if(nrow(user.dataframe) > 0){
    rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
  }
  return(overflow.dataframe)
}

Oh boy that’s a lot of change! Let’s look at each function then:

Quick Note: The if(nrow(rolling.dataframe) != 0) statement is protection against the first user attempted providing no response, and either not provoking an error (just a NULL response) or provoking an error and still proceeding to the secondary rbind().

The add.removed.function() keeps track of the removed users; counting them is accomplished with another update to the array:


make.array <- function(){
  max.id.df <- tibble("max.id" = NA)
  k.df <- tibble("k" = 0)
  user.df <- tibble("user" = NA)
  number.df <- tibble("number.users" = 0)
  number.completed.df <- tibble("total.amount" = 0)
  error.df <- tibble("error" = FALSE)
  reason.df <- tibble("reason" = NA)
  user.removed.df <- tibble("users.removed" = 0)
  master.array <- data.frame(max.id.df, k.df, user.df, number.df, number.completed.df, error.df, reason.df, user.removed.df)
  return(master.array)
}

Then the add.removed.function(): this appends the existing dataframe of removed users with a new entry. The as.character() is added just in case (the user_id is a long string of numbers; occasionally R’s lazy evaluation will convert it to an integer and thus scientific notation):

add.removed.function <- function(df, add){
 df[nrow(df) + 1, ] <- as.character(add)
 return(df)
}
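To see why that guard matters, note that IDs of this length exceed double precision, so distinct IDs can collide once converted to numeric (hypothetical IDs):

as.numeric("902342081233243613") == as.numeric("902342081233243612")   # TRUE (!)
format(as.numeric("902342081233243613"), scientific = FALSE)           # last digits mangled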

Then update.removed() records how many users have been removed, bumps the k-value so that the next user is picked, and resets the array’s error fields:

update.removed <- function(df, master.array){
 master.array[["users.removed"]] <- nrow(df)
 master.array[["error"]] <- FALSE
 master.array[["reason"]] <- NA
 master.array[["k"]] <- 999999
 return(master.array)
}

Then the export function gets updated as well:

export.function <- function(df, name, master.array){
 if(as.numeric(master.array[["number.users"]]) %% 100 == 0 & name == "combined"){
  df[, -which(sapply(df, class) == "list")] -> df
  write_csv(df, path = paste("<path>", name, "csv", sep = "."))
 } else if (name == "overflow"){
  df[, -which(sapply(df, class) == "list")] -> df
  write_csv(df, path = paste("<path>", name, "csv", sep = "."))
 } else if(master.array[["error"]] == TRUE){
  if(master.array[["reason"]] == "wait"){
   df[, -which(sapply(df, class) == "list")] -> df
   write_csv(df, path = paste("<path>", name, "csv", sep = "."))
  } else if (master.array[["reason"]] == "blocked"){
   write_csv(df, path = paste("<path>", name, "csv", sep = "."))
  }
 }
}

Now there is a mechanism by which, if an error is thrown due to an unauthorized request on a user’s timeline, the user’s ID is cataloged and exported, and the array is updated appropriately so as to remove the signs of an error and move on to the next user.


Never-Ending Loops

Occasionally, and at no fault of my own, requests will proceed in a seemingly endless loop, quickly approaching the 900-per-15-minutes limit without actually accomplishing anything. Although responses are received during these times, it isn’t tenable to let these users waste precious time.

Thus, a mechanism should be implemented to catch these and to break the function when they occur.

If there are 3,200 tweets per user at 200 per request, that’s 16 requests per user. Sometimes fewer than 200 tweets are received per request, so the limit is set to 20 requests per user.

This limit can be monitored by another array value:

make.array <- function(){
  max.id.df <- tibble("max.id" = NA)
  k.df <- tibble("k" = 0)
  user.df <- tibble("user" = NA)
  number.df <- tibble("number.users" = 0)
  number.completed.df <- tibble("total.amount" = 0)
  error.df <- tibble("error" = FALSE)
  reason.df <- tibble("reason" = NA)
  user.removed.df <- tibble("users.removed" = 0)
  requests.df <- tibble("requests" = 0)
  master.array <- data.frame(max.id.df, k.df, user.df, number.df, number.completed.df, error.df, reason.df, user.removed.df, requests.df)
  return(master.array)
}

Then the request count is incremented after each successful get.timeline() call:

total.calls <- function(master.array){
 master.array[["requests"]] <- master.array[["requests"]] + 1
 return(master.array)
}

Then we set an if statement in the master function:

master.function <- function(users.df){
  make.array() -> master.array
  rolling.dataframe <- data.frame()
  user.dataframe <- data.frame()
  overflow.dataframe <- data.frame()
  removed.users <- data.frame(user_id = character())
  for(user in users.df$user_id) {
    update.function(NULL, master.array, new = TRUE) -> master.array
    master.array[["requests"]] <- 0   # reset the per-user request count
    as.character(user) -> master.array[["user"]]
    while(master.array[["k"]] <= 3200){
      if(master.array[["requests"]] == 20){
        print(paste("Call count for", user, "exceeded. Breaking"))
        master.array[["k"]] <- 99999
        break
      }
      get.timeline(master.array, token, user.dataframe) -> output
      if(!inherits(output, "tbl")){
        output -> master.array
        if(master.array[["error"]] == TRUE & master.array[["reason"]] == "blocked"){
          print(paste("Removed user", user))
          add.removed.function(removed.users, user) -> removed.users
          update.removed(removed.users, master.array) -> master.array
          export.function(removed.users, "removed", master.array)
          break
        }
      } else {
        print("Response received")
        total.calls(master.array) -> master.array
        update.function(output, master.array, new = FALSE) -> master.array
        rbind(rolling.dataframe, output) -> rolling.dataframe
      }
    }
    if(nrow(rolling.dataframe) != 0){
      rbind(user.dataframe, rolling.dataframe) -> user.dataframe
      rolling.dataframe <- data.frame()
      export.function(user.dataframe, "combined", master.array)
    }
    if(nrow(user.dataframe) >= 1000000){
      rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
      export.function(overflow.dataframe, "overflow", master.array)
      user.dataframe <- data.frame()
    }
  }
  if(nrow(user.dataframe) > 0){
    rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
  }
  return(overflow.dataframe)
}

This will ensure that after 20 requests for a specific user, the script is stopped and the next user is requested.

Errors Errors everywhere and not a (data) byte to eat.

Once the errors are accounted for, the system state should be reset. This can be accomplished by crafting an error-be-gone function:

error.done <- function(master.array){
 master.array[["error"]] <- FALSE
 master.array[["reason"]] <- NA
 return(master.array)
}

This is then inserted wherever the end-point of an error is.


Community Hierarchies on Twitter. Source: Justin Cocco

Periodic Shut Downs

This program will take a while: days to weeks, depending on the number of users being requested. As such, periodic restarts are advised. To do so, one must keep track of the date that the program was initiated and compare it against the current date. If two days have passed since the program started (you can set your own limit), the program should save the most recent versions of the data and then quit.

Thus, as always, the array is updated:

make.array <- function(){
  max.id.df <- tibble("max.id" = NA)
  k.df <- tibble("k" = 0)
  user.df <- tibble("user" = NA)
  number.df <- tibble("number.users" = 0)
  number.completed.df <- tibble("total.amount" = 0)
  error.df <- tibble("error" = FALSE)
  reason.df <- tibble("reason" = NA)
  user.removed.df <- tibble("users.removed" = 0)
  requests.df <- tibble("requests" = 0)
  start.date.df <- tibble("start.date" = Sys.Date())               # for the two-day restart check
  start.time.df <- tibble("start.time" = as.numeric(Sys.time()))   # for run-time estimates later
  shut.down.df <- tibble("shut.down" = FALSE)
  master.array <- data.frame(max.id.df, k.df, user.df, number.df, number.completed.df, error.df, reason.df, user.removed.df, requests.df, start.date.df, start.time.df, shut.down.df)
  return(master.array)
}

Then, choosing where to put the check: I did so after finishing a user in the master function:

master.function <- function(users.df){
  make.array() -> master.array
  rolling.dataframe <- data.frame()
  user.dataframe <- data.frame()
  overflow.dataframe <- data.frame()
  removed.users <- data.frame(user_id = character())
  for(user in users.df$user_id) {
    reset.function(master.array, user.dataframe)
    update.function(NULL, master.array, new = TRUE) -> master.array
    master.array[["requests"]] <- 0
    as.character(user) -> master.array[["user"]]
    while(master.array[["k"]] <= 3200){
      if(master.array[["requests"]] == 20){
        print(paste("Call count for", user, "exceeded. Breaking"))
        master.array[["k"]] <- 99999
        break
      }
      get.timeline(master.array, token, user.dataframe) -> output
      if(!inherits(output, "tbl")){
        output -> master.array
        if(master.array[["error"]] == TRUE & master.array[["reason"]] == "blocked"){
          print(paste("Removed user", user))
          add.removed.function(removed.users, user) -> removed.users
          update.removed(removed.users, master.array) -> master.array
          export.function(removed.users, "removed", master.array)
          error.done(master.array) -> master.array
          break
        } else if(master.array[["error"]] == TRUE & master.array[["reason"]] == "wait"){
          print("Re-attempting")
          error.done(master.array) -> master.array
        }
      } else {
        print("Response received")
        total.calls(master.array) -> master.array
        update.function(output, master.array, new = FALSE) -> master.array
        rbind(rolling.dataframe, output) -> rolling.dataframe
      }
    }
    if(nrow(rolling.dataframe) != 0){
      rbind(user.dataframe, rolling.dataframe) -> user.dataframe
      rolling.dataframe <- data.frame()
      export.function(user.dataframe, "combined", master.array)
    }
    if(nrow(user.dataframe) >= 1000000){
      rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
      export.function(overflow.dataframe, "overflow", master.array)
      user.dataframe <- data.frame()
    }
  }
  if(nrow(user.dataframe) > 0){
    rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
  }
  return(overflow.dataframe)
}

The reset function being:

reset.function <- function(master.array, df){
 if(Sys.Date() >= master.array[["start.date"]] + 2) {
  master.array[["shut.down"]] <- TRUE
  export.function(df, "shutdown", master.array)
  shut.down(master.array)
 }
}

This requires an update to the export function:

export.function <- function(df, name, master.array){
 if(as.numeric(master.array[["number.users"]]) %% 100 == 0 & name == "combined"){
  df[, -which(sapply(df, class) == "list")] -> df
  write_csv(df, path = paste("<path>", name, "csv", sep = "."))
 } else if (name == "overflow"){
  df[, -which(sapply(df, class) == "list")] -> df
  write_csv(df, path = paste("<path>", name, "csv", sep = "."))
 } else if(master.array[["error"]] == TRUE){
  if(master.array[["reason"]] == "wait"){
   df[, -which(sapply(df, class) == "list")] -> df
   write_csv(df, path = paste("<path>", name, "csv", sep = "."))
  } else if (master.array[["reason"]] == "blocked"){
   write_csv(df, path = paste("<path>", name, "csv", sep = "."))
  }
 } else if (master.array[["shut.down"]] == TRUE){
  df[, -which(sapply(df, class) == "list")] -> df
  write_csv(df, path = paste("<path>", name, "csv", sep = "."))
 }
}

Then the shut.down() function is called:

shut.down <- function(master.array){
 if(master.array[["shut.down"]] == TRUE){
  quit(save = "no")
 }
}

This will end the current R session while making sure that the most recent set is saved.


Activity surrounding a Node in Twitter. Source: Justin Cocco

Giving Updates

I’m impatient. And I need to know things now. With this in mind, I want my program to update me every time a user is completed, or a wait is initiated, or a user presents as an unauthorized GET request. Along with these hard-points, I want updates on the dynamic state of events: number of users completed, number of users remaining, number of tweets extracted, average run time per user, and the estimated completion date.

The hard-points will be added to the master function. But let’s look at the dynamic states first. The function that gives us the number of completed users:

get.completed <- function(df1 = overflow.dataframe, df2 = user.dataframe){
 if(!exists("overflow.dataframe") && !exists("user.dataframe")){
  print("Unable to locate users")
  completed.amount <- 9999
 } else if(!exists("overflow.dataframe")){
  df2 %>% distinct(user_id) %>% count() %>% as.numeric() -> completed.amount
 } else if(!exists("user.dataframe")){
  df1 %>% distinct(user_id) %>% count() %>% as.numeric() -> completed.amount
 } else {
  df1 %>% distinct(user_id) %>% count() %>% as.numeric() -> completed.amount1
  df2 %>% distinct(user_id) %>% count() %>% as.numeric() -> completed.amount2
  completed.amount <- completed.amount1 + completed.amount2
 }
 return(completed.amount)
}

Due to the dynamic state of the program, the completed users could be in either dataframe, or in neither.

Quick Note: This could (and should) be done by wrapping the final argument in a tryCatch() environment. But I already typed this version out, so. The tryCatch() technique is shown below.

The same procedure for getting the total number of users in the pool:

get.total <- function(total.df = retweeters.users) {
 tryCatch(
  expr = {
   total.df %>% distinct(user_id) %>% count() %>% as.numeric() -> total
   return(total)
  },
  error = function(e){
   print("Unable to locate the total number of users")
   total <- 999
   return(total)
  }
 ) -> total
 return(total)
}

Why would the dataframe from which we are pulling all of our users from suddenly become hidden in the environment? Who knows – but has it happened? Yes.

Now that we have both the total number of users in the pool and the number of users completed, we can construct our user.update() function:

user.update <- function(master.array, user.dataframe, overflow.dataframe, retweeters.users){
 get.completed(overflow.dataframe, user.dataframe) -> number.completed
 get.total(retweeters.users) -> number.total
 message("User completed!")
 master.array[["total.amount"]] -> number.of.tweets
 number.of.users.remaining <- number.total - number.completed
 as.numeric(Sys.time()) -> current.time
 current.time - as.numeric(master.array[["start.time"]]) -> time.passed
 time.passed / number.completed -> time.per.user
 number.of.users.remaining * time.per.user -> remaining.time
 current.time + remaining.time -> est.finish
 as_datetime(est.finish, tz = "EST") -> est.finish
 ## update messages
 print(paste(number.completed, "users completed.", number.of.users.remaining, "users remaining with", number.of.tweets, "tweets extracted"))
 print(paste(time.passed, "seconds running. Avg time per user:", time.per.user, "Remaining time:", remaining.time, "Estimated completion:", est.finish))
 print(master.array)
}

This will produce lovely output such as:

user.update() output (don’t worry about that estimated completion time; the times get wonky during errors)

The user.update() function is then added along with the other status updates into the master function:

master.function <- function(users.df){
  make.array() -> master.array
  rolling.dataframe <- data.frame()
  user.dataframe <- data.frame()
  overflow.dataframe <- data.frame()
  removed.users <- data.frame(user_id = character())
  for(user in users.df$user_id) {
    reset.function(master.array, user.dataframe)
    update.function(NULL, master.array, new = TRUE) -> master.array
    master.array[["requests"]] <- 0
    as.character(user) -> master.array[["user"]]
    while(master.array[["k"]] <= 3200){
      if(master.array[["requests"]] == 20){
        print(paste("Call count for", user, "exceeded. Breaking"))
        master.array[["k"]] <- 99999
        break
      }
      get.timeline(master.array, token, user.dataframe) -> output
      if(!inherits(output, "tbl")){
        output -> master.array
        if(master.array[["error"]] == TRUE & master.array[["reason"]] == "blocked"){
          print(paste("Removed user", user))
          add.removed.function(removed.users, user) -> removed.users
          update.removed(removed.users, master.array) -> master.array
          export.function(removed.users, "removed", master.array)
          break
        } else if(master.array[["error"]] == TRUE & master.array[["reason"]] == "wait"){
          print("Re-attempting")
          user.update(master.array, user.dataframe, overflow.dataframe, retweeters.users)
          error.done(master.array) -> master.array
        }
      } else {
        print("Response received")
        total.calls(master.array) -> master.array
        update.function(output, master.array, new = FALSE) -> master.array
        rbind(rolling.dataframe, output) -> rolling.dataframe
        user.update(master.array, user.dataframe, overflow.dataframe, retweeters.users)
      }
    }
    if(nrow(rolling.dataframe) != 0){
      rbind(user.dataframe, rolling.dataframe) -> user.dataframe
      rolling.dataframe <- data.frame()
      export.function(user.dataframe, "combined", master.array)
    }
    if(nrow(user.dataframe) >= 1000000){
      rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
      export.function(overflow.dataframe, "overflow", master.array)
      user.dataframe <- data.frame()
    }
  }
  if(nrow(user.dataframe) > 0){
    rbind(user.dataframe, overflow.dataframe) -> overflow.dataframe
  }
  return(overflow.dataframe)
}

Quantitative Look into Interactions on Twitter. Source: Justin Cocco

Restart your engines!

Now that the bulk of our program is written, there needs to be a way in which the program can restart with minimal (read: I’m lazy) input so as to do several things:

1 – I want the program on restart to upload the appropriate files into R

2 – I want the files to be manipulated and appended

3 – Then the files should be re-written to disk but in different paths specifying back-up file names

4 – Make me a cup of coffee.

Furthermore, as an over-arching theme, fatal errors may arise spontaneously, totally unpredictably, and in a manner entirely unrelated to me not knowing how to spell, some hundreds of thousands of tweets into a script. Thus, the restart function should be able to differentiate a clean restart from an "oops" restart.

Quick Note: This function is way too long. I know it. You know it. But it seemed like a better idea to have a self-contained start function since it writes, reads, and moves files around on disk; I didn’t want unforeseen backups deleted or moved because of a call in some other part of the script.

Let’s begin!:

start.function <- function(new = TRUE){
 if(new == TRUE) {
  date <- as.character(Sys.Date())
  # Reading files
  print("Uploading most current")
  read_delim("<path>", delim = ",", col_types = cols(user_id = col_character(), status_id = col_character(), retweet_status_id = col_character())) -> most.current
  print("Uploading combined")
  read_delim(<..same as above, just a different path..>) -> combined
  print("Uploading unauth users")
  read_delim(<..same as above, just a different path..>) -> removed
  # Appending
  print("Appending combined and most current")
  rbind(most.current, combined) %>% distinct(user_id, status_id, .keep_all = TRUE) -> new.most.current
  # Writing backups
  print("Backing up previous most current")
  write_csv(most.current, path = paste("<path>", date, "csv", sep = "."))
  print("Backing up previous removed")
  write_csv(<same as above, just a different path>)
  # Checking for overflow file
  if(file.exists("<path for overflow>")){
   print("Uploading overflow")
   read_delim("<path for overflow, same as above otherwise>") -> overflow
   print("Appending overflow")
   rbind(new.most.current, overflow) -> new.most.current
   # Writing backups for overflow
   write_csv(overflow, path = paste("<path>", date, "csv", sep = "."))
  }
 } else if (new == FALSE){
  read_delim("<path for most current data>") -> new.most.current
 }
 return(new.most.current)
}

From here, the output on exit will be a dataframe that houses the most up-to-date information extracted. From there, those user_ids should be removed from the dataframe that is being iterated across (users.df). And users.df should be brought in:

get.df <- function(){
 read_delim("<path for users.df>") -> df
 df %>% distinct(user_id) -> df
 return(df)
}

Now that we have the dataframe housing all of the user_ids that we are interested in, and the dataframe of the user_ids that we have already extracted, the next obvious step is to remove one from the other. The user_ids that were removed for being unauthorized should be removed as well:

captured.users <- function(users.df, most.current, removed){
 users.df[!users.df$user_id %in% most.current$user_id, ] -> output
 output[!output$user_id %in% removed$user_id, ] -> output
 return(output)
}

Et voila, now we have all the pieces we need!


Putting it all together

After all that the final call looks like:

{
  token.function() -> token.full
  make.array() -> master.array
  start.function(new = TRUE) -> most.current
  get.df() -> users.df
  captured.users(users.df, most.current, removed) -> users.df
  master.function(users.df) -> retweetersTL.df
}

Cluster Analysis of Twitter Behavior. Source: Justin Cocco

Conclusions

That’s it, folks! Those are the nuts and bolts of a program that can extract millions of tweets from users on Twitter using Twitter’s API along with R’s rtweet package. We’ve covered many topics: dynamic scripts utilizing an array to store and retrieve relevant information; tryCatch() environments to catch the various and wonderful errors that Twitter’s API can throw; and a method of waiting when rate-stopped and writing to and uploading from disk when necessary.

The script definitely has room for improvements and additions. For instance: how would one switch from using rbind() to pre-allocating data and subsequently utilizing vectorization to append to the pre-established data? (Seriously, how!?) Or how could one incorporate sending updates to the user when they are not present at the computer? The options are endless!

I hope you enjoyed and that this helped!

