
Imagine you’ve been driving for a while and it has started to get dark. You feel your eyes getting heavier and know you should stop for a rest, but you want to keep going so you can get to your destination on time…
This is a situation that I’m sure a lot of people have experienced at some point: a typical example of ‘drowsy driving’, which leaves us [3X more likely to be in a road accident](https://www.nsc.org/road-safety/safety-topics/fatigued-driving).
What if there was a way we could identify when we start to feel tired?
This is the problem statement we will try to solve using my personal Fitbit data. Another example I’ve written using Fitbit data can be found here.
The Challenge
We will use sleep and heart rate data collected from my personal Fitbit watch. For this task sleep data gives an idea about when I started to sleep, and can be used as a point to delineate between being awake and asleep. Furthermore this helps us estimate a time interval for being tired; for convenience this will be defined as the last thirty minutes before sleep. Defining these different stages helps to label our heart rate data, which measures heart beats per minute.
This gives us a labelled univariate sequence of heart rate measurements classified by sleep interval, which will be used as an input into our sequence classification model. Our model will be a GRU built using Keras.
After building and training our sequence classifier we’ll see how well it performs on some unseen data.
Data Loading and Preparation
Sleep and heart rate data come as timestamped values stored in either .csv or .json files, each covering either a day or a month, so we need to load all these files and append them together.
#Import sleep data and combine files
import glob
import os
import pandas as pd

def Dataimport():
    pattern = 'sleep_files *.csv'
    path = 'folder pathway'
    dfs = []
    #build the full path with os.path.join so no separator goes missing
    for file in sorted(glob.glob(os.path.join(path, pattern))):
        print('Reading: ' + str(file))
        dfs.append(pd.read_csv(file))
    concatenated = pd.concat(dfs, ignore_index=True)
    concatenated = concatenated[['sleep_start','sleep_end']]
    return concatenated

sleepdata = Dataimport()
This gives us a table with the time when sleep started and ended, with a value like this:
Sleep start: 2021-01-12 22:10:00 Sleep end: 2021-01-13 05:37:30
We can split the date and time since we will later need to find only the days where both sleep data and heart rate data are recorded:
#Splitting the date and time
def Datasplit(sleepdata):
    #n and expand must be passed as keywords in recent pandas versions
    sleepdata['date_start'] = sleepdata['sleep_start'].str.split('T', n=1, expand=True)[0]
    sleepdata['time_start'] = sleepdata['sleep_start'].str.split('T', n=1, expand=True)[1]
    sleepdata['date_end'] = sleepdata['sleep_end'].str.split('T', n=1, expand=True)[0]
    sleepdata['time_end'] = sleepdata['sleep_end'].str.split('T', n=1, expand=True)[1]
    sleepdata['start_of_sleep'] = pd.to_datetime(sleepdata['date_start'] + ' ' + sleepdata['time_start'])
    sleepdata['end_of_sleep'] = pd.to_datetime(sleepdata['date_end'] + ' ' + sleepdata['time_end'])
    sleepdata = sleepdata[['start_of_sleep', 'end_of_sleep']]
    sleepdata = sleepdata.sort_values(by="start_of_sleep").reset_index(drop=True)
    return sleepdata

sleepdata = Datasplit(sleepdata)
For importing heart rate data we can reuse the same Dataimport() function, changing the file pattern so that it matches filenames containing "heartrate" (and the selected columns, since the sleep columns no longer apply). This data needs to be cleaned in a slightly different way, since we want to remove all unnecessary strings and resample the time series to the same resolution as the sleep data (i.e. in minutes):
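#cleaning the columns of the heart rate data
def HRclean(heart):
    #resample() needs a datetime index, so parse the timestamps first
    heart["dateTime"] = pd.to_datetime(heart["dateTime"])
    heart = heart.sort_values(by="dateTime")
    heart = heart.set_index('dateTime')
    #pull the bpm number out of the stringified record
    heart["value"] = heart["value"].apply(str)
    heart["value"] = heart["value"].str.split("{'bpm':").str[1]
    heart["value"] = heart["value"].str.split(",", n=1, expand=True)[0]
    heart["value"] = heart["value"].astype(int)
    #one row per minute, holding the mean of the readings in that minute
    heart = heart[["value"]].resample('1min').mean()
    heart['value'] = heart['value'].round(0)
    heart['date'] = heart.index
    heart = heart[['date', 'value']]
    return heart

heartdata = Dataimport()  #Dataimport() adapted to the heart rate files
heart = HRclean(heartdata)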
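To make the resampling step concrete, here is a small sketch on made-up readings (the timestamps and values are invented for illustration and are not my Fitbit data). It shows how irregular samples collapse into one mean value per minute, and that minutes with no readings come out as NaN:

```python
import pandas as pd

# Invented readings at irregular intervals, mimicking raw heart rate samples
raw = pd.DataFrame(
    {"value": [60, 62, 70, 74, 80]},
    index=pd.to_datetime([
        "2021-01-12 22:00:05",
        "2021-01-12 22:00:35",
        "2021-01-12 22:01:10",
        "2021-01-12 22:01:50",
        "2021-01-12 22:03:20",
    ]),
)

# One row per minute: the mean of all readings that fall inside that minute
per_minute = raw.resample("1min").mean()
print(per_minute)
```

The empty 22:02 minute is the reason rows with missing values have to be dropped at some point before modelling.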
Since we will define our ‘tired’ interval as thirty minutes before sleep, we can already imagine there will be a large class imbalance compared to the time awake and asleep. To mitigate this we can filter the heart rate data to a smaller interval, say from early evening to the early hours of the morning:
#selecting only values in the evening (times around the tiredness since mornings are irrelevant)
heart = heart.between_time('19:00', '03:00').copy()
heart["only_date"] = heart["date"].dt.date
sleepdata["only_date"] = sleepdata["start_of_sleep"].dt.date
#Identifying rows where sleep data exists for the given day of heart rate data
heart['sleep_data_exists'] = heart['only_date'].isin(sleepdata['only_date']).astype(int)
heart = heart[heart['sleep_data_exists'] == 1].copy()
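It may not be obvious that between_time() handles a window that crosses midnight. A quick check on a hypothetical day of hourly timestamps (invented purely for this illustration) shows which hours survive the filter:

```python
import pandas as pd

# Hypothetical hourly timestamps covering one full day
hours = pd.date_range("2021-01-12 00:00", periods=24, freq="60min")
demo = pd.DataFrame({"value": range(24)}, index=hours)

# Start time later than end time: the window wraps around midnight
evening = demo.between_time("19:00", "03:00")
kept_hours = sorted(evening.index.hour.unique())
print(kept_hours)
```

Both end points are included by default, so the filter keeps 19:00 through 23:00 as well as 00:00 through 03:00.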
We can now label the heart rate data as being either asleep or awake based on the date-time range of sleep:
#for each HR row, need to see if that time was during sleep or not, and label as 1 or 0
def Labelling(heart):
    print('labelling the data...')
    heart['sleep_label'] = 0
    #for each sleep interval, mark every heart rate row that falls inside it
    for start, end in zip(sleepdata['start_of_sleep'], sleepdata['end_of_sleep']):
        asleep = (heart['date'] >= start) & (heart['date'] <= end)
        #.loc avoids chained assignment, which can silently fail to write
        heart.loc[asleep, 'sleep_label'] = 1
    return heart

heart = Labelling(heart)
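A quick way to sanity-check this kind of interval labelling is to run it on a tiny table where the answer is known. The sketch below uses made-up values and a hypothetical helper name, label_asleep, that takes the sleep table as an argument rather than reading it from the global scope:

```python
import pandas as pd

def label_asleep(heart, sleepdata):
    """Return a copy of heart with sleep_label = 1 inside any sleep interval."""
    heart = heart.copy()
    heart["sleep_label"] = 0
    for start, end in zip(sleepdata["start_of_sleep"], sleepdata["end_of_sleep"]):
        inside = (heart["date"] >= start) & (heart["date"] <= end)
        heart.loc[inside, "sleep_label"] = 1
    return heart

# Made-up data: ten minutes of heart rate and one sleep interval
toy_heart = pd.DataFrame({
    "date": pd.date_range("2021-01-12 22:05", periods=10, freq="1min"),
    "value": [70, 69, 68, 66, 65, 62, 60, 59, 58, 58],
})
toy_sleep = pd.DataFrame({
    "start_of_sleep": pd.to_datetime(["2021-01-12 22:10"]),
    "end_of_sleep": pd.to_datetime(["2021-01-13 05:37:30"]),
})

labelled = label_asleep(toy_heart, toy_sleep)
print(labelled["sleep_label"].tolist())
```

The first five minutes (22:05 to 22:09) stay at 0 and everything from 22:10 onwards becomes 1.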
The final preparation step will be to label our ‘tired’ class:
#selecting the time n rows before sleep starts
idx = heart.index.get_indexer_for(heart[heart['sleep_label'] == 1].index)
subset = heart.iloc[np.unique(np.concatenate(
    [np.arange(max(i-30, 0), min(i-30+1, len(heart))) for i in idx]))]
subset = subset[subset.sleep_label == 0]
#compare on the same index as heart, otherwise the new column is filled with NaN
heart['tired_label'] = heart['date'].isin(subset['date']).astype(int)
#cleaning the final labels into numerical values
heart['label'] = 0
heart.loc[heart.tired_label == 1, 'label'] = 2
heart.loc[heart.sleep_label == 1, 'label'] = 1
heart = heart.dropna()
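The code above counts back 30 rows, which equals 30 minutes only when there are no gaps between rows. As an alternative sketch (not the approach used in this article), the window can be defined on timestamps instead. The helper name label_tired and the toy data are hypothetical:

```python
import pandas as pd

def label_tired(heart, sleep_starts, minutes=30):
    """Mark rows in the `minutes` before each sleep start as tired (1), else 0."""
    tired = pd.Series(0, index=heart.index)
    for start in sleep_starts:
        window_start = start - pd.Timedelta(minutes=minutes)
        window = (heart["date"] >= window_start) & (heart["date"] < start)
        tired.loc[window] = 1
    return tired

# Made-up minute-level timestamps from 21:00 to 22:29, with sleep starting at 22:10
toy = pd.DataFrame({"date": pd.date_range("2021-01-12 21:00", periods=90, freq="1min")})
toy["tired_label"] = label_tired(toy, [pd.Timestamp("2021-01-12 22:10")])

tired_rows = toy.loc[toy["tired_label"] == 1, "date"]
print(tired_rows.min(), tired_rows.max(), len(tired_rows))
```

Here the tired rows run from 21:40 to 22:09, i.e. exactly the thirty minutes before sleep.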
Exploring the data
We can now begin to look at our dataset in more detail. Let’s begin by importing useful libraries. It’s then a good idea to visualise some of our labelled heart rate data so we can get an idea about how heart rate varies per class:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow import keras
import matplotlib.pyplot as plt
import scipy.stats as stats
import seaborn as sns
from tensorflow.keras import backend as K
from tensorflow.keras import layers

#df is the labelled heart rate table; 'heart_value' and 'label' are the column names used from here on
sns.set(rc={"figure.figsize":(30, 8)})
sample = df.iloc[:1500]  #slice x, y and hue together so their lengths match
sns.scatterplot(x = sample.index, y = sample['heart_value'], hue = sample['label'], palette = 'Spectral')
plt.xlabel("Time (Minutes)")
plt.ylabel("Heart Rate (BPM)")
plt.show()

We can clearly see that heart rate varies between being asleep and being awake. Heart rate values labelled as ‘tired’ are transitionary and therefore less distinct, which will make our modelling interesting. Despite narrowing the time interval we still have a class imbalance that needs to be addressed later.
It’s also a good idea to look at the distribution of heart rate values per class:

And see their summary statistics:

In this case, 0 is awake, 1 is asleep and 2 is tired. The ‘tired’ class has a mean similar to the ‘asleep’ class but a standard deviation similar to the ‘awake’ class. This echoes our first pass observations from looking at the heart rate time series.
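For anyone wanting to reproduce this kind of per-class summary, a minimal sketch is below. The table df_demo is a synthetic stand-in with invented values (it is not my Fitbit data and will not reproduce the numbers discussed above); only the column names heart_value and label follow the code in this article:

```python
import numpy as np
import pandas as pd

# Synthetic stand-in for the labelled table (values are invented)
rng = np.random.default_rng(0)
df_demo = pd.DataFrame({
    "heart_value": np.concatenate([
        rng.normal(75, 10, 500),   # label 0: awake
        rng.normal(58, 4, 500),    # label 1: asleep
        rng.normal(62, 9, 40),     # label 2: tired
    ]).round(),
    "label": [0] * 500 + [1] * 500 + [2] * 40,
})

# Count, mean, standard deviation and quartiles of heart rate for each class
summary = df_demo.groupby("label")["heart_value"].describe()
print(summary[["count", "mean", "std"]])
```

For the distribution plot, seaborn's histplot with x set to the heart rate column and hue set to the label column is one option.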
Sequence Preprocessing
Now let’s divide our data into fixed length sequences. Our sequences will be 30 minutes long to align with the length of our tired interval. We also need to split our data:
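def create_dataset(X, y, time_steps, step):
    Xs, ys = [], []
    for i in range(0, len(X) - time_steps, step):
        v = X.iloc[i:(i + time_steps)].values
        labels = y.iloc[i: i + time_steps]
        Xs.append(v)
        #most common label in the window (newer SciPy returns a scalar from stats.mode, so [0][0] fails)
        values, counts = np.unique(labels, return_counts=True)
        ys.append(values[np.argmax(counts)])
    return np.array(Xs), np.array(ys).reshape(-1, 1)

interval = 30
#x1 holds the heart rate column(s) as a DataFrame, y1 the matching labels
#time_steps comes before step, so the 30-minute window length goes third
X_train_full, y_train_full = create_dataset(x1, y1, interval, 1)
X_train, X_test, y_train, y_test = train_test_split(X_train_full, y_train_full, test_size=0.05, stratify = y_train_full, random_state=42, shuffle = True)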
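To see what the windowing produces, here is a self-contained sketch on invented data. The function make_windows is a hypothetical stand-in that mirrors the logic of create_dataset(), and a step of 10 is chosen here only to keep the output small:

```python
import numpy as np
import pandas as pd

def make_windows(X, y, time_steps, step):
    """Cut X into windows of `time_steps` rows and label each with its most common class."""
    Xs, ys = [], []
    for i in range(0, len(X) - time_steps, step):
        Xs.append(X.iloc[i:i + time_steps].values)
        values, counts = np.unique(y.iloc[i:i + time_steps], return_counts=True)
        ys.append(values[np.argmax(counts)])
    return np.array(Xs), np.array(ys).reshape(-1, 1)

# 100 invented minutes: awake (0) for 70 rows, then asleep (1)
X_demo = pd.DataFrame({"heart_value": np.linspace(80, 55, 100)})
y_demo = pd.Series([0] * 70 + [1] * 30)

X_seq, y_seq = make_windows(X_demo, y_demo, time_steps=30, step=10)
print(X_seq.shape)      # (number of windows, time steps, features)
print(y_seq.ravel())
```

Each window takes the majority label of its 30 rows, so only the last window, which is mostly asleep, is labelled 1. The three-dimensional shape is what the GRU input layer expects.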
We can use Keras to convert our labels to encoded categories, and further split our dataset so we have separate training (76%), validation (19%) and testing sets (5%)…a rather strange split in hindsight:
y_train1 = keras.utils.to_categorical(y_train, num_classes = None)
y_test1 = keras.utils.to_categorical(y_test, num_classes = None)
X_train = X_train.astype(np.float32)
y_train1 = y_train1.astype(np.float32)
#making the validation set separately
X_train, X_val, y_train1, y_val = train_test_split(X_train, y_train1, test_size=0.2, stratify = y_train1, random_state=42, shuffle = True)
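The 76% / 19% / 5% figures follow from applying the two splits one after the other, which a few lines of arithmetic confirm:

```python
test_size = 0.05   # first split: held-out test set
val_size = 0.2     # second split: validation share of what remains

remaining = 1 - test_size
train_share = remaining * (1 - val_size)
val_share = remaining * val_size
print(round(train_share, 2), round(val_share, 2), test_size)
```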
We use the ‘stratify’ argument in train_test_split() so that our labels are distributed proportionately per class. To help with class imbalance we will use class weights, so that errors on the ‘tired’ class count more heavily in the loss than errors on the other classes. The ‘tired’ weight is the ratio of awake and asleep observations to tired observations in the training set:
#columns of y_train1 are the one-hot classes: 0 = awake, 1 = asleep, 2 = tired
class_weight = {0: 1., 1: 1., 2: int((y_train1[:,0].sum() + y_train1[:,1].sum()) / y_train1[:,2].sum())}
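As a worked example of the weight calculation, the sketch below uses invented class counts, chosen so that the ratio comes out at 13; they are not the real counts from my data:

```python
import numpy as np

# Invented labels: 130 awake (0), 130 asleep (1), 20 tired (2)
labels = np.array([0] * 130 + [1] * 130 + [2] * 20)
one_hot = np.eye(3)[labels]

counts = one_hot.sum(axis=0)   # observations per class
tired_weight = int((counts[0] + counts[1]) / counts[2])
class_weight_demo = {0: 1.0, 1: 1.0, 2: tired_weight}
print(class_weight_demo)
```

Keras expects the dictionary keys to be the integer class indices, so they need to match the columns of the one-hot encoded labels.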
In this case, our ‘tired’ class has 13x more weight compared to being awake or asleep. Since this is a multiclass sequence classification with imbalanced classes, we will use an F1 score rather than plain accuracy to measure performance. A useful guide on this as well as code for a Keras implementation can be found here.
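The custom_f1 metric used below comes from the linked guide and is not reproduced here. To show what an F1 score measures in the multiclass case, here is a NumPy sketch of a macro-averaged F1. This is an assumption about the flavour of F1, and a Keras metric computed batch by batch may give slightly different numbers. The function name macro_f1 and the toy labels are invented for illustration:

```python
import numpy as np

def macro_f1(y_true_onehot, y_prob):
    """Unweighted mean of per-class F1, from one-hot labels and predicted probabilities."""
    y_true = np.argmax(y_true_onehot, axis=1)
    y_pred = np.argmax(y_prob, axis=1)
    scores = []
    for c in range(y_true_onehot.shape[1]):
        tp = np.sum((y_pred == c) & (y_true == c))
        fp = np.sum((y_pred == c) & (y_true != c))
        fn = np.sum((y_pred != c) & (y_true == c))
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        scores.append(f1)
    return float(np.mean(scores))

# Toy example: one 'asleep' sequence is wrongly predicted as 'tired'
y_true_demo = np.eye(3)[[0, 0, 1, 1, 2, 2]]
y_prob_demo = np.eye(3)[[0, 0, 1, 2, 2, 2]]
print(round(macro_f1(y_true_demo, y_prob_demo), 3))
```

The per-class scores here are 1.0, 0.667 and 0.8, so a single mistake affects both the class that was missed and the class that was wrongly predicted.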
Model Building
Finally onto the model building. As mentioned we will use a GRU model since it’s faster to train. We build a model using Keras as follows, making sure to add Dropout layers to minimise overfitting, and passing the output sequence from one GRU to the next. Our GRU layers will start with 32 units that will be multiplied in successive layers. We use an Adam optimiser, set the output activation to softmax (since we have multiple classes) and measure the cross entropy loss:
from tensorflow.keras import layers

def create_gru_model(unit):
    inputs = keras.Input(shape=(X_train.shape[1],X_train.shape[2]))
    #return_sequences=True passes the full output sequence on to the next GRU
    x = layers.GRU(unit*1, activation='tanh', return_sequences=True)(inputs)
    x = layers.Dropout(0.25)(x)
    x = layers.GRU(unit*2, activation='tanh', return_sequences=True)(x)
    x = layers.Dropout(0.25)(x)
    x = layers.GRU(unit*2, activation='tanh', return_sequences=True)(x)
    x = layers.Dropout(0.25)(x)
    #the last GRU returns only its final state, which feeds the classifier
    x = layers.GRU(unit*3, activation='tanh')(x)
    outputs = layers.Dense(y_train1.shape[1], activation="softmax")(x)
    model = keras.Model(inputs, outputs)
    opt = keras.optimizers.Adam(learning_rate=1e-3)
    model.compile(loss='categorical_crossentropy', optimizer= opt, metrics=[custom_f1])
    return model

model_2 = create_gru_model(32)
history2 = model_2.fit(X_train, y_train1, validation_data = (X_val, y_val), epochs = 200, batch_size = 256, shuffle= False, class_weight=class_weight)
After 200 epochs of training and validation we can see the F1 score and model loss:
sns.set(rc={"figure.figsize":(12, 12)})
#label each line so that plt.legend() has something to show
plt.plot(history2.history['custom_f1'], label='Training')
plt.plot(history2.history['val_custom_f1'], label='Validation')
plt.ylabel('F1 Score')
plt.xlabel('Epoch')
plt.legend()
plt.show()
plt.plot(history2.history['loss'], label='Training')
plt.plot(history2.history['val_loss'], label='Validation')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()
plt.show()

A final validation score above 0.95 seems pretty good. Finally we will predict on our test set and evaluate the results in a confusion matrix. We’ll need to use np.argmax() to turn the predicted class probabilities into a single class label per sequence:
from sklearn.metrics import confusion_matrix

y_pred = np.argmax(model_2.predict(X_test), axis=1)
y_pred = np.expand_dims(y_pred, axis=-1)
#rows are the true classes, columns are the predicted classes
matrix = confusion_matrix(y_test, y_pred)
sns.heatmap(matrix, annot=True, fmt='g', yticklabels= [ 'Awake', 'Asleep', 'Tired'], xticklabels= [ 'Awake', 'Asleep', 'Tired'])
plt.show()

It appears the large majority of sequences were correctly classified. We see some incorrect classifications between being tired and awake, which is understandable from the mean and histograms we previously saw.
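To show how the argmax step and the confusion matrix fit together, here is a small NumPy-only sketch. The probabilities and true labels are invented for illustration and are not outputs of the trained model:

```python
import numpy as np

# Invented softmax outputs for four sequences (columns: awake, asleep, tired)
probs = np.array([[0.8, 0.1, 0.1],
                  [0.2, 0.7, 0.1],
                  [0.3, 0.2, 0.5],
                  [0.6, 0.1, 0.3]])
y_true_demo = np.array([0, 1, 2, 2])

# Most probable class for each row
y_pred_demo = np.argmax(probs, axis=1)

# Rows are true classes, columns are predicted classes
matrix_demo = np.zeros((3, 3), dtype=int)
np.add.at(matrix_demo, (y_true_demo, y_pred_demo), 1)
print(y_pred_demo)
print(matrix_demo)
```

The last sequence is truly ‘tired’ but predicted ‘awake’, so it lands off the diagonal in the bottom-left cell, which is the same kind of tired / awake confusion seen in the real results.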
Summary / Next steps
We’ve taken a univariate time series of heart rate data, combined with labels derived from sleep data, to create a sequence classification model able to tell when we’re asleep, awake or tired. Our model has performed well, but maybe there are additional aspects that could be taken into account:
- How would a model perform for binary sequence classification (i.e. tired / not tired)?
- Would data measuring the number of steps taken help to differentiate between being awake and tired (assuming that physical activity decreases when tired)?
- If we expanded our time interval to also record tiredness when we wake up, would it look the same or different compared to ‘evening tiredness’?
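For the first of these ideas, the relabelling itself would be a one-liner. The sketch below assumes the same label coding used in this article (0 awake, 1 asleep, 2 tired) and uses invented labels:

```python
import numpy as np

# Invented multiclass labels: 0 = awake, 1 = asleep, 2 = tired
y_multi = np.array([0, 0, 2, 2, 1, 1, 0, 2])

# Binary target: 1 = tired, 0 = not tired (awake or asleep)
y_binary = (y_multi == 2).astype(int)
print(y_binary)
```

A binary model would then end in a single sigmoid unit trained with binary cross entropy instead of the three-way softmax.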
There’s a lot more to do and explore, and I’ll keep you updated with my progress. Thanks for reading!
Note: This article does not claim to predict or prevent tiredness-related accidents, and should only be read and used as a data science learning exercise.