Machine Learning Pipeline

**WORK IN PROGRESS** This guide isn’t complete, but the code examples may be useful as-is.

This guide assumes you are familiar with all the content in the Introductory Guide.

A typical machine learning pipeline consists of loading data, extracting features, training models, and storing those models for later use in a production system or further analysis. In some cases feature extraction is quick and the features are transitory, with no need to save them independently of the finished trained model. Other times the features are a representation of the data that you wish to reuse in different settings, e.g. a dashboard explaining predictions, ad-hoc analysis, or further model development.

In the end a good deal of plumbing is required to wire up an app/service with the latest models and features in such a way that API calls can be traced back to the originating model, features, and even data sources. provenance abstracts much of this plumbing so you can focus on writing parsimonious pythonic pipelines™ with plain old functions.

In [1]:
%load_ext yamlmagic
In [2]:
%%yaml basic_config
blobstores:
    disk:
        type: disk
        cachedir: /tmp/provenance-ml-artifacts
        read: True
        write: True
        delete: True
artifact_repos:
    local:
        type: postgres
        db: postgresql://localhost/provenance-ml-guide
        store: 'disk'
        read: True
        write: True
        delete: True
        # this option will create the database if it doesn't exist
        create_db: True
default_repo: local
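
If you prefer not to depend on the yamlmagic extension, the same configuration can be written as a plain Python dict and passed to p.load_config directly (a sketch equivalent to the YAML cell above; yamlmagic simply parses the YAML into this kind of mapping):

# same configuration as the YAML above, expressed as a plain dict
basic_config = {
    'blobstores': {
        'disk': {'type': 'disk',
                 'cachedir': '/tmp/provenance-ml-artifacts',
                 'read': True, 'write': True, 'delete': True}},
    'artifact_repos': {
        'local': {'type': 'postgres',
                  'db': 'postgresql://localhost/provenance-ml-guide',
                  'store': 'disk',
                  'read': True, 'write': True, 'delete': True,
                  # this option will create the database if it doesn't exist
                  'create_db': True}},
    'default_repo': 'local'}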
In [3]:
import provenance as p


p.load_config(basic_config)
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision  -> e0317ab07ba4
Out[3]:
<provenance.repos.Config at 0x11200ebe0>
In [4]:
import numpy as np
import pandas as pd
import time
from sklearn.utils import check_random_state
import toolz as t
In [5]:
@p.provenance()
def load_data(query):
    # fetch something from the DB in real life...
    random_state = check_random_state(abs(hash(query)) // (10**10))
    return random_state.uniform(0, 10, 10)


@p.provenance()
def extract_features_a(data, hyperparam_a=5):
    time.sleep(2)  # simulate an expensive feature computation
    rs = check_random_state(hyperparam_a)
    return data[0:5] + 1 + rs.rand(5)


@p.provenance()
def extract_features_b(data, hyperparam_x=10):
    time.sleep(2)  # simulate an expensive feature computation
    rs = check_random_state(hyperparam_x)
    return data[5:] + 1 + rs.rand(5)


@p.provenance()
def build_model(features_a, features_b, num_trees=100):
    # train something fancy on the features in real life...
    return {'whatever': 'special model with {} trees'.format(num_trees)}


@p.provenance()
def evaluate(model, data):
    # score the model against held-out data in real life...
    return {'some_metric': 0.5, 'another_metric': 0.4}


def pipeline(train_query='some query', valid_query="another query", hyperparam_a=5, hyperparam_x=10):
    data = load_data("some query")
    features_a = extract_features_a(data, hyperparam_a)
    features_b = extract_features_b(data, hyperparam_x)
    model = build_model(data, features_a, features_b)

    validation_data = load_data("another query")
    evaluation = evaluate(model, validation_data)

    return {'features_a': features_a, 'features_b': features_b,
            'model': model, 'evaluation': evaluation}


@p.provenance()
def make_decision(model, request):
    # make some sort of prediction, classification, with the model
    # to help make a 'decision' and return it as the result
    return {'prediction': 0.5, 'model': model.artifact.id}

TODO: explain everything, including the concept of artifact sets and how they simplify the building and deployment of models.

In [6]:
def run_production_pipeline():
    with p.capture_set('production'):
        return pipeline()
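
Everything created inside the capture_set block (here: both load_data calls, the two feature sets, the model, and the evaluation) is recorded as a named artifact set, so a single name like 'production' resolves to the exact artifacts produced by that run. The same pattern can be reused for experimental builds; the sketch below is hypothetical (the helper name and set names are made up for illustration):

# Hypothetical sketch: capture each candidate build as its own named set so its
# model, features, and evaluation can be inspected or promoted as one unit.
def run_candidate_pipeline(set_name, **pipeline_kwargs):
    with p.capture_set(set_name):
        return pipeline(**pipeline_kwargs)

# e.g. run_candidate_pipeline('candidate-hyperparam-a-7', hyperparam_a=7)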
In [7]:
res = run_production_pipeline()
In [8]:
res = p.load_set_by_name('production')
In [9]:
res
Out[9]:
ArtifactSet(id='08f3c7c6a84132faa155ca9996a26c4df92bd798', artifact_ids=frozenset({'2411521185b4267706a24f85b16c46e3a24b4e66', '96c47ddbeff008e2b3a27913611c9648c3e74aa2', 'd3bb8e7625b7093b079bdc8b7d50c6eaaa62f835', '46268ac8c40932b63033b387aa0217974c82c717', 'd3c930d243d6ec4d7be481ddd1f4c3e9277d5f09', '3fdafd792f113c669d55b416bed9b5091f954029'}), created_at=datetime.datetime(2017, 5, 1, 0, 1, 9, 119196), name='production')
In [10]:
build_artifacts = res.proxy_dict(group_artifacts_of_same_name=True)
In [11]:
build_artifacts.keys()
Out[11]:
dict_keys(['__main__.load_data', '__main__.build_model', '__main__.extract_features_b', '__main__.evaluate', '__main__.extract_features_a'])
In [12]:
model = build_artifacts['__main__.build_model']
In [13]:
model
Out[13]:
<provenance.ArtifactProxy(46268ac8c40932b63033b387aa0217974c82c717) {'whatever': 'special model with 100 trees'} >
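
To tie this back to the opening paragraphs: a production service can resolve the 'production' set by name, grab the model proxy, and hand it to make_decision; since make_decision records model.artifact.id, every decision it returns can be traced back to the exact model (and, through its lineage, the features and data) that produced it. A sketch using only functions defined above (handle_request and the example payload are hypothetical):

def handle_request(request):
    # resolve the current production artifacts by set name
    artifacts = p.load_set_by_name('production').proxy_dict(group_artifacts_of_same_name=True)
    model = artifacts['__main__.build_model']
    # the returned decision includes the originating model's artifact id
    return make_decision(model, request)

# e.g. handle_request({'customer_id': 42})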