From Streaming Data to COVID-19 Twitter Analysis: Using Spark and AWS Kinesis

A guide to fiddle with social media data, big data platform, and code.

Zhong Hongsheng
Towards Data Science

--

Pic cited from www.siasat.com

As the COVID-19 spreading and confirmed cases soaring, the world is shutting down. Places that were once teeming with the hustle and bustle of daily life have become ghost-towns. In contrast, social media is noisier than usual. Good news, bad news, and fake news are circulating on Twitter, Facebook, Instagram. Fear and solace, misinformation and clarification, can be easily found in all heated discussions. Social media may have never been played such a critical role as right now, for people to experience the external world and themselves.

For the data science community, it is time to react and track COVID-19’s global impact. In this post, I will demonstrate a simple way to study social media — analyzing Twitter.

Overviews

This article will talk about how to build a data pipeline using popular big data platforms, Spark and AWS, to achieve sentiment and time series analysis on Twitter. The pipeline would transform the tweets filtered by hashtags and keywords, into the streaming data format, which can be manipulated in Spark and Pandas. The streaming data is handled in a way of near real-time processing.

The architecture of the data pipeline

As the above architecture graph depicts, the process would go through the following steps:

  • Step 1: A python program using Tweepy would run on an EC2 instance, fetching the tweets filtered with hashtags and keywords. Data would be sent into a Kinesis Data Stream.
  • Step 2: A Kinesis Data Stream digests the data in a managed, scalable way. Monitor the metrics and check if data goes into Kinesis.
  • Step 3: Relying on the notebook provided by Databricks Community Edition, an application would implement the connection between Spark Structured Streaming and the Kinesis Data Stream. The application builds a sentiment analyzer using Spark SQL and TextBlob.
  • Step 4: Converted into Pandas Dataframes, the data would be indexed with the timestamp and visualized with time series using Matplotlib.

The source code for tweet capture and analysis has been uploaded to GitHub.

Part 1: Capture tweets

To analyze data, we need to have data at first. Tweepy is a powerful python library to capture real-time tweets. Imagine the program of capturing tweets may take a few hours, it would be consuming for CPU and memory. In this case, running it on a remote machine — EC2 — would be a good idea.

A snippet of the tweet capture code is attached below. Tweepy streaming is used here. The complete code is uploaded here.

stream_name = ''  # fill the name of Kinesis data stream you createdif __name__ == '__main__':
# create kinesis client connection
kinesis_client = boto3.client('kinesis',
region_name='', # enter the region
aws_access_key_id='', # fill your AWS access key id
aws_secret_access_key='') # fill you aws secret access key
# create instance of the tweepy tweet stream listener
listener = TweetStreamListener()
# set twitter keys/tokens
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# create instance of the tweepy stream
stream = Stream(auth, listener)

Remember to fill the missing fields with your AWS and twitter credentials.

Part 2: Process streaming data using Kinesis Data Stream

If we start running the code, data would be fetched continuously like water. It needs to go through a hose or pipe, or it needs to be stored. AWS Kinesis Data Stream (KDS) is such a hose, providing features like elastic scaling, real-time metrics, and data analytics.

Creating a KDS is quite simple. Go to the AWS Kinesis service page, choose to create Data Stream, fill the stream name. For the number of shards, as this is just a demo, 1 shard should be OK.

The page of creating Kinesis Data Stream

Once the KDS is created, we can run the tweet capture code in part 1. In this demo, the captured tweets should contain hashtag “#COVID-19” and keyword “Canada”.

Running the tweet capture code

After waiting a few minutes, from the monitoring tab of Kinesis Data Stream, we can see metrics for the tweet stream.

Metrics reflected on Kinesis Data Stream

Part 3: Ingest the data using Spark Structured Streaming

Spark is a popular cluster-computing framework to handle big data problems. Since the birth in 2014, Spark keeps evolving and has experienced many changes. Certain changes are critical and may confuse, so I will explain a bit about them.

Cited from javatpoint.com

Spark resilient distributed dataset (RDD) is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are the main logical data units in Spark. Based on RDD, Spark Streaming is built up as a scalable fault-tolerant streaming processing system that natively supports both batch and streaming workloads.

Changes were introduced since Spark 2.x. The DataFrame was released as an abstraction on top of the RDD, followed by the Dataset. In Spark 1.x, the RDD was the primary API, but as of Spark 2.x use of the DataFrame API is encouraged. To support DataFrame and Dataset, Spark Structured Streaming comes up and provides a higher-level interface than Spark Streaming API.

Sited from Linked SlideShare

Spark SQL allows users to manipulate DataFrames. From the name, we can guess Spark SQL provides SQL language support, which is pretty like Hive.

Now we have done the theory, time to get hands dirty. The programming paradigm for Spark Structured Streaming is like:

  • import the necessary classes and create a local SparkSession
  • create the Spark Streaming DataFrames
  • make operations on the DataFrame

More content can be found about how to program using Spark Structured Streaming.

Databrick Community Edition is a one-stop big data platform. Users can write Python code like in Jupyter notebooks but being able to interact with the Spark clusters smoothly. Besides, Databricks Community Edition also provides useful features like mix languages and visualization.

First, we need to create a spark session:

spark = SparkSession.builder\
.master("local")\
.appName("Structured Streaming")\
.getOrCreate()

Then create a Spark Streaming DataFrame from the Kinesis Data Stream:

pythonSchema = StructType() \
.add("id", StringType(), True) \
.add("tweet", StringType(), True) \
.add("ts", StringType(), True)
awsAccessKeyId = "" # update the access key
awsSecretKey = "" # update the secret key
kinesisStreamName = "" # update the kinesis stream name
kinesisRegion = ""
kinesisDF = spark \
.readStream \
.format("kinesis") \
.option("streamName", kinesisStreamName)\
.option("region", kinesisRegion) \
.option("initialPosition", "LATEST") \
.option("format", "json") \
.option("awsAccessKey", awsAccessKeyId)\
.option("awsSecretKey", awsSecretKey) \
.option("inferSchema", "true") \
.load()

With the running of tweet capture program on EC2 and Kinesis Data Stream, we can see how the data sourced from the Kinesis Data Stream, coming into the Spark Streaming Dataframe:

df = kinesisDF \
.writeStream \
.format("memory") \
.outputMode("append") \
.queryName("tweets") \
.start()
Streaming data is coming into Spark

df is a StreamingQuery to handle the active streaming query. format(“memory”) denotes that the output would be stored in memory as an in-memory table. Output modes specify what gets written to the output sink, and here we choose the append mode, which means only the new rows added to the result table.

Once streaming is started, the incoming data can be monitored via the dashboard. Through checking the status of df, we know the data is available and we can start exploring the data.

Apply SQL commands on the query:

tweets = spark.sql("select cast(data as string) from tweets")

See the heading data on the query:

Explore the top 5 rows of data in the Dataframe

Part 4: Sentiment analysis using Spark DataFrames

In order to analyze the data, it needs to be more structured. All Twitter APIs that return tweets provide that data encoded using JavaScript Object Notation (JSON). Let us take a brief look at a sample Tweet JSON:

{
"created_at": "Thu May 10 15:24:15 +0000 2018",
"id_str": "850006245121695744",
"text": "Here is the Tweet message.",
"user": {
...
},
"place": {},
"entities": {},
"extended_entities": {
...
}
}

When the tweet capture code is running, we have specified the needed fields: “id” denoted as the tweet id, “text” denoted as the tweet content, “ts” denoted as the time stamp.

# tweet capture code running in EC2def on_data(self, data):
# decode json
tweet = json.loads(data)
# print(tweet)
if "text" in tweet.keys():
payload = {'id': str(tweet['id']),
'tweet': str(tweet['text'].encode('utf8', 'replace')),
'ts': str(tweet['created_at']),
},
print(payload)
try:
put_response = kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(payload),
PartitionKey=str(tweet['user']['screen_name']))
...

So the data we captured has been organized. Using UDF, we can add these fields into the Spark Dataframe.

def parse_tweet(text):
data = json.loads(text)
id = data[0]['id']
ts = data[0]['ts']
tweet = data[0]['tweet']
return (id, ts, tweet)

# Define your function
getID = UserDefinedFunction(lambda x: parse_tweet(x)[0], StringType())
getTs = UserDefinedFunction(lambda x: parse_tweet(x)[1], StringType())
getTweet = UserDefinedFunction(lambda x: parse_tweet(x)[2], StringType())
# Apply the UDF using withColumn
tweets = (tweets.withColumn('id', getID(col("data")))
.withColumn('ts', getTs(col("data")))
.withColumn('tweet', getTweet(col("data")))
)
Explore the Dataframe after adding fields

Now we step to implement sentiment analysis on it. Sentiment analysis is part of Natural Language Processing (NLP), interpreting and classifying emotions from the text. There are many academic and industrial works involving in this field, and most of NLP libraries are capable of doing it. For demonstration purposes only, we use TextBlob to perform this job.

The basic task in sentiment analysis is to classify the polarity of the given text, into negative, neutral and positive. The polarity number is a float which lies in the range of [-1,1]. 1 means a positive statement and -1 means a negative statement. Thus, we can classify the data like this:

import textblobdef get_sentiment(text):
from textblob import TextBlob
tweet = TextBlob(text)
if tweet.sentiment.polarity < 0:
sentiment = "negative"
elif tweet.sentiment.polarity == 0:
sentiment = "neutral"
else:
sentiment = "positive"
return sentiment

Now we have the field for sentiment information, we can make use of it to have a high-level insight about the tweets. Here we want to see the sentiment classification over these captured tweets, guessing what kinds of emotion were revealed. With the features of mix languages and visualization on the Databricks Community Edition platform, a sentiment analysis about the relationship between sentiment and tweet number is achieved.

%sql
select sentiment, count(*) as cnt from tweets_parsed group by sentiment
Sentiment analysis expressed by a bar chart

Part 5: time series analysis using Numpy, Pandas, Matplotlib

The above analysis is based on PySpark DataFrames. However, the combination of Numpy/Pandas/Matplotlib is more professional to make operation on data. Thus, we need to convert the PySpark DataFrames into Panas DataFrames. Luckily, it is not that hard.

tweets_pdf = tweets.toPandas()

Now we can use the rich functionality from Pandas to explore the data:

To do time series analysis, the best practice is to index the data with time units. Here we need to convert the field ts into datatime type, and then use it to index the Dataframe.

pd.to_datetime(tweets_pdf['ts'])
idx = pd.DatetimeIndex(pd.to_datetime(tweets_pdf['ts']))
Check if the ts field is parsed into timestamp index correctly

The following code is to manipulate the data, and then visualize it into a time series line chart using Matplotlib. The chart depicts how the number of tweets containing the hashtag “#COVID-19” and keyword “Canada” changes along with the time.

# Plotting the series
%matplotlib inline
fig, ax = plt.subplots()
ax.grid(True)
ax.set_title("Tweet Numbers")
interval = mdates.MinuteLocator(interval=10)
date_formatter = mdates.DateFormatter('%H:%M')
datemin = datetime(2020, 3, 28, 16, 00)
datemax = datetime(2020, 3, 28, 17, 10)
ax.xaxis.set_major_locator(interval)
ax.xaxis.set_major_formatter(date_formatter)
ax.set_xlim(datemin, datemax)
max_freq = per_minute.max()
min_freq = per_minute.min()
ax.set_ylim(min_freq-100, max_freq+100)
ax.plot(per_minute.index, per_minute)
display(fig)
x-axis: tweet published time, y-axis: number of tweets

Conclusion

In this work, we go through a path from building a data pipeline to doing meaningful analysis. But this is just a small step in mining social media. In the future, I will keep exploring and find out other ways to gain insight over big data.

Note from the editors: Towards Data Science is a Medium publication primarily based on the study of data science and machine learning. We are not health professionals or epidemiologists, and the opinions of this article should not be interpreted as professional advice. To learn more about the coronavirus pandemic, you can click here.

--

--