Elixir vs Python for real world AI/ML (Part 1)

Chris Hopkins

2 March 2023 · 6 min read

In the world of machine learning, Python is often considered the go-to language for developing and deploying models. However, Elixir - a functional, concurrent programming language - is becoming increasingly well-suited for machine learning tasks.

Build a neural network from scratch

In this blog post, we will explore a real-world machine learning problem in Python and Elixir. By the end you will have two end-to-end examples of how you can combine, clean and unify data sources, then use that data to train and evaluate a neural network.

We will be using data from the New South Wales Water API to analyse rainfall and stream height measurements surrounding Warragamba Dam, a large dam located near Sydney, Australia. The goal is to predict the change in the dam's water level for the next day, given the conditions of the current day.

Setup Jupyter and Livebook notebooks

For this project I implemented the Python version in a Jupyter notebook and the Elixir version in Livebook.

Livebook is Elixir’s code notebook offering and runs on top of Phoenix. It allows you to program solutions interactively and collaboratively, and it saves to live Markdown, so it’s easy to share.

In this project Python dependencies are installed using Poetry. This is the relevant portion of the pyproject.toml and the imports at the top of my Jupyter notebook:

# pyproject.toml
[tool.poetry.dependencies]
python = ">=3.8,<3.12"
pandas = "^1.5.2"
numpy = "^1.24.0"
tensorflow = "^2.11.0"
requests = "^2.28.1"
altair-saver = "^0.5.0"

[tool.poetry.group.dev.dependencies]
altair = "^4.2.0"
jupyter = "^1.0.0"

# top of Jupyter notebook
import numpy as np # Numerical python lib
import pandas as pd # Data frame lib
import altair as alt # Chart plotting lib
import tensorflow as tf # Neural Network lib
import requests
import json
import pprint

from tensorflow.keras import layers

In the Elixir Livebook it's a similar story:

Mix.install(
  [
    {:httpoison, "~> 1.8"}, # HTTP client
    {:jason, "~> 1.4"}, # JSON encoder/decoder
    {:vega_lite, "~> 0.1.5"}, # Elixir Vega-lite binding
    {:kino, "~> 0.8.0"}, # Provides beautiful outputs for Livebook
    {:kino_vega_lite, "~> 0.1.1"}, # Kino utils for Vega-lite
    {:explorer, "~> 0.4.0"}, # Data frame library built on Polars
    {:axon, "~> 0.3.0"}, # Neural network library, runs on EXLA/Torchx through Nx
    {:exla, "~> 0.4.0"}, # Elixir's binding for Google's XLA Linear Algebra Optimiser
    {:nx, "~> 0.4.0"}, # Elixir's numerical computation library
  ],
  config: [
    nx: [default_backend: EXLA.Backend]
  ]
)

# Sets the global compilation options
Nx.Defn.global_default_options(compiler: EXLA)
# Sets the process-level compilation options
Nx.Defn.default_options(compiler: EXLA)

alias VegaLite, as: Vl
alias Explorer.DataFrame, as: DF
alias Explorer.Series

Extract input data into DataFrames

First, we need to download the data from its source and put it into a container of some kind. In Python this would be a pandas DataFrame and in Elixir it would be an Explorer DataFrame. Explorer is a DataFrame library for Elixir which uses Polars under the hood, a Rust library widely regarded as one of the fastest DataFrame implementations.

I’ll gloss over the details of the API itself. All you need to know is that it returns time series data as JSON, where v is the data value, t is the time the measurement was taken and q is the quality code for the measurement, in this format:

[{"t": "20230101000000", "v": "-0.124", "q":"5"}, ...]

To get the data from the API in Python:

# see here for more information about stations https://realtimedata.waternsw.com.au/
weather_station_ids = ["563035", "563046", "563079", "568045", "568051"]
stream_station_ids = ["212250", "212270"]

def cast_data_types(df):
    df["v"] = df["v"].astype(float)
    df["q"] = df["q"].astype(float)
    df["t"] = pd.to_datetime(df["t"], format="%Y%m%d%H%M%S")
    return df

json_response = get_ts_trace(...)
water_level_df = pd.json_normalize(json_response)
cast_data_types(water_level_df)

rainfall_dfs = []
for station in weather_station_ids:
    json_response = get_ts_trace(station, ...)

    rainfall_dfs.append(
        cast_data_types(
            pd.json_normalize(json_response)
        )
    )

stream_dfs = []
for station in stream_station_ids:
    json_response = get_ts_trace(station, ...)

    stream_dfs.append(
        cast_data_types(
            pd.json_normalize(json_response)
        )
    )

To get the data from the API in Elixir:

# see here for more information about stations https://realtimedata.waternsw.com.au/
weather_station_ids = ["563035", "563046", "563079", "568045", "568051"]
stream_station_ids = ["212250", "212270"]

water_level_df =
  WaterAPI.get_ts_trace(...)
  |> DF.new()
  |> DF.mutate(v: Series.cast(v, :float))

rainfall_dfs =
  for station_id <- weather_station_ids  do
     WaterAPI.get_ts_trace(station_id, ...)
    |> DF.new()
    |> DF.mutate(v: Series.cast(v, :float))
  end

stream_dfs =
  for station_id <- stream_station_ids  do
     WaterAPI.get_ts_trace(station_id, ...)
    |> DF.new()
    |> DF.mutate(v: Series.cast(v, :float))
  end

We do this for the rainfall and stream data too, which means we end up with 8 pieces of time series data in total:

  • 1 measuring Warragamba Dam water level stored in water_level_df
  • 2 measuring the major rivers which flow into the dam, stored as a list of DataFrames in stream_dfs
  • 5 measuring the rainfall at weather stations near the dam, stored as a list of DataFrames in rainfall_dfs

Derive the ground truth for Water Level Difference

The data we are receiving is the average water level of the dam each day. From this we need to derive the change in water level each day, as this is what we want our model to predict.

This can be done like so in Python:

water_level = water_level_df["v"]
water_level_tomorrow = water_level_df["v"].copy().shift(-1, fill_value=0.0)
water_level_df["water_level_difference"] = water_level_tomorrow - water_level

Elixir:

water_level_tomorrow =
  water_level_df["v"]
  |> Series.shift(-1)
  |> then(fn s -> DF.new(water_level_tomorrow: s) end)

water_level_df =
  water_level_df
  |> DF.concat_columns(water_level_tomorrow)
  |> DF.mutate(water_level_difference: water_level_tomorrow - v)
  |> DF.discard("water_level_tomorrow")

Notice how here we have to .copy() the column in Python to prevent .shift() from causing a side effect on column "v" when calculating the "water_level_difference". This is something you have to consider fairly often when using Pandas: some functions give you views into the data, others give you copies of the data. In Elixir with Explorer it's easy; everything is immutable all of the time.
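
To make the view-versus-copy point concrete, here is a small, generic pandas example (not part of the original analysis): an assignment made through chained indexing does not write back to the original DataFrame, while an assignment through .loc does.

import pandas as pd

df = pd.DataFrame({"q": [5.0, 201.0, 5.0], "v": [1.0, 2.0, 3.0]})

# Chained indexing: the boolean filter produces a new object, so this
# assignment never reaches df and pandas emits a SettingWithCopyWarning.
subset = df[df["q"] == 201.0]
subset["v"] = 0.0
print(df["v"].tolist())  # [1.0, 2.0, 3.0] - unchanged

# Assigning through .loc on the original frame modifies it in place.
df.loc[df["q"] == 201.0, "v"] = 0.0
print(df["v"].tolist())  # [1.0, 0.0, 3.0]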

Clean data in preparation for training the neural net

We need to make sure there are no invalid values in our water_level_df as they will negatively affect our predictions. A value v is invalid when the corresponding quality value q is 201 or 255.

For our stream_dfs and rainfall_dfs we will set invalid values to 0. This is ok because during the training process the neural network should learn how to treat 0 values, so that they have little effect on the outcome of the prediction.

Python:

water_level_df = water_level_df[~water_level_df["q"].isin([201, 255])]

for stream_df in stream_dfs:
    stream_df.loc[stream_df["q"].isin([201,255]), "v"] = 0.0

for rainfall_df in rainfall_dfs:
    rainfall_df.loc[rainfall_df["q"].isin([201,255]), "v"] = 0.0

Elixir:

water_level_df = DF.filter(water_level_df, q != 201 and q != 255)

rainfall_dfs =
  for df <- rainfall_dfs do
    df
    |> DF.mutate(m: Series.cast(q != 201 and q != 255, :integer))
    |> DF.mutate(v: v * m)
    |> DF.discard(:m)
  end

stream_dfs =
  for df <- stream_dfs do
    df
    |> DF.mutate(m: Series.cast(q != 201 and q != 255, :integer))
    |> DF.mutate(v: v * m)
    |> DF.discard(:m)
  end

Notice how the Python code is difficult to understand without knowing how Pandas works. The Elixir code is much more self-explanatory.

However, the Elixir code could be more succinct. There may be a better way to transform the values without an intermediate column in the DataFrame, as there is in Python, but I couldn't find one. Let me know if you know of one!

Joining the dataset into DataFrames

We want to join this data together into one DataFrame to make it easier to work with. However, each DataFrame has the same three column names: t, v and q. We want to join on the time column t, so we need to rename the v and q columns to something unique for each DataFrame. I decided to rename them by appending the station_id and the variable number to the column name.

Finally, we need to join all of the DataFrames together with an inner join. This is important because we want to ensure that rows are only included where there is corresponding data across all the DataFrames (for a given timestamp t).
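
As a quick, toy illustration of why the inner join matters (made-up values and column names, not the real station data), any timestamp missing from one of the DataFrames is dropped from the combined result:

import pandas as pd

rain = pd.DataFrame(
    {"t": ["2023-01-01", "2023-01-02", "2023-01-03"], "v_rain": [0.0, 4.2, 1.1]}
)
level = pd.DataFrame({"t": ["2023-01-01", "2023-01-03"], "v_level": [96.1, 96.4]})

# An inner join keeps only timestamps present in BOTH frames,
# so 2023-01-02 is dropped from the result.
combined = pd.merge(left=level, right=rain, how="inner", on="t")
print(combined)
#             t  v_level  v_rain
# 0  2023-01-01     96.1     0.0
# 1  2023-01-03     96.4     1.1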

Renaming and joining the data frames in Python:

def rename_cols_with_id(dfs, station_ids: list, variable_no: str):
    for df, station_id in zip(dfs, station_ids):
        df.rename(
            columns={
                "v": f"v_{station_id}_{variable_no}",
                "q": f"q_{station_id}_{variable_no}",
            },
            inplace=True,
        )

water_level_df = water_level_df.rename(
    columns={"v": "v_212242_130", "q": "q_212242_130"}
)

rename_cols_with_id(rainfall_dfs, weather_station_ids, "10")
rename_cols_with_id(stream_dfs, stream_station_ids, "100")

df = water_level_df

for rainfall_df in rainfall_dfs:
    df = pd.merge(left=df, right=rainfall_df, how="inner", on="t")

for stream_df in stream_dfs:
    df = pd.merge(left=df, right=stream_df, how="inner", on="t")

df.columns
# ['v_212242_130', 't', 'q_212242_130', 'water_level_difference',
#  'v_563035_10', 'q_563035_10', 'v_563046_10', 'q_563046_10',
#  'v_563079_10', 'q_563079_10', 'v_568045_10', 'q_568045_10',
#  'v_568051_10', 'q_568051_10', 'v_212250_100', 'q_212250_100',
#  'v_212270_100', 'q_212270_100']

Renaming and joining the data frames in Elixir:

# snippet of the helper module
defmodule Helper do
  def rename_cols_by_id(dfs, station_ids, variable_no) do
    for {df, station_id} <- Enum.zip(dfs, station_ids) do
      DF.rename(
        df,
        q: "q_#{station_id}_#{variable_no}",
        v: "v_#{station_id}_#{variable_no}"
      )
    end
  end

  def join_dfs(dfs) when is_list(dfs) do
    Enum.reduce(dfs, fn df, acc ->
      DF.join(df, acc, how: :inner)
    end)
  end

    ...
end

water_level_df =
  DF.rename(water_level_df, q: "q_212242_130", v: "v_212242_130")

rainfall_df =
  rainfall_dfs
  |> Helper.rename_cols_by_id(weather_station_ids, "10")
  |> Helper.join_dfs()

stream_df =
  stream_dfs
  |> Helper.rename_cols_by_id(stream_station_ids, "100")
  |> Helper.join_dfs()

df =
  water_level_df
  |> DF.join(rainfall_df)
  |> DF.join(stream_df)

DF.names(df)
# ["q_212242_130", "t", "v_212242_130", "water_level_difference", "q_568051_10", "v_568051_10",
#  "q_568045_10", "v_568045_10", "q_563079_10", "v_563079_10", "q_563046_10", "v_563046_10",
#  "q_563035_10", "v_563035_10", "q_212270_100", "v_212270_100", "q_212250_100", "v_212250_100"]

Summary

Our data is now clean and in a single DataFrame, so let's step back and take a look at what we've achieved. We've:

  • Set up our notebooks and installed our dependencies.
  • Downloaded eight pieces of time series data (via an API) and placed each of them in a DataFrame.
  • Created a new column which measures the change in water level of the dam.
  • Removed all invalid values from the table.
  • Renamed and joined all the DataFrames together into one single DataFrame.

What’s next?

That’s enough for now. If you want to take a closer look, the notebooks are on GitHub. In part 2 we’ll focus on building, training and evaluating the neural network, and will introduce Nx, Elixir’s incredible tensor library, and Axon, Elixir’s neural network library.

Read more in Part 2

WRITTEN BY

Chris Hopkins

Chris is a full-stack software developer, with a keen interest in writing reliable, fault tolerant and resilient software. He has in-depth experience with load testing, automated testing and observability for deep application introspection. He's a big fan of functional and actor model programming utilising Test Driven Development.
