Challenges & Solutions for Production Recommendation Systems

Introduction

There are lots of articles about training and evaluating recommenders, but few explain how to overcome the challenges involved in setting up a full-scale system.

Most libraries don’t support scalable production systems out of the box. The challenges are usually:

• Predicting dynamically – When you have a very large user/items dimensionality, it can be very inefficient – or impossible – to precompute all the recommendations.
• Optimising response times – When you create predictions dynamically, the time you need to retrieve them is very important.
• Frequently updating models – When the system needs to incorporate new data as it becomes available, it’s crucial to frequently update the models.
• Predicting based on unseen data – This means dealing with unseen users or items and continuously changing features.

This post will tell you how you can modify a model to extend its functionality for a full-scale production environment.

Hybrid Recommender Models Deal with Real-World Challenges Better

We use LightFM, a very popular Python recommendation library that implements a hybrid model. It’s best suited for small- to medium-sized recommender projects – where you don’t need distributed training.

Short Recap of Different Recommender Approaches

There are two basic approaches to recommendation:

Collaborative models use only collaborative information – implicit or explicit interactions of users with items (like movies watched, rated, or liked). They don’t use any information on the actual items (like movie category, genre, etc.).

Collaborative models can achieve high precision with little data, but they can’t handle unknown users or items (the cold start problem).

Content-based models work purely on the available data about items or users – entirely ignoring interactions between users and items. So they approach recommendations very differently from collaborative models.

Content-based models usually:

• Require much more training data (you need to have user/item examples available for almost every single user/item combination), and
• Are much harder to tune than collaborative models.

But they can make predictions for unseen items and usually have better coverage compared to collaborative models.

Hybrid Recommenders – like LightFM – combine both approaches and overcome a lot of the challenges of each individual approach.

They can deal with new items or new users:

When you deploy a collaborative model to production, you’ll often run into the problem that you need to predict for unseen users or items – like when a new user registers or visits your website, or your content team publishes a new article.

Usually you have to wait at least until the next training cycle, or until the user interacts with some item, to be able to make recommendations for these users.

But the hybrid model can make predictions even in this case: It will simply use the partially available features to compute the recommendations.

Hybrid models can also deal with missing features:

Sometimes features are missing for some users and items (simply because you haven’t been able to collect them yet), which is a problem if you’re relying on a content-based model.

Hybrid recommenders perform well for returning users (those who are known from training) as well as for new users and items, as long as you have features about them. This is especially useful for items, but also for new users (you can ask users what they’re interested in when they visit your site for the first time).

System Components

This system assumes that there are far fewer items than users, since it always retrieves predictions for all items. But it can serve as the basis for more complex recommenders.

The core of the system is a Flask app that receives a user ID and returns the relevant items for that user. It will (re)load the LightFM model and query a Redis instance for item and/or user features.

We’ll assume that user and item features are stored and serialised in a Redis database and can be retrieved by the Flask app at any time.

All applications will be deployed as microservices via Docker containers.

How LightFM Makes Predictions

But how does it work?

The LightFM paper is very informative for an academic reader, but maybe a little brief for someone who isn’t very familiar with the domain. I’ll outline the LightFM model prediction process more simply below.

Explanation of the formulas:

• Lowercase letters refer to vectors, and uppercase letters refer to matrices.
• The subscript $$u$$ refers to a single user, and $$U$$ refers to the complete set of all users. Items are referred to in the same way.

Most of the naming here is consistent with the LightFM paper.

Model Components

So LightFM combines the best of the collaborative and the content-based approaches. You might say it models one component for each of the two approaches. Both are necessary to give us the properties we want from the recommender.

Collaborative component

The collaborative component allows you to fall back on a collaborative filtering algorithm in case you don’t have any features – or the features aren’t informative.

State-of-the-art collaborative filtering algorithms are implemented with a matrix factorisation. They estimate two latent (unobserved) matrix representations, which, when multiplied by each other, will reproduce the matrix of interactions for each item and user the model saw during training. Of course, there’s an error term to allow for noise and avoid overfitting.

A simple analogy: Try to factorise 12. We can do this with 2 and 6, 3 and 4, 1 and 12, etc. It’s similar for matrices.

We’ll call those matrices latent representations, since they’re a compressed form of our interaction data.
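To make the factorisation idea concrete, here is a minimal NumPy sketch. It is an illustration only: the toy interaction matrix is made up, and it uses a truncated SVD to obtain the two latent matrices, whereas LightFM estimates its representations with gradient descent.

```python
import numpy as np

# Toy interaction matrix (4 users x 3 items), built so that it has rank 2.
# All values here are invented for illustration.
true_users = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [2.0, 0.0]])
true_items = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
interactions = true_users @ true_items.T  # shape (4, 3)

d = 2  # number of latent components (illustrative choice)

# Truncated SVD yields one possible pair of latent matrices.
# Note: LightFM itself does not use SVD; this only shows the idea.
u, s, vt = np.linalg.svd(interactions, full_matrices=False)
user_latent = u[:, :d] * s[:d]  # shape (n_users, d)
item_latent = vt[:d, :].T       # shape (n_items, d)

# Multiplying the two latent matrices reproduces the interactions.
reconstructed = user_latent @ item_latent.T
max_error = np.abs(interactions - reconstructed).max()
```

Just as 12 has several factor pairs, the latent matrices are not unique: any invertible transformation applied to one and undone in the other gives the same product.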

Content-based components

The content-based component allows you to get predictions even if you have no interaction data.

LightFM incorporates user and item features by associating the features with the latent representations. The assumption is that features and latent representation are linearly related. So in vector form:

$$q_u = f_u \cdot E_U + b_u$$

$$q_u$$ is the latent user representation, $$f_u$$ is a single user’s feature row vector, $$E_U$$ is the matrix of estimated user embeddings, and $$b_u$$ are the biases for the user embeddings. (For simplicity, we’ll leave the biases out from now on.)

Looks similar to linear regression, right? Except $$E_U$$ is a matrix, as opposed to $$\beta$$, which is usually a vector. In fact, this actually performs multiple regressions: one for each model component. Again, it’s analogous for items.

During training, both the user embeddings and the item embeddings are estimated with the help of gradient descent algorithms. The embedding matrix will have a row for each feature. The columns of the embedding matrix are called components. The number of components is set as a model hyperparameter, which we’ll refer to from now on as $$d$$.

The above image recaps this process for all users and all items. So in Step I, we have a matrix multiplication of the user feature matrix of shape $$N_{users} \times N_{user_{features}}$$ with the embedding matrix of shape $$N_{user_{features}} \times d$$. The same applies to the second multiplication, of the item features by the item embeddings. The result from Step I is two matrices of shape $$N_{users} \times d$$ and $$N_{items} \times d$$, respectively. So each user/item is represented as a latent vector of size $$d$$.

In the final step, the first matrix is multiplied by the transpose of the second, resulting in a matrix of shape $$N_{users} \times N_{items}$$ that holds the final score for each user and item.

Now you can easily get the predictions for all items for a single user with the following term:

$$q_u \cdot Q_I^T$$

$$q_u$$ is a row vector of the user’s latent representations, and $$Q_I$$ is a matrix of the latent representations of all items.
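The two steps and the single-user term described above can be sketched in NumPy as follows. This is a sketch under assumptions: the sizes and the random feature and embedding values are invented, and biases are left out, as in the text.

```python
import numpy as np

rng = np.random.default_rng(0)

# Illustrative sizes; none of these numbers come from a real model.
n_users, n_items = 5, 4
n_user_features, n_item_features, d = 3, 2, 6

user_features = rng.random((n_users, n_user_features))
item_features = rng.random((n_items, n_item_features))

# Embedding matrices: one row per feature, one column per component.
E_U = rng.normal(size=(n_user_features, d))
E_I = rng.normal(size=(n_item_features, d))

# Step I: project features into the latent space.
Q_U = user_features @ E_U  # shape (n_users, d)
Q_I = item_features @ E_I  # shape (n_items, d)

# Final step: one score per user/item pair.
scores = Q_U @ Q_I.T       # shape (n_users, n_items)

# Single user: a row vector of size d times all item representations.
q_u = user_features[2] @ E_U
user_scores = q_u @ Q_I.T  # shape (n_items,)
```

The single-user result is simply the corresponding row of the full score matrix, which is why a production system never needs to compute the full matrix.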

Enable Fallback to Collaborative Mode with Indicator Matrices

LightFM can also generate models with only collaborative information.

It uses a very effective trick: If no user or item features are used at all, the model will accept an identity matrix of size $$N_{users}$$ or $$N_{items}$$, respectively. This is very effective, since it then learns an embedding of $$d$$ components for each user (or item). This way, the model can always fall back on the best pure collaborative method. You can think of these components as the model’s memory of the users and items it has already seen during training.

You can also force the model to fall back to collaborative mode – even when you do have features: You can modify the feature matrix by appending an identity matrix to it. Sometimes you’ll need this to get your model to converge. However, this usually means your features are too noisy or don’t carry enough information for the model to converge to a minimum by itself.

Finally, using this trick increases the effort needed to take the model to production: During training, the user’s index is used to retrieve the correct row of the corresponding feature/identity matrix – and this information might no longer be available in a production environment; plus the LightFM model hands this responsibility off to the user.
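The indicator trick described above can be sketched with SciPy sparse matrices. The sizes and the random values are assumptions made up for illustration; the point is only how the identity matrix behaves on its own and when appended to a feature matrix.

```python
import numpy as np
from scipy import sparse

rng = np.random.default_rng(1)
n_users, n_user_features, d = 4, 3, 2  # illustrative sizes

# Pure collaborative mode: the "feature" matrix is an identity matrix,
# so every user owns exactly one row of the embedding matrix.
indicator = sparse.identity(n_users, format="csr")
E_collab = rng.normal(size=(n_users, d))
Q_collab = indicator @ E_collab  # identical to E_collab

# Hybrid mode with a memory: append the identity matrix to real features.
user_features = sparse.csr_matrix(
    rng.integers(0, 2, size=(n_users, n_user_features)).astype(np.float64)
)
features_with_indicator = sparse.hstack(
    [user_features, indicator], format="csr"
)  # shape (n_users, n_user_features + n_users)
```

Row `i` of the combined matrix now carries the user's features plus a single 1 at column `n_user_features + i`. That column index is exactly what has to be reconstructed from the user ID at prediction time.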

Interesting fact

If you train with indicator features only, the model estimates the latent representations purely from collaborative information. Items or users that are similar in terms of that information will then end up close to each other in Euclidean space. So you can use these representations to find similarities between your items or users.
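As a rough sketch of how such a similarity lookup could work, the snippet below finds the nearest neighbours of an item by Euclidean distance. The helper `nearest_items` and the embedding values are hypothetical; with a fitted LightFM model trained on indicators only, the model's `item_embeddings` attribute would play the role of the array used here.

```python
import numpy as np


def nearest_items(item_embeddings, item_index, k=3):
    """Return the indices of the k items closest to item_index.

    Hypothetical helper for illustration; uses plain Euclidean distance.
    """
    diffs = item_embeddings - item_embeddings[item_index]
    distances = np.linalg.norm(diffs, axis=1)
    distances[item_index] = np.inf  # exclude the query item itself
    return np.argsort(distances)[:k]


# Made-up 2-component embeddings for four items.
item_embeddings = np.array([
    [0.0, 0.0],
    [0.1, 0.0],
    [5.0, 5.0],
    [0.0, 0.3],
])

neighbours = nearest_items(item_embeddings, item_index=0, k=2)
```

For large catalogues a brute-force scan like this becomes slow, and an approximate nearest-neighbour index may be a better fit.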

Recreating Indicators and Features on the Fly

Now let’s implement a model that can fall back on collaborative mode, keeps track of IDs, and is thus able to reconstruct the correct features and indicators.

We’ll focus on implementing a complete approach. This is quite complex, because at the same time, it should be able to give predictions in most situations. We’ll subclass the LightFM class and add a special predict_online method, which is intended to be used during production.

This way, we can still use LightFM’s cythonised prediction functions and avoid handling user and item ID mappings separately.

It should satisfy these requirements:

1. Reconstruct the indicator feature if the user/item was seen during training;
2. Make online predictions no matter what data is available on a certain user;
3. Make those predictions as quickly as possible.

ID Mappings

To achieve the first requirement, you’ll have to use the same class during training as well. You also need to adjust your subclass so it only accepts sparsity SparseFrame objects during training, and therefore creates and saves ID mappings.

Reconstructing Features

In order to achieve the second requirement, you need to check the available data every time a request comes in. There are 16 cases you’ll have to handle:

In cases IV, VIII, and XII, we simply return our baseline predictions. For cases XIII through XVI, we can’t give any predictions, because we don’t know enough about the items.

To summarise: We basically want to create a row vector which contains the user features, if they’re available. Otherwise it’s all zeros at the respective indices. It will also contain the user indicator feature set at the correct index, if the user was seen during training.
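The row vector summarised above could be built along these lines. This is a simplified, standalone sketch and not the class shown later: `build_user_row` and its arguments are hypothetical names, and the case of an unknown user without features (where the baseline is returned) is deliberately not handled.

```python
import numpy as np
from scipy import sparse


def build_user_row(user_features, user_index, n_features, n_users):
    """Build a 1 x (n_features + n_users) row for a single user.

    Hypothetical helper: user_features is a list of feature values or
    None; user_index is the training index of the user or None if the
    user was not seen during training.
    """
    if user_features is None:
        # No features available: all zeros at the feature positions.
        features = sparse.csr_matrix((1, n_features))
    else:
        dense = np.asarray(user_features, dtype=np.float64)
        features = sparse.csr_matrix(dense.reshape(1, n_features))

    if user_index is None:
        # Unknown user: no indicator is set.
        indicator = sparse.csr_matrix((1, n_users))
    else:
        # Known user: a single 1 at the user's training index.
        indicator = sparse.csr_matrix(
            ([1.0], ([0], [user_index])), shape=(1, n_users)
        )
    return sparse.hstack([features, indicator], format="csr")


known_with_features = build_user_row([1.0, 0.0, 1.0], 2, 3, 4)
known_without_features = build_user_row(None, 2, 3, 4)
new_with_features = build_user_row([0.0, 1.0, 0.0], None, 3, 4)
```

In every case the row has the same width as the feature matrix used during training, which is what allows the trained embeddings to be applied unchanged.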

The item features are analogous to the user features, except we expect them to fit into memory easily to allow for a cache. You might consider using a different caching strategy (like TTLCache) based on your use case, or not caching at all.

We also want to support not adding indicators, or only adding them to user or item features, which might make the implementation a little more complex. Still, we’ve tried to keep it as simple as possible.

Below you’ll find a sample implementation of the approach described above. This implementation should handle all cases up to VIII correctly. But it’s possible that not all item cases are implemented, since our application didn’t require it. So predicting known items without item features isn’t possible, but it should be very easy to add.

Part II of this post uses this class, connects it to a Redis database, and serves its predictions dynamically with Flask. We’ll also show you how to update the model without downtime with a background thread that starts from within the Flask application.

Implementation

"""Recommendation module.

This module deals with using LightFM models in production and includes a
LightFM subclass which provides a predict_online method to use in API or
similar scenarios.
"""
import operator

import numpy as np
import pandas as pd
import sparsity as sp
from scipy import sparse
from lightfm import LightFM
from cachetools import cachedmethod, LRUCache
from cachetools.keys import hashkey

class LFMRecommender(LightFM):
    """Recommender class based on the LightFM Model.

    The LightFM model is more expressive if an identity matrix is appended to
    feature matrices. It acts like a memory for the model, since it
    creates an individual embedding (vector of no_comp) for each user
    previously seen (during training).
    If the user is unknown from training but user_features are available,
    these can be passed to the model/class, and the model will try to give
    the best recommendations based on the available data. There will be an
    embedding for each feature used during training.
    Furthermore, baseline recommendations are computed and returned
    if the user is unknown and no user features are available.
    Finally, this class contains lots of checks on data integrity and can
    recover from things like shuffled or additional features.

    Parameters
    ----------
    indicators: 'users', 'items', 'both' or False
        whether to add identity matrices to the respective
        features matrices. Adds a user/item memory to the model.
    kwargs:
        remaining arguments are passed to the LightFM model.
    """

    def __init__(self, indicators='both', **kwargs):
        """Initialise model.

        Parameters
        ----------
        indicators: 'users', 'items', 'both' or False
            whether to add identity matrices to the respective
            features matrices. Adds a user/item memory to the model.
        kwargs:
            remaining arguments are passed to the LightFM model.
        """
        super().__init__(**kwargs)
        # Mappings from external user/item IDs to numerical indices.
        self.uid_map = pd.Series([])
        self.iid_map = pd.Series([])
        if indicators in ['both', 'users', 'items', False]:
            self.indicator_setting = indicators
        elif indicators:
            # Any other truthy value is treated as 'both'.
            self.indicator_setting = 'both'
        else:
            raise ValueError("Invalid indicators parameter: {}"
                             .format(indicators))
        self.user_feature_names = pd.Index([])
        self.item_feature_names = pd.Index([])
        self.baseline = pd.Series([])
        self._user_indicator = None
        self._item_indicator = None
        self._item_cache = LRUCache(maxsize=8)

    def fit_partial(self, interactions: sp.SparseFrame,
                    user_features: sp.SparseFrame = None,
                    item_features: sp.SparseFrame = None,
                    sample_weight=None,
                    epochs=1,
                    num_threads=1,
                    verbose=False):
        # Create ID mappings and indicators on the first call only.
        try:
            self._check_initialized()
        except ValueError:
            self.prepare(interactions, item_features, user_features)

        # Unwrap the underlying sparse matrices of the SparseFrames.
        interactions = interactions.data
        user_features = getattr(user_features, 'data', None)
        item_features = getattr(item_features, 'data', None)

        user_features, item_features = self.append_indicators(
            user_features, item_features
        )

        super().fit_partial(interactions, user_features, item_features,
                            sample_weight, epochs, num_threads, verbose)

    def prepare(self, interactions, item_features, user_features):
        """Prepare model for fit and prediction.

        This method initialises many model attributes, like
        item and user mappings, as well as features. This is
        usually done automatically.
        In some rare cases, it might be useful – like
        when using append_indicators on an untrained model
        (used in train_with_early_stopping).

        Parameters
        ----------
        interactions: SparseFrame
            train interactions
        item_features: SparseFrame, None
            item metadata features
        user_features: SparseFrame, None
            user metadata features

        Returns
        -------
        None
        """
        self.uid_map = pd.Series(np.arange(interactions.shape[0]),
                                 index=interactions.index)

        # TODO fix part where interactions are created with MultiIndex in cols
        if isinstance(interactions.columns, pd.MultiIndex):
            interactions._columns = interactions.columns.levels[0]

        self.iid_map = pd.Series(np.arange(interactions.shape[1]),
                                 index=interactions.columns)
        if self.indicator_setting:
            self._init_indicators()
        if not self.indicator_setting and \
                (user_features is None or item_features is None):
            raise ValueError("Can't estimate embeddings without indicators. "
                             "Try setting indicators='both' or pass user "
                             "and item features to estimate embeddings.")

        self.user_feature_names = getattr(user_features, 'columns', None)
        self.item_feature_names = getattr(item_features, 'columns', None)

        # Baseline: mean interaction per item, most popular items first.
        self.baseline = pd.Series(
            np.asarray(interactions.mean(axis=0)).flatten(),
            index=interactions.columns,
            name='score') \
            .sort_values(ascending=False)

    def append_indicators(self, user_features, item_features):
        """Append indicators as used during training.

        Helper function mainly to use with LightFM evaluation functions.

        Parameters
        ----------
        user_features: csr_matrix
            user features without identity/indicators
        item_features: csr_matrix
            item_features without identity/indicators

        Returns
        -------
        uf_with_indicator, if_with_indicator: csr_matrix
        """
        if self.indicator_setting in ['users', 'both']:
            # The last row of the user indicator is the all-zero row
            # reserved for unknown users, so it is dropped here.
            if user_features is not None:
                user_features = sparse.hstack([user_features,
                                               self._user_indicator[:-1, :]])
            else:
                user_features = self._user_indicator[:-1, :]
        if self.indicator_setting in ['items', 'both']:
            if item_features is not None:
                item_features = sparse.hstack([item_features,
                                               self._item_indicator])
            else:
                item_features = self._item_indicator
        return user_features, item_features

    def _init_indicators(self):
        """Initialize indicator matrices."""
        if self.indicator_setting in ['both', 'users']:
            D = len(self.uid_map)
            # Identity matrix plus one all-zero row for unknown users.
            self._user_indicator = sparse.vstack([
                sparse.identity(D, format='csr'),
                sparse.csr_matrix((1, D))
            ])
        if self.indicator_setting in ['items', 'both']:
            self._item_indicator = sparse.identity(
                len(self.iid_map), format='csr')

    def append_user_identity_row(self, v, idx):
        """Append single identity row to vector.

        Parameters
        ----------
        v: csr_matrix
            row_vector
        idx:
            identity index will determine the position of the positive
            entry in the appended identity

        Returns
        -------
        appended: csr_matrix
        """
        return sparse.hstack([v, self._user_indicator[idx, :]])

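    def _check_missing_features(self, item_feat, user_feat):
        """Check for any missing features."""
        # Only compare when the model was trained with user features;
        # the opposite case is reported in _construct_user_features.
        if user_feat is not None and self.user_feature_names is not None:
            user_feat_diff = set(self.user_feature_names) - \
                set(user_feat.columns)
            if len(user_feat_diff):
                raise ValueError('Missing user features: {}'
                                 .format(user_feat_diff))

        # Check the item feature names here (the original checked
        # user_feature_names, which looks like a copy-paste slip).
        if item_feat is not None and self.item_feature_names is not None:
            item_feat_diff = set(self.item_feature_names) - \
                set(item_feat.columns)

            if len(item_feat_diff):
                raise ValueError('Missing item features: {}'
                                 .format(item_feat_diff))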

    @cachedmethod(cache=operator.attrgetter('_item_cache'),
                  key=lambda _, __, item_ids: hashkey(item_ids))
    def get_item_data(self, item_features, item_ids):
        """Return item data.

        This creates the item feature csr and corresponding item names and
        numerical ids. Caches result in case same items are requested again.
        """
        item_ids = np.asarray(list(item_ids))
        if item_features is not None:
            assert item_features.shape[0] >= len(item_ids)
            assert set(item_ids).issubset(set(item_features.index))
            iid_map = pd.Series(np.arange(len(item_features)),
                                index=item_features.index)
        else:
            iid_map = self.iid_map
        iid_map = iid_map.reindex(item_ids)
        return self._construct_item_features(item_features, item_ids), \
            iid_map.values, \
            iid_map.index

    def predict_online(self, user_id, item_ids, item_features=None,
                       user_features=None, num_threads=1, use_baseline=False):
        """Helper method to use during API use.

        This method reads all available data and gives the best possible
        recommendation for a received sample.
        It also executes various checks on data integrity.

        Parameters
        ----------
        user_id: scalar
            user id as provided during training
        item_ids: array like
            item ids as provided during training
        item_features: SparseFrame
        user_features: SparseFrame
        num_threads: int
            Number of threads to use during prediction
        use_baseline: bool
            if the user is not known and no user features are passed,
            baseline predictions are returned when use_baseline=True.
            If use_baseline=False a KeyError will be raised.

        Returns
        -------
        predictions: pd.Series
            a mapping from item id to score (unsorted)
        """
        self._check_missing_features(item_features, user_features)

        # Item names must be hashable to serve as a cache key.
        if item_ids is not None:
            if isinstance(item_ids, pd.Index):
                item_ids = item_ids.tolist()
            item_names = tuple(item_ids)
        else:
            item_names = tuple(self.iid_map.index.tolist())

        item_feat_csr, num_item_ids, item_labels = \
            self.get_item_data(item_features, item_names)
        try:
            user_feat_csr = self._construct_user_features(user_id,
                                                          user_features)
        except KeyError:
            if use_baseline:
                return self.baseline
            else:
                raise

        # For single-case prediction we always use id 0, as LightFM uses it
        # as an index into the user feature matrix. If the user was known
        # during training, an identity row has been appended to indicate
        # that the user was known.
        pred = super().predict(0, num_item_ids,
                               item_feat_csr, user_feat_csr,
                               num_threads)

        pred = pd.Series(pred, index=item_labels)
        return pred

    def _construct_item_features(self, item_features, item_ids):
        """Create item features during predict."""
        # align feature names
        if self.indicator_setting in ['both', 'items']:
            item_indicator = sp.SparseFrame(self._item_indicator,
                                            index=self.iid_map.index)
            item_indicator = item_indicator.reindex(item_ids).data
        else:
            item_indicator = None

        if self.item_feature_names is None:
            return item_indicator

        item_feat_csr = item_features\
            .loc[:, self.item_feature_names]\
            .reindex(item_ids, axis=0)\
            .data
        if item_indicator is not None:
            item_feat_csr = sparse.hstack([item_feat_csr,
                                           item_indicator])
        return item_feat_csr

    def __setstate__(self, state):
        """Support unpickling older versions of this class."""
        if 'identity_matrix' in state:
            state['indicator_setting'] = state['identity_matrix']
        self.__dict__ = state

    def _construct_user_features(self, user_id, user_features):
        """Create user features for a single user."""
        # retrieve numerical user ids
        # abort and return baseline recommendations if user is not known
        # and no user features are passed
        user_known = True
        try:
            num_user_id = self.uid_map.loc[user_id]
        except KeyError:
            # If we have no features and the user is unknown, we abort.
            if user_features is None:
                raise
            user_known = False
            num_user_id = 0

        if user_features is not None:
            if self.user_feature_names is None:
                raise ValueError('Model was trained without user features. '
                                 'But received user features for prediction.')

            user_feat_csr = user_features.loc[:, self.user_feature_names].data

            if user_feat_csr.shape[0] > 1:
                raise ValueError(
                    'Received user feature matrix with more than 1 row.')
        else:
            user_feat_csr = None
            # Without user indicators there is nothing to fall back on.
            # Note: the original listed [False, 'users'] here; 'items' is
            # assumed to be intended, since the 'users' setting is handled
            # by the indicator branch below.
            if self.user_feature_names is not None and \
                    self.indicator_setting in [False, 'items']:
                raise ValueError("Need user features as used "
                                 "during training: {}"
                                 .format(self.user_feature_names))

        if self.indicator_setting in ['users', 'both']:
            # if no user_features were used during training
            # no need to handle further cases just use indicator row.
            if self.user_feature_names is None:
                user_feat_csr = self._user_indicator[num_user_id]
            # Append identity row only if user is known from training,
            # features have been passed and the indicator setting is active.
            elif user_feat_csr is not None and user_known:
                user_feat_csr = self.append_user_identity_row(user_feat_csr,
                                                              num_user_id)
            # Known user without features: zeros plus the indicator.
            elif user_feat_csr is None and user_known:
                empty_features = sparse.csr_matrix(
                    (1, len(self.user_feature_names)))
                user_feat_csr = self.append_user_identity_row(empty_features,
                                                              num_user_id)
            # Unknown user with features: append the all-zero indicator row.
            elif user_features is not None and not user_known:
                user_feat_csr = self.append_user_identity_row(
                    user_feat_csr, -1)
        return user_feat_csr


Need help with a machine learning challenge? Send me a description at: m.schmitt@datarevenue.de