小言_互联网的博客

使用pyspark.mllib.recommendation做推荐案例-实现流程

409人阅读  评论(0)

经典案例:对user-movie-rating数据建模,用户获得可能喜爱的电影推荐,电影获得潜在观看用户以做营销推广。

movie数据下载地址: http://files.grouplens.org/datasets/movielens/ml-100k.zip

解压后可以看到主要的三个数据文件,用户信息数据 u.user , 电影信息数据 u.item , 以及用户对电影的评分数据 u.data。

  1. 初始化PySpark
    
        
    1. #Initializing PySpark
    2. from pyspark import SparkContext, SparkConf
    3. conf = SparkConf().setMaster( 'local[4]').setAppName( 'movies_app')
    4. sc = SparkContext(conf=conf)
    5. import matplotlib.pyplot as plt
    6. import numpy as np

     

  2. 加载数据,并做数据统计、绘制分布直方图、查看数据整体分布情况
    
        
    1. #加载数据
    2. user_data = sc.textFile( "/Users/gao/Desktop/Toby/5Spark-JDK/data/ml-100k/u.user")
    3. user_data.first()
    4. #统计数据
    5. user_fields = user_data.map( lambda line: line.split( "|"))
    6. num_users = user_fields.map( lambda fields: fields[ 0]).count()
    7. num_genders = user_fields.map( lambda fields:fields[ 2]).distinct().count()
    8. num_occupations = user_fields.map( lambda fields:fields[ 3]).distinct().count()
    9. num_zipcodes = user_fields.map( lambda fields:fields[ 4]).distinct().count()
    10. print ( "Users: %d, genders: %d, occupations: %d, ZIP codes: %d" % (num_users, num_genders, num_occupations, num_zipcodes))
    11. #建立分布直方图做统计
    12. ages = user_fields.map( lambda x: int(x[ 1])).collect()
    13. fig = plt.gcf()
    14. fig.set_size_inches( 8, 6)
    15. plt.hist(ages, bins= 20, color= 'lightblue', density= True)
    16. plt.title( 'histogram of ages')

    
        
    1. #建立职业分布的柱状图,使用 Map-Reduce的方法做统计
    2. count_by_occupation2 = user_fields.map( lambda fields: fields[ 3]).countByValue()
    3. x_axis1 = []
    4. y_axis1 = []
    5. for k,v in count_by_occupation2.items():
    6. x_axis1.append(k)
    7. y_axis1.append(v)
    8. arg_sort = np.argsort(y_axis1)
    9. x_axis = np.array(x_axis1)[arg_sort]
    10. y_axis = np.array(y_axis1)[arg_sort]
    11. #绘制图形
    12. pos = np.arange(len(x_axis))
    13. width = 1.0
    14. fig = plt.gcf()
    15. fig.set_size_inches( 12, 8)
    16. ax = plt.axes()
    17. ax.set_xticks(pos + (width / 2))
    18. ax.set_xticklabels(x_axis)
    19. plt.bar(pos, y_axis, width, color= 'lightblue')
    20. plt.xticks(rotation= 30)
    21. plt.title( 'distribution of occupations')

       

    
        
    1. #探索下评分数据(注:num_users 来自上文的用户统计;num_movies 在本文中没有给出定义,需要先从 u.item 统计出电影数量)
    2. rating_data_raw = sc.textFile( "/Users/gao/Desktop/Toby/5Spark-JDK/data/ml-100k/u.data")
    3. print (rating_data_raw.first())
    4. num_ratings = rating_data_raw.count()
    5. print ( "Ratings: %d" % num_ratings)
    6. rating_data = rating_data_raw.map( lambda line: line.split( "\t"))
    7. ratings = rating_data.map( lambda fields: int(fields[ 2]))
    8. max_rating = ratings.reduce( lambda x, y: max(x, y))
    9. min_rating = ratings.reduce( lambda x, y: min(x, y))
    10. mean_rating = ratings.reduce( lambda x, y: x + y) / num_ratings
    11. median_rating = np.median(ratings.collect())
    12. ratings_per_user = num_ratings / num_users
    13. ratings_per_movie = num_ratings / num_movies
    14. print ( "Min rating: %d" % min_rating)
    15. print ( "Max rating: %d" % max_rating)
    16. print ( "Average rating: %2.2f" % mean_rating)
    17. print ( "Median rating: %d" % median_rating)
    18. print ( "Average # of ratings per user: %2.2f" % ratings_per_user)
    19. print ( "Average # of ratings per movie: %2.2f" % ratings_per_movie)

       

  3. 缺失值填充(以 1900 作为年份缺失的标记,并用中位数填充;代码中的 movie_fields 与 convert_year 在本文中没有给出定义,需要事先准备)
    
        
    1. years_pre_processed = movie_fields.map( lambda fields: fields[ 2]).map( lambda x: convert_year(x)).collect()
    2. years_pre_processed_array = np.array(years_pre_processed)
    3. years_not_1900_index = years_pre_processed_array!= 1900
    4. mean_year = np.mean(years_pre_processed_array[years_not_1900_index])
    5. median_year = np.median(years_pre_processed_array[years_not_1900_index])
    6. index_bad_data = [index for index in np.where(years_pre_processed_array== 1900)[ 0]]
    7. years_pre_processed_array[index_bad_data] = median_year
    8. print ( "Mean year of release: %d" % mean_year)
    9. print ( "Median year of release: %d" % median_year)
    10. print ( "Index of '1900' after assigning median: %s" % np.where(years_pre_processed_array == 1900)[ 0])

     

  4. 训练、评价模型
    
        
    1. #使用推荐模型接口
    2. from pyspark.mllib.recommendation import ALS,Rating
    3. rawData = sc.textFile( "/Users/gao/Desktop/Toby/5Spark-JDK/data/ml-100k/u.data")
    4. rawRatings = rawData.map( lambda line:line.split( "\t")[: 3])
    5. #构造user-item-rating 数据
    6. ratings = rawRatings.map( lambda line:Rating(user=int(line[ 0]),product=int(line[ 1]),rating=float(line[ 2])))
    7. #模型训练
    8. model = ALS.train(ratings, rank= 50, iterations= 10, lambda_= 0.01)
    9. #在训练集上评价模型
    10. from pyspark.mllib.evaluation import RegressionMetrics
    11. testdata = ratings.map( lambda p:(p.user,p.product))
    12. predictions = model.predictAll(testdata).map( lambda r: ((r.user, r.product), r.rating) )
    13. ratesAndPreds = ratings.map( lambda r: ((r.user, r.product), r.rating)).join(predictions)
    14. predictedAndTrue = ratesAndPreds.map( lambda r:(r[ 1][ 1], r[ 1][ 0]))
    15. regressionMetrics = RegressionMetrics(predictedAndTrue)
    16. print( 'explainedVariance is {:.5f}'.format(regressionMetrics.explainedVariance))
    17. print( 'meanAbsoluteError is {:.5f}'.format(regressionMetrics.meanAbsoluteError))
    18. print( 'meanSquaredError is {:.5f}'.format(regressionMetrics.meanSquaredError))
    19. print( 'r2 is {:.5f}'.format(regressionMetrics.r2))
    20. print( 'rootMeanSquaredError is {:.5f}'.format(regressionMetrics.rootMeanSquaredError))

    注:这里只为演示流程,没有对模型调参,所以直接得到的效果并不好。

  5. 为用户推荐电影
    
        
    1. #整理电影ID和名称数据,将推荐的电影ID翻译成电影名称
    2. movies = sc.textFile( "/Users/gao/Desktop/Toby/5Spark-JDK/data/ml-100k/u.item")
    3. movie_titles = movies.map( lambda line:line.split( '|')[: 2]).map( lambda line:(int(line[ 0]),line[ 1])).collectAsMap()
    4. #为用户 userId = 800寻找 K = 10个推荐电影
    5. userId = 800
    6. K = 10
    7. #方法一:为每个用户推荐K个电影,为每个电影推荐K个对它感兴趣的用户
    8. products_for_users = model.recommendProductsForUsers(K) #为每个用户推荐K个电影
    9. users_for_products = model.recommendUsersForProducts(K) #每个电影推荐K个对它感兴趣的用户
    10. products_for_users.lookup(userId) #查看给userId推荐的电影评分情况
    11. #方法二:模型自带的给用户user推荐Top K商品的使用方法
    12. topKRecs = model.recommendProducts(user=userId,num=K)
    13. print( 'Top {} movies recommended for user {} are:'.format(K,userId))
    14. for rec in topKRecs:
    15. print(movie_titles[rec.product],rec.rating) #查看给用户推荐的商品及得分

  6. 寻找相似电影
    
        
    1. #计算商品的相似性 使用cosine相似性度量
    2. def cosineSimilarity(vec1, vec2):
    3. return float(np.dot(vec1, vec2))/(np.linalg.norm(vec1, ord= 2) * np.linalg.norm(vec2, ord= 2))
    4. #计算物品A、B的相似性
    5. itemIdA = 567
    6. K = 50
    7. itemFactorA = np.array(model.productFeatures().lookup(itemIdA)[ 0])
    8. #找到与物品A 最相似的top-K 个物品
    9. itemFactorA_bcast = sc.broadcast(itemFactorA)
    10. sim = model.productFeatures().map( lambda factor:(factor[ 0],cosineSimilarity(np.array(factor[ 1]),itemFactorA_bcast.value)))
    11. sim_sort=sim.sortBy(keyfunc=( lambda x:x[ 1]), ascending= False).take(K+ 1) #自定义按照相似性进行降序排序
    12. print( 'Top-50 similar movies to {} are:'.format(movie_titles[itemIdA]))
    13. for (itemid,similarity) in sim_sort[ 1:]:
    14. print( '{},{:.3f}'.format(movie_titles[itemid],similarity) ) #打印出查看相似度高的电影名称

  7. 保存模型
    
        
    1. #保存模型、可供下次调用
    2. model.save(sc, "target/tmp/myCollaborativeFilter")

     

Done

 



 


转载:https://blog.csdn.net/eylier/article/details/105302693
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场