Classic case study: model user-movie-rating data so that users receive recommendations for movies they may enjoy, and movies gain lists of potential viewers for marketing purposes.
Movie data download: http://files.grouplens.org/datasets/movielens/ml-100k.zip
After unzipping you will find three main data files: user information in u.user, movie information in u.item, and the users' ratings of movies in u.data.
- Initialize PySpark

# Initialize PySpark
from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster('local[4]').setAppName('movies_app')
sc = SparkContext(conf=conf)

import matplotlib.pyplot as plt
import numpy as np

- Load the data, compute summary statistics, and plot distribution histograms to see the overall shape of the data

# Load the user data (each line looks like: id|age|gender|occupation|zip)
user_data = sc.textFile("/Users/gao/Desktop/Toby/5Spark-JDK/data/ml-100k/u.user")
user_data.first()

# Summary statistics
user_fields = user_data.map(lambda line: line.split("|"))
num_users = user_fields.map(lambda fields: fields[0]).count()
num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()
print("Users: %d, genders: %d, occupations: %d, ZIP codes: %d" % (num_users, num_genders, num_occupations, num_zipcodes))

# Plot a histogram of the age distribution
ages = user_fields.map(lambda x: int(x[1])).collect()
fig = plt.gcf()
fig.set_size_inches(8, 6)
plt.hist(ages, bins=20, color='lightblue', density=True)  # the original 'normed=True' was renamed to 'density' in newer matplotlib
plt.title('histogram of ages')

# Bar chart of the occupation distribution, counted map-reduce style with countByValue
count_by_occupation2 = user_fields.map(lambda fields: fields[3]).countByValue()
x_axis1 = []
y_axis1 = []
for k, v in count_by_occupation2.items():
    x_axis1.append(k)
    y_axis1.append(v)

# Sort occupations by count, ascending
arg_sort = np.argsort(y_axis1)
x_axis = np.array(x_axis1)[arg_sort]
y_axis = np.array(y_axis1)[arg_sort]

# Draw the chart
pos = np.arange(len(x_axis))
width = 1.0
fig = plt.gcf()
fig.set_size_inches(12, 8)
ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)
plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)
plt.title('distribution of occupations')

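The rating and missing-value steps below reference movie_fields, num_movies, and convert_year without defining them. Here is a minimal sketch of those definitions, following the standard MovieLens 100k workflow; the u.item path mirrors the other paths in this post, and the 1900 placeholder matches the checks used later:

# Load the movie data (each line looks like: id|title|release date|...)
movie_data = sc.textFile("/Users/gao/Desktop/Toby/5Spark-JDK/data/ml-100k/u.item")
movie_fields = movie_data.map(lambda line: line.split("|"))
num_movies = movie_data.count()

# Pull the four-digit year out of a release date such as '01-Jan-1995';
# use 1900 as a placeholder when the date is missing or malformed
def convert_year(x):
    try:
        return int(x[-4:])
    except:
        return 1900
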
# Explore the rating data
rating_data_raw = sc.textFile("/Users/gao/Desktop/Toby/5Spark-JDK/data/ml-100k/u.data")
print(rating_data_raw.first())
num_ratings = rating_data_raw.count()
print("Ratings: %d" % num_ratings)

rating_data = rating_data_raw.map(lambda line: line.split("\t"))
ratings = rating_data.map(lambda fields: int(fields[2]))
max_rating = ratings.reduce(lambda x, y: max(x, y))
min_rating = ratings.reduce(lambda x, y: min(x, y))
mean_rating = ratings.reduce(lambda x, y: x + y) / num_ratings
median_rating = np.median(ratings.collect())
ratings_per_user = num_ratings / num_users
ratings_per_movie = num_ratings / num_movies
print("Min rating: %d" % min_rating)
print("Max rating: %d" % max_rating)
print("Average rating: %2.2f" % mean_rating)
print("Median rating: %d" % median_rating)
print("Average # of ratings per user: %2.2f" % ratings_per_user)
print("Average # of ratings per movie: %2.2f" % ratings_per_movie)

- Fill in missing values

# Convert release dates to years; bad or missing dates become the 1900 placeholder
years_pre_processed = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x)).collect()
years_pre_processed_array = np.array(years_pre_processed)

# Compute the mean and median over the valid years only
years_not_1900_index = years_pre_processed_array != 1900
mean_year = np.mean(years_pre_processed_array[years_not_1900_index])
median_year = np.median(years_pre_processed_array[years_not_1900_index])

# Replace the placeholder entries with the median year
index_bad_data = np.where(years_pre_processed_array == 1900)[0]
years_pre_processed_array[index_bad_data] = median_year
print("Mean year of release: %d" % mean_year)
print("Median year of release: %d" % median_year)
print("Index of '1900' after assigning median: %s" % np.where(years_pre_processed_array == 1900)[0])

- Train and evaluate the model

# Use the recommendation model API
from pyspark.mllib.recommendation import ALS, Rating

rawData = sc.textFile("/Users/gao/Desktop/Toby/5Spark-JDK/data/ml-100k/u.data")
rawRatings = rawData.map(lambda line: line.split("\t")[:3])

# Build the user-item-rating data
ratings = rawRatings.map(lambda line: Rating(user=int(line[0]), product=int(line[1]), rating=float(line[2])))

# Train the model
model = ALS.train(ratings, rank=50, iterations=10, lambda_=0.01)

# Evaluate the model on the training set
from pyspark.mllib.evaluation import RegressionMetrics

testdata = ratings.map(lambda p: (p.user, p.product))
predictions = model.predictAll(testdata).map(lambda r: ((r.user, r.product), r.rating))
ratesAndPreds = ratings.map(lambda r: ((r.user, r.product), r.rating)).join(predictions)
predictedAndTrue = ratesAndPreds.map(lambda r: r[1])
regressionMetrics = RegressionMetrics(predictedAndTrue)

print('explainedVariance is {:.5f}'.format(regressionMetrics.explainedVariance))
print('meanAbsoluteError is {:.5f}'.format(regressionMetrics.meanAbsoluteError))
print('meanSquaredError is {:.5f}'.format(regressionMetrics.meanSquaredError))
print('r2 is {:.5f}'.format(regressionMetrics.r2))
print('rootMeanSquaredError is {:.5f}'.format(regressionMetrics.rootMeanSquaredError))

Note: this only demonstrates the workflow; no hyperparameter tuning has been done, so the out-of-the-box results are poor.

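Also note that scoring the model on its own training set gives optimistic numbers. As a complement, here is a minimal sketch of a held-out evaluation; the 80/20 split ratio and the seed are illustrative assumptions, not part of the original post:

from pyspark.mllib.recommendation import ALS
from pyspark.mllib.evaluation import RegressionMetrics

# Hold out 20% of the ratings for testing (split ratio and seed are assumptions)
trainData, testData = ratings.randomSplit([0.8, 0.2], seed=42)
heldOutModel = ALS.train(trainData, rank=50, iterations=10, lambda_=0.01)

# Score the held-out pairs; pairs whose user or movie never appeared in
# training have no latent factors and are silently dropped by the join
heldOutPairs = testData.map(lambda p: (p.user, p.product))
heldOutPreds = heldOutModel.predictAll(heldOutPairs).map(lambda r: ((r.user, r.product), r.rating))
heldOutTrue = testData.map(lambda r: ((r.user, r.product), r.rating))
heldOutMetrics = RegressionMetrics(heldOutTrue.join(heldOutPreds).map(lambda r: r[1]))
print('held-out RMSE is {:.5f}'.format(heldOutMetrics.rootMeanSquaredError))
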
- Recommend movies to a user

# Build an id-to-title map so recommended movie IDs can be translated into titles
movies = sc.textFile("/Users/gao/Desktop/Toby/5Spark-JDK/data/ml-100k/u.item")
movie_titles = movies.map(lambda line: line.split('|')[:2]).map(lambda line: (int(line[0]), line[1])).collectAsMap()

# Find K = 10 recommendations for user userId = 800
userId = 800
K = 10

# Method 1: recommend K movies for every user, and K interested users for every movie
products_for_users = model.recommendProductsForUsers(K)  # K movies per user
users_for_products = model.recommendUsersForProducts(K)  # K users per movie
products_for_users.lookup(userId)  # inspect the movies recommended to userId, with predicted ratings

# Method 2: the model's built-in method for recommending the top K products to one user
topKRecs = model.recommendProducts(user=userId, num=K)
print('Top {} movies recommended for user {} are:'.format(K, userId))
for rec in topKRecs:
    print(movie_titles[rec.product], rec.rating)  # the recommended movies and their predicted scores

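A quick way to sanity-check these recommendations is to compare them with the movies the user actually rated highly. A small sketch, reusing the ratings RDD and movie_titles map built above:

# The user's own top-K rated movies, for comparison with the recommendations
moviesForUser = ratings.keyBy(lambda r: r.user).lookup(userId)
for r in sorted(moviesForUser, key=lambda r: r.rating, reverse=True)[:K]:
    print(movie_titles[r.product], r.rating)
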
- Find similar movies

# Measure item similarity with cosine similarity
def cosineSimilarity(vec1, vec2):
    return float(np.dot(vec1, vec2)) / (np.linalg.norm(vec1, ord=2) * np.linalg.norm(vec2, ord=2))

# Pick item A and the number of neighbours K
itemIdA = 567
K = 50
itemFactorA = np.array(model.productFeatures().lookup(itemIdA)[0])

# Find the top-K items most similar to item A
itemFactorA_bcast = sc.broadcast(itemFactorA)
sim = model.productFeatures().map(lambda factor: (factor[0], cosineSimilarity(np.array(factor[1]), itemFactorA_bcast.value)))
sim_sort = sim.sortBy(keyfunc=(lambda x: x[1]), ascending=False).take(K + 1)  # sort by similarity, descending; take K+1 because the most similar item is A itself

print('Top-{} similar movies to {} are:'.format(K, movie_titles[itemIdA]))
for (itemid, similarity) in sim_sort[1:]:  # skip the first entry, which is item A itself
    print('{},{:.3f}'.format(movie_titles[itemid], similarity))

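As a quick check of cosineSimilarity, an item's factor vector should be perfectly similar to itself:

# cosine similarity of a vector with itself is 1.0 (up to floating-point error)
assert abs(cosineSimilarity(itemFactorA, itemFactorA) - 1.0) < 1e-6
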
- Save the model

# Save the model so it can be reloaded later
model.save(sc, "target/tmp/myCollaborativeFilter")

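To reload the saved model in a later session, use MatrixFactorizationModel.load, as in the official Spark collaborative-filtering example:

from pyspark.mllib.recommendation import MatrixFactorizationModel

# Reload the persisted factors and get back a usable model
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
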
Done
Reposted from: https://blog.csdn.net/eylier/article/details/105302693