In [None]:
import datetime
import dateutil.parser
import glob
import pytz
import re
import xml.etree.ElementTree as ET

import altair as alt
import numpy as np
import pandas as pd

from fitparse import FitFile

In [None]:
# Timezone that activity timestamps are converted to by default when files are read.
TZ = pytz.timezone("America/Los_Angeles")

# Interval windows shaded on the charts, expressed as millisecond offsets on the
# same axis as the "time" column: 2:00-3:00, 5:00-6:00 and 8:00-9:00.
HIIT_12 = pd.DataFrame({
    "begin": [120_000, 300_000, 480_000],
    "end": [180_000, 360_000, 540_000],
})

# max_rows=None removes Altair's row limit so long recordings can be charted.
alt.data_transformers.enable('default', max_rows=None)

In [None]:
def heart_rate_zone(hr, hr_max=180):
    """Map heart-rate samples onto training zones 0-5.

    Zone 0 is everything below 50% of ``hr_max``. Zones 1-5 each cover a
    further 10% of ``hr_max``, and readings at or above ``hr_max`` stay in
    zone 5.

    Args:
        hr: pandas Series of heart rates in bpm.
        hr_max: maximum heart rate that defines the top of zone 5.

    Returns:
        Series of integer zone numbers aligned with ``hr``.
    """
    zone_count = 5
    floor_hr = hr_max / 2
    zone_width = (hr_max - floor_hr) / zone_count
    # Clamp so that anything under the floor lands in zone 0 and anything at or
    # above the maximum lands in zone 5.
    clamped = hr.clip(lower=floor_hr - 1, upper=hr_max - 1)
    zones_below_max = np.ceil((hr_max - clamped) / zone_width).astype(int)
    return (zone_count + 1) - zones_below_max

def read_fit(filename, tz):
    """Load the heart-rate samples of a FIT file into a DataFrame.

    Only "record" messages that carry a heart-rate value are kept. Their
    timestamps are localized as UTC and then converted to ``tz``.

    Args:
        filename: path of the FIT file; also stored in the "source" column.
        tz: timezone the timestamps are converted to.

    Returns:
        The DataFrame built by ``to_df`` from the collected samples.
    """
    rows = []
    for message in FitFile(filename).get_messages("record"):
        bpm = message.get_value("heart_rate")
        if bpm is None:
            # Record without a heart-rate reading: nothing to plot.
            continue
        utc_stamp = pytz.UTC.localize(message.get_value("timestamp"))
        rows.append({
            "timestamp": utc_stamp.astimezone(tz),
            "heart_rate": bpm,
            "source": filename,
        })
    return to_df(rows)

def read_gpx(filename, tz):
    """Load the heart-rate samples of a GPX track into a DataFrame.

    Track points are looked up in the GPX 1.1 namespace and their heart rate
    in the Garmin TrackPointExtension v1 namespace. Points without a
    heart-rate element are ignored.

    Args:
        filename: path of the GPX file; also stored in the "source" column.
        tz: timezone the parsed timestamps are converted to.

    Returns:
        The DataFrame built by ``to_df`` from the collected samples.
    """
    namespaces = {
        "gpx": "http://www.topografix.com/GPX/1/1",
        "gpxtpx": "http://www.garmin.com/xmlschemas/TrackPointExtension/v1"
    }
    track_points = ET.parse(filename).getroot().findall(".//gpx:trkpt", namespaces)
    rows = []
    for point in track_points:
        bpm_text = point.findtext(".//gpxtpx:hr", None, namespaces)
        if bpm_text is None:
            continue
        time_text = point.findtext("gpx:time", None, namespaces)
        rows.append({
            "timestamp": dateutil.parser.parse(time_text).astimezone(tz),
            "heart_rate": int(bpm_text),
            "source": filename,
        })
    return to_df(rows)

def read_file(filename, tz=TZ):
    """Read a recording, choosing the parser from the file extension.

    The extension is matched case-insensitively, so ``ACTIVITY.FIT`` is
    handled the same way as ``activity.fit``.

    Args:
        filename: path to a ``.fit`` or ``.gpx`` file.
        tz: timezone the timestamps are converted to (defaults to ``TZ``).

    Returns:
        The DataFrame produced by ``read_fit`` or ``read_gpx``.

    Raises:
        ValueError: if the extension is neither ``.fit`` nor ``.gpx``.
            ``ValueError`` subclasses ``Exception``, so handlers written for
            the previous bare ``Exception`` keep working.
    """
    lowered = filename.lower()
    if lowered.endswith(".fit"):
        return read_fit(filename, tz)
    if lowered.endswith(".gpx"):
        return read_gpx(filename, tz)
    raise ValueError("unsupported file type: " + filename)

def to_df(data):
    """Turn a list of sample dicts into the DataFrame used by the plots.

    Args:
        data: list of dicts with "timestamp", "heart_rate" and "source" keys.

    Returns:
        DataFrame with the input columns plus:
          * "heart_rate_zone": zone from ``heart_rate_zone``,
          * "time": whole seconds elapsed since the first sample, in
            milliseconds,
          * "duration": milliseconds since the previous sample (NaN for the
            first row).
        For empty input, an empty frame with these columns is returned.
    """
    df = pd.DataFrame(data)
    if df.empty:
        # pd.DataFrame([]) has no columns at all, so the attribute accesses
        # below would fail with an obscure AttributeError.
        return pd.DataFrame(columns=[
            "timestamp", "heart_rate", "source",
            "heart_rate_zone", "time", "duration",
        ])
    df["heart_rate_zone"] = heart_rate_zone(df["heart_rate"])
    # Positional access to the first sample; does not rely on the index labels.
    elapsed = df["timestamp"] - df["timestamp"].iloc[0]
    # `.dt.seconds` is only the seconds *component* (0-86399) and wraps after
    # 24 hours. Floor-dividing by one second yields the full elapsed whole
    # seconds instead.
    df["time"] = (elapsed // pd.Timedelta(seconds=1)) * 1000
    df["duration"] = df["time"] - df["time"].shift(1)
    return df

def plot_zones(df, intervals, title):
    """Scatter heart rate over elapsed time, coloured by heart-rate zone.

    Args:
        df: samples with "time", "heart_rate" and "heart_rate_zone" columns.
        intervals: optional DataFrame with "begin"/"end" columns to shade
            behind the points, or None for no shading.
        title: chart title.

    Returns:
        The point chart, layered on top of the shaded intervals when given.
    """
    zone_domain = [0, 1, 2, 3, 4, 5]
    zone_palette = [
        "lightgray",
        "rgb(184, 184, 184)",
        "rgb(30, 185, 219)",
        "rgb(163, 185, 40)",
        "rgb(248, 196, 0)",
        "rgb(221, 3, 82)",
    ]
    x_time = alt.X("time:T", axis=alt.Axis(grid=True, title=None, format="%M:%S"))
    y_bpm = alt.Y(
        "heart_rate:Q",
        scale=alt.Scale(domain=[50, 200]),
        axis=alt.Axis(grid=False, title="bpm"),
    )
    zone_color = alt.Color(
        "heart_rate_zone:O",
        scale=alt.Scale(domain=zone_domain, range=zone_palette),
        legend=alt.Legend(title="Zone"),
    )
    points = (
        alt.Chart(df)
        .mark_point(opacity=0.4)
        .encode(x_time, y_bpm, color=zone_color)
        .properties(width=1000, title=title)
    )
    if intervals is None:
        return points
    # The shading is the first layer so the points are drawn on top of it.
    shading = (
        alt.Chart(intervals)
        .encode(x="begin:T", x2="end:T")
        .mark_rect(opacity=0.4, color="lightgray")
    )
    return alt.layer(shading, points)

def plot_zones_from_files(file_pattern, intervals=None):
    """Stack one zone chart per file matching ``file_pattern``.

    Args:
        file_pattern: glob pattern selecting the recordings to plot.
        intervals: optional "begin"/"end" DataFrame shaded on every chart.

    Returns:
        A vertical concatenation of the per-file charts, in filename order.
    """
    # glob.glob returns matches in a filesystem-dependent order. Sorting makes
    # the chart order reproducible between runs and machines.
    filenames = sorted(glob.glob(file_pattern))
    charts = [plot_zones(read_file(filename), intervals, filename) for filename in filenames]
    return alt.vconcat(*charts)

def plot_sources(df, intervals):
    """Draw one heart-rate line per source on a shared elapsed-time axis.

    Args:
        df: samples with "time", "heart_rate" and "source" columns.
        intervals: optional DataFrame with "begin"/"end" columns to shade
            behind the lines, or None for no shading.

    Returns:
        The line chart, layered on top of the shaded intervals when given.
    """
    x_time = alt.X("time:T", axis=alt.Axis(grid=True, title=None, format="%M:%S"))
    y_bpm = alt.Y(
        "heart_rate:Q",
        scale=alt.Scale(domain=[50, 200]),
        axis=alt.Axis(grid=False, title="bpm"),
    )
    # The colour range has a single entry and the legend is disabled.
    source_color = alt.Color("source:O", scale=alt.Scale(range=["steelblue"]), legend=None)
    lines = (
        alt.Chart(df)
        .mark_line(opacity=0.4)
        .encode(x_time, y_bpm, color=source_color)
        .properties(width=1000)
    )
    if intervals is None:
        return lines
    shading = (
        alt.Chart(intervals)
        .encode(x="begin:T", x2="end:T")
        .mark_rect(opacity=0.4, color="lightgray")
    )
    return alt.layer(shading, lines)


def plot_sources_from_files(file_pattern, intervals=None):
    """Overlay the heart-rate traces of all files matching ``file_pattern``.

    Args:
        file_pattern: glob pattern selecting the recordings to plot.
        intervals: optional "begin"/"end" DataFrame shaded behind the traces.

    Returns:
        The chart produced by ``plot_sources`` for the combined samples.
    """
    frames = []
    for filename in glob.glob(file_pattern):
        frames.append(read_file(filename))
    combined = pd.concat(frames)
    return plot_sources(combined, intervals)

# Sanity check: with hr_max=180 the zone boundaries sit at 90, 108, 126, 144
# and 162 bpm, and out-of-range readings are clamped to zones 0 and 5.
assert (
    heart_rate_zone(
        pd.Series([0, 89, 90, 107, 108, 125, 126, 143, 144, 161, 162, 179, 180, 200]),
        hr_max=180,
    )
    == pd.Series([0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 5, 5])
).all()

In [None]:
# Overlay the heart-rate traces of every recording matching "*-E??.fit", with the HIIT_12 windows shaded.
plot_sources_from_files("*-E??.fit", intervals=HIIT_12)

In [None]:
# One zone-coloured chart per recording matching "*-SWM.fit"; no interval shading.
plot_zones_from_files("*-SWM.fit")

In [None]:
# Overlay the heart-rate traces of every recording matching "*-RWM.fit", with the HIIT_12 windows shaded.
plot_sources_from_files("*-RWM.fit", intervals=HIIT_12)

In [None]:
# Zone-coloured chart for a single GPX recording.
plot_zones_from_files("2019-08-14-WLK.gpx")

In [None]:
def trimp_exp(data, hr_rest=60, hr_max=180, f=1.92):
    """Compute an exponentially weighted training impulse (TRIMPexp).

    For every distinct heart rate the time spent at it (in minutes) is
    weighted by ``r * 0.64 * exp(f * r)``, where
    ``r = (hr - hr_rest) / (hr_max - hr_rest)``. The weighted minutes are
    summed and truncated to an int.

    Args:
        data: DataFrame with "heart_rate" (bpm) and "duration" (milliseconds)
            columns. Other columns are ignored.
        hr_rest: resting heart rate used to normalise the samples.
        hr_max: maximum heart rate used to normalise the samples.
        f: exponent factor of the weighting.

    Returns:
        The summed score as an int.
    """
    # Select "duration" before summing. Summing the whole frame first also
    # tries to add up the datetime "timestamp" column, which raises TypeError
    # on pandas >= 2.0 where non-numeric columns are no longer dropped.
    hr_minutes = data.groupby("heart_rate")["duration"].sum() / (1000 * 60)
    hr_percent = (hr_minutes.index.to_numpy(dtype=float) - hr_rest) / (hr_max - hr_rest)
    weighted_minutes = hr_percent * 0.64 * np.exp(f * hr_percent) * hr_minutes
    return int(weighted_minutes.sum())
    
# Regression check against a recorded activity; needs 2019-01-17-EXB.fit in the working directory.
assert trimp_exp(read_file("2019-01-17-EXB.fit")) == 22

In [None]:
def read_stats(filenames):
    """Summarise each recording as one row of per-activity statistics.

    Args:
        filenames: iterable of recording paths readable by ``read_file``.

    Returns:
        DataFrame with one row per recording and the columns:
          * "filename": the path as given,
          * "activity": the run of capital letters between the last matching
            "-" and "." in the name, or "???" when there is none,
          * "trimp_exp": score from ``trimp_exp``,
          * "duration": elapsed time encoded as a datetime offset from
            1970-01-01 so that it can be plotted on a temporal axis.
        Recordings that contain no samples are skipped.
    """
    data = []
    for filename in filenames:
        # Raw string: "\." in a normal string literal is an invalid escape
        # sequence that newer Python versions warn about.
        m = re.search(r"-([A-Z]+)\.", filename)
        df = read_file(filename)
        if df.empty:
            # With no samples there is no last row to take the duration from.
            continue
        last_time_ms = df["time"].iloc[-1]
        data.append({
            "filename": filename,
            "activity": m.group(1) if m else "???",
            "trimp_exp": trimp_exp(df),
            # float() turns the numpy scalar into a type timedelta accepts.
            "duration": datetime.datetime(1970, 1, 1, 0, 0) + datetime.timedelta(seconds=float(last_time_ms) / 1000)
        })
    return pd.DataFrame(data)

def plot_stats(df):
    """Scatter TRIMPexp against duration, one point per recording.

    Args:
        df: frame with "duration", "trimp_exp", "activity" and "filename"
            columns.

    Returns:
        An interactive point chart coloured by activity, with the filename
        as tooltip.
    """
    x_duration = alt.X("duration:T", axis=alt.Axis(grid=False, title=None, format="%H:%M:%S"))
    y_trimp = alt.Y("trimp_exp:Q", axis=alt.Axis(grid=False, title="TRIMPexp"))
    activity_color = alt.Color("activity:O", scale=alt.Scale(scheme="category10"))
    scatter = alt.Chart(df).mark_point(opacity=0.6, size=60).encode(
        x_duration,
        y_trimp,
        color=activity_color,
        tooltip="filename",
    )
    return scatter.interactive()

# Summarise every FIT and GPX recording in the working directory. glob returns
# matches in a filesystem-dependent order, so the list is sorted to keep the
# printed table reproducible.
stats = read_stats(sorted(glob.glob("*.fit") + glob.glob("*.gpx")))
print(stats)
plot_stats(stats)