At the StampedeCon AI Summit in St. Louis, Colaberry Consulting presented how to take a complex idea in the AI domain, apply ML algorithms to it, and deploy it to production using the refactored.ai platform. More details, including code examples, are available on the platform. This is an area of common interest to many organizations, large and small. Achieving it requires an efficient pipeline in which Data Scientists, Machine Learning Engineers, and Data Engineers work in collaboration.
Often, Machine Learning Engineers build models that Data Scientists then tune with statistical techniques to improve the accuracy of the generated outputs. This is by no means the only arrangement that exists across companies. We also see organizations treat all areas of AI as the same, which hurts them because people with misaligned skills are expected to work on problems that do not fit them. This is largely because decision-makers lack an understanding of the history of the domain. In addition, it is easier to teach programming to people with Math backgrounds (such as EE, Math, Stats, and Biostats) than to teach Math to programmers; such people are often a better fit for Data Analytics/AI than CS graduates, who generally lack skills in Linear Algebra and Probability. The discussion here, however, mostly deals with coding guidelines. We shall consider a problem in the AI domain and walk through the steps needed to take it to production.
Building a Credit Classifier
Consider a credit rating system where the objective is to classify customers as good credit or bad credit.
German Credit Data
German Credit Data is a dataset in the UCI Repository that holds credit information on 1,000 customers. Our task is to separate customers into Good Credit customers and Bad Credit customers. Each record consists of 20 attributes, most of them categorical/symbolic and the rest numerical. The dataset was provided by Prof. Hofmann.
Spark for Raw Data Ingestion
The raw data needs to be sampled so that we can start to extract features from it for modeling.
```
# Load and parse the data
credit_data = spark.read.format("csv").load("/german_credit.txt", sep=" ")
# Draw a 10% sample without replacement, with a fixed seed for repeatability
training_sample = credit_data.sample(False, 0.1, 20171001)
```
Data Cleaning
We shall start by adding a new column called ‘status’ that labels each row as Good or Bad based on the ‘good/bad’ column (a value of 1 means good credit; any other value is treated as bad).
```
%matplotlib inline
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn import model_selection
from sklearn.metrics import (classification_report, confusion_matrix,
                             accuracy_score, roc_curve, roc_auc_score)
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.preprocessing import scale
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Read the space-separated file and assign readable column names
german_data = pd.read_csv('../data/german.txt', sep=" ")
columns = ['checkin_acc', 'duration', 'credit_history', 'purpose', 'amount',
           'saving_acc', 'present_emp_since', 'inst_rate', 'personal_status',
           'other_debtors', 'residing_since', 'property', 'age', 'inst_plans',
           'housing', 'num_credits', 'job', 'dependents', 'telephone', 'foreign_worker', 'good/bad']
german_data.columns = columns
# Human-readable label for exploration: 1 -> "Good", anything else -> "Bad"
german_data["status"] = np.where(german_data['good/bad'] == 1, "Good", "Bad")
german_data.head()
```
Exploratory Data Analysis
The exploratory plots cover a count plot of status, a histogram of credit amount, an amount comparison by credit status, the age of borrowers, and a regression plot of duration vs. amount.
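Each of these plots has a simple numeric counterpart. A minimal sketch, assuming a small made-up frame (`toy`) that only borrows the column names used above; the values are invented for illustration and are not taken from the German credit data:

```python
import pandas as pd

# Tiny made-up sample; only the column names mirror the credit data.
toy = pd.DataFrame({
    "status": ["Good", "Good", "Bad", "Good", "Bad", "Good"],
    "amount": [1200, 3400, 7800, 2100, 9500, 1500],
    "duration": [12, 24, 48, 18, 60, 6],
    "age": [35, 42, 23, 51, 29, 38],
})

# Counts per class: the numbers a count plot of status would draw.
status_counts = toy["status"].value_counts()

# Summary of amount within each class: the amount comparison by status.
amount_by_status = toy.groupby("status")["amount"].describe()

# Distribution summary of borrower age.
age_summary = toy["age"].describe()

# Linear association that a duration-vs-amount regression plot visualizes.
duration_amount_corr = toy["duration"].corr(toy["amount"])

print(status_counts)
print(amount_by_status)
print(age_summary)
print(duration_amount_corr)
```

Running the same calls on `german_data` (before the `status` column is dropped) would give the figures behind the actual plots.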
```
# Drop the helper column used for exploration; skip if it is already gone
try:
    german_data = german_data.drop(columns="status")
except KeyError:
    print("Status is already dropped")
german_data.head(5)
# Recode the target from 1/2 to 0/1
german_data['good/bad'] = german_data['good/bad'] - 1
# One-hot encode the categorical attributes
features = ['checkin_acc', 'credit_history', 'purpose', 'saving_acc',
            'present_emp_since', 'personal_status', 'other_debtors',
            'property', 'inst_plans', 'housing', 'job', 'telephone', 'foreign_worker']
credit_features = pd.get_dummies(german_data, prefix=features, columns=features)
credit_features.head()
```
Prepare Dataset for Modeling
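Separate the features from the target, standardize the numeric columns, and split the data into training and test sets.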
```
X = credit_features.drop(columns='good/bad')
Y = credit_features['good/bad']
# Standardize the dataset: the first five columns are treated as numeric, the rest as categorical
names = list(X.columns.values)
num = names[:5]
cat = names[5:]
# Scale the numeric columns to zero mean and unit variance
X_scale = pd.DataFrame(scale(X[num]), columns=num, index=X.index)
X = pd.concat((X_scale, X[cat]), axis=1)
# Split the dataset into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.3, random_state=0)
```
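Note that the code above scales the numeric columns before the split, so the test rows contribute to the mean and variance used for scaling. One common alternative, sketched here on synthetic data and not part of the original pipeline, fits scikit-learn's `StandardScaler` on the training split only and reuses those statistics for the test split. The `toy_` names and the random values are assumptions made for illustration.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)

# Synthetic stand-in for two numeric columns and a binary target.
toy_X = pd.DataFrame({
    "duration": rng.integers(6, 60, size=100),
    "amount": rng.integers(500, 15000, size=100),
})
toy_y = rng.integers(0, 2, size=100)

toy_X_train, toy_X_test, toy_y_train, toy_y_test = train_test_split(
    toy_X, toy_y, test_size=0.3, random_state=0)

# Learn mean and variance from the training rows only.
scaler = StandardScaler().fit(toy_X_train)

# Apply the same training statistics to both splits.
toy_X_train_scaled = scaler.transform(toy_X_train)
toy_X_test_scaled = scaler.transform(toy_X_test)

print(toy_X_train_scaled.mean(axis=0))  # close to zero by construction
print(toy_X_test_scaled.mean(axis=0))   # generally not exactly zero
```

The trade-off is a slightly more involved workflow in exchange for test metrics that are not influenced by the test rows themselves.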
Model Selection
Let us apply various classifiers to the same dataset and find the best-performing model.
- Append each model's cross-validation scores to the list cv_scores.
- Make predictions on the test set with the best model, stored in the variable best_model, and assign the predictions to the variable y_hat.
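The two steps above can be sketched end to end as follows. This is a minimal illustration on synthetic data from `make_classification`, not the German credit set. The `toy_` names, the five-fold setting, and the rule "highest mean cross-validation accuracy wins" are assumptions made for the example.

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import KFold, cross_val_score, train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier

# Synthetic stand-in data; not the German credit set.
toy_X, toy_y = make_classification(n_samples=200, n_features=8, random_state=0)
toy_X_train, toy_X_test, toy_y_train, toy_y_test = train_test_split(
    toy_X, toy_y, test_size=0.3, random_state=0)

candidates = [
    ("KNN", KNeighborsClassifier()),
    ("CART", DecisionTreeClassifier(random_state=0)),
    ("NB", GaussianNB()),
]

kfold = KFold(n_splits=5, shuffle=True, random_state=7)
toy_cv_scores = []  # one array of fold accuracies per candidate
for name, model in candidates:
    scores = cross_val_score(model, toy_X_train, toy_y_train,
                             cv=kfold, scoring="accuracy")
    toy_cv_scores.append(scores)

# Pick the candidate with the highest mean cross-validation accuracy.
mean_scores = [scores.mean() for scores in toy_cv_scores]
best_index = mean_scores.index(max(mean_scores))
best_name, toy_best_model = candidates[best_index]

# Refit the winner on the full training split, then predict the held-out rows.
toy_best_model.fit(toy_X_train, toy_y_train)
toy_y_hat = toy_best_model.predict(toy_X_test)

print(best_name, round(max(mean_scores), 3), toy_y_hat.shape)
```

Selecting on the mean alone ignores the spread across folds, so it is worth checking the per-fold scores as well before settling on a model.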
Machine Learning Solutions
We see that the models have not used any causal features. Identifying causal features would involve consulting research journals in the area and building a custom implementation.
During the problem discovery phase, it is worth looking up existing ML solutions.
```
# Candidate classifiers to compare
models = []
models.append(('KNN', KNeighborsClassifier()))
models.append(('CART', DecisionTreeClassifier()))
models.append(('NB', GaussianNB()))
models.append(('SVM', SVC()))
seed = 7
scoring = 'accuracy'
cv_scores = []
names = []
accuracy_scores = []
for name, model in models:
    # Recent scikit-learn releases require shuffle=True when random_state is set
    kfold = model_selection.KFold(n_splits=10, shuffle=True, random_state=seed)
    cv_results = model_selection.cross_val_score(model, X_train, y_train, cv=kfold, scoring=scoring)
    cv_scores.append(cv_results)
    names.append(name)
    msg = "%s: %f (%f)" % (name, cv_results.mean(), cv_results.std())
    accuracy_scores.append(msg)
# Print the accuracy achieved by each model
print(accuracy_scores)
# Plot the model comparison as a box plot
fig = plt.figure()
fig.suptitle('Algorithm Comparison')
ax = fig.add_subplot(111)
ax.boxplot(cv_scores)
ax.set_xticklabels(names)
plt.show()
# Make predictions on the test dataset using SVM
best_model = SVC()
best_model.fit(X_train, y_train)
y_hat = best_model.predict(X_test)
svm_score = accuracy_score(y_test, y_hat)
print(svm_score)
print(confusion_matrix(y_test, y_hat))
print(classification_report(y_test, y_hat))
```
```
0.75
[[197  17]
 [ 58  28]]
             precision    recall  f1-score   support

          0       0.77      0.92      0.84       214
          1       0.62      0.33      0.43        86

avg / total       0.73      0.75      0.72       300
```
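The figures in this report follow directly from the confusion matrix. As a quick check, the plain-Python sketch below recomputes accuracy, precision, and recall from the four counts printed above. The variable and function names are chosen here for illustration, and rows are read as true classes and columns as predicted classes, which is scikit-learn's convention.

```python
# Confusion matrix from the SVM run: rows are true classes, columns are predicted classes.
matrix = [[197, 17],
          [58, 28]]

total = sum(sum(row) for row in matrix)
accuracy = (matrix[0][0] + matrix[1][1]) / total


def precision_recall(matrix, cls):
    """Return (precision, recall) for class index `cls` of a square confusion matrix."""
    true_positive = matrix[cls][cls]
    predicted = sum(row[cls] for row in matrix)  # column sum: everything predicted as cls
    actual = sum(matrix[cls])                    # row sum: everything that truly is cls
    return true_positive / predicted, true_positive / actual


precision_0, recall_0 = precision_recall(matrix, 0)
precision_1, recall_1 = precision_recall(matrix, 1)

print(f"accuracy = {accuracy:.2f}")
print(f"class 0: precision = {precision_0:.2f}, recall = {recall_0:.2f}")
print(f"class 1: precision = {precision_1:.2f}, recall = {recall_1:.2f}")
```

Because the target was recoded from 1/2 to 0/1, class 1 is the bad-credit class. Its recall of 28/86, about 0.33, means that only about a third of the bad-credit customers in the test set are identified, even though overall accuracy is 0.75.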
Scaling to Big Data
Since we have already done the cleaning during the experimentation phase, let us borrow parts of that data cleaning to build out the ingestion portion. We shall use a Python notebook to experiment and set up the ingestion pipeline.
- Copy the data frame map functions and other related data cleaning regular expressions to Spark.
- Pandas data frames & Spark data frames have similar functions.
- Save the ingested data and a sample of it.
- Save feature vectors and feature samples.
Data Ingestion Spark Module
This module can be packaged as an ingestion module and added to the automation tasks by the IT team.
* The module uses regular expressions and the map and reduce functions in Spark.
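As a plain-Python illustration of that pattern (not Spark itself), the sketch below cleans each record with a regular expression in a map step and then combines the results in a reduce step. The raw values and the `parse_amount` helper are made up for the example.

```python
import re
from functools import reduce

# Made-up raw values standing in for a messy text column.
raw_amounts = ["1,169 DM", " 5951", "2096 DM", "n/a"]

NON_DIGITS = re.compile(r"[^0-9]")


def parse_amount(text):
    """Strip everything except digits; return 0 when nothing numeric is left."""
    digits = NON_DIGITS.sub("", text)
    return int(digits) if digits else 0


# map: clean each record independently of the others
cleaned = list(map(parse_amount, raw_amounts))

# reduce: combine the cleaned records into a single value
total_amount = reduce(lambda a, b: a + b, cleaned, 0)

print(cleaned)
print(total_amount)
```

On an RDD, the same two functions would be passed to `rdd.map(...)` and `rdd.reduce(...)`, which lets Spark run the per-record cleaning in parallel across partitions.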
```
# display(dbutils.fs.ls("dbfs:/FileStore/tables/ohvubrzw1507843246878/"))
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# Read the CSV with a header row and let Spark infer the column types
credit_df = spark.read.format("csv").option("header", "true").option("inferSchema", "True").load("dbfs:/FileStore/tables/ohvubrzw1507843246878/german_credit.csv")
# Keep a 1% sample of the ingested data alongside the full set
credit_df_sample = credit_df.sample(False, 1e-2, 20171010)
credit_df.write.parquet("dbfs:/FileStore/tables/credit_df.parquet")
credit_df_sample.write.parquet("dbfs:/FileStore/tables/credit_df_sample.parquet")
credit_data = credit_df.rdd
# Every column except the target is a feature; this assumes the target is the first column
feature_cols = credit_df.columns
target = 'Creditability'
feature_cols.pop(0)

def amount_class(amount):
    '''
    Classify the amount into different classes.
    Args:
        amount (float): Total amount
    Returns:
        class (int): Type of class.
    '''
    if amount < 5000:
        return 1
    elif amount < 10000:
        return 2
    elif amount < 15000:
        return 3
    else:
        return 4

# Pair every row with the class of its credit amount
credit_data_new = credit_data.map(lambda row: (row, amount_class(row['Credit Amount'])))
```
Spark MLlib
Use the Spark Machine Learning Library to train the SVM on a sample of the dataset. This module can involve custom ML implementations depending on the type of problem.
- Increase the sample size as the training succeeds.
- Ingest all data.
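The "increase the sample size as the training succeeds" step can be expressed as a small driver loop. The sketch below is framework-agnostic and hypothetical: `train_on_growing_samples`, the fraction schedule, the `min_score` threshold, and the stand-in scorer are all assumptions made for illustration, not part of the original module.

```python
def train_on_growing_samples(train_and_score, fractions=(0.01, 0.1, 1.0), min_score=0.7):
    """Call train_and_score(fraction) on ever larger samples.

    Stops at the first fraction whose score falls below min_score and
    returns the (fraction, score) pairs collected so far.
    """
    history = []
    for fraction in fractions:
        score = train_and_score(fraction)
        history.append((fraction, score))
        if score < min_score:
            break  # fix the pipeline before spending time on more data
    return history


# Stand-in scorer with invented scores, for demonstration only. In Spark this
# callback would sample the data at `fraction`, train the model, and return
# a validation score.
def fake_train_and_score(fraction):
    return {0.01: 0.8, 0.1: 0.85, 1.0: 0.9}[fraction]


history = train_on_growing_samples(fake_train_and_score)
print(history)
```

Starting with a 1% sample keeps early iterations fast, so problems in feature extraction or label encoding surface before the full dataset is ingested.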
```
def features(row):
    '''
    Gathers features from the dataset.
    Args:
        row (Row): Each row in the dataset.
    Returns:
        (LabeledPoint): Labeled Point of each row.
    '''
    feature_map = row.asDict()
    label = float(feature_map['Creditability'])
    feature_list = [float(feature_map[feature]) for feature in feature_cols]
    return LabeledPoint(label, feature_list)

credit_features = credit_data.map(features)
credit_features_df = credit_features.toDF()
credit_features_df.write.parquet("dbfs:/FileStore/tables/credit.parquet")
# Sample the features and save the sample.
credit_sample_df = credit_features_df.sample(False, 1e-2, 20171010)
credit_sample_df.write.parquet("dbfs:/FileStore/tables/credit_sample.parquet")
# Read the credit data features back and rebuild the RDD of LabeledPoint
# that SVMWithSGD.train expects
credit_features_df = spark.read.parquet('dbfs:/FileStore/tables/credit.parquet')
lp = credit_features_df.rdd.map(lambda row: LabeledPoint(row['label'], row['features']))
model = SVMWithSGD.train(lp, iterations=100)
# Evaluate the model on the training data
labels_preds = lp.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labels_preds.filter(lambda pair: pair[0] != pair[1]).count() / float(lp.count())
print("Training Error = " + str(trainErr))
# Print the coefficients and intercept of the linear SVM
print("Coefficients: " + str(model.weights))
print("Intercept: " + str(model.intercept))
# Save and load model
model.save(sc, "credit/SVMWithSGDModel")
same_model = SVMModel.load(sc, "credit/SVMWithSGDModel")
```