统计电影总数
合并两个文件,计算每部电影的平均得分是多少
统计得分排名前10的电影(输出电影名称)
统计最受欢迎的电影(即评分人数最多的电影),
思路:4.1对Ratings.csv文件操作,统计每部电影的评分人数
4.2对评分人数进行排序
4.3在movies.csv文件中过滤出评分人数最多的电影名称
5.编程实现RDD转DataFrame,并选择一个合理的SQL分析。
统计电影总数
合并两个文件,计算每部电影的平均得分是多少
统计得分排名前10的电影(输出电影名称)
统计最受欢迎的电影(即评分人数最多的电影),
思路:4.1对Ratings.csv文件操作,统计每部电影的评分人数
4.2对评分人数进行排序
4.3在movies.csv文件中过滤出评分人数最多的电影名称
5.编程实现RDD转DataFrame,并选择一个合理的SQL分析。
关注本答案参考多次询问 ChatGPT-3.5 后选择的最佳回答,若是满意,还请采纳,谢谢。
以下是使用Spark SQL进行影评分析的Scala代码示例:
首先,确保你已经导入了必要的包和SparkSession的创建:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("Movie Review Analysis")
.getOrCreate()
import spark.implicits._
val moviesDF = spark.read.csv("path_to_movies.csv")
val totalMoviesCount = moviesDF.count()
println(s"Total number of movies: $totalMoviesCount")
val ratingsDF = spark.read.csv("path_to_ratings.csv")
val moviesDF = spark.read.csv("path_to_movies.csv")
// 将ratings转换为DataFrame并添加电影ID
val ratingsDFWithMovieId = ratingsDF.withColumn("movieId", col("movieId").cast("integer"))
// 合并DataFrame
val joinedDF = ratingsDFWithMovieId.join(moviesDF, "movieId")
// 计算每部电影的平均得分
val averageRatingsDF = joinedDF.groupBy("title")
.agg(avg("rating").alias("averageRating"))
averageRatingsDF.show()
val topRatedMoviesDF = averageRatingsDF.orderBy($"averageRating".desc).limit(10)
topRatedMoviesDF.show()
// 统计每部电影的评分人数
val popularMoviesDF = ratingsDFWithMovieId.groupBy("movieId").agg(count("rating").alias("numRatings"))
// 对评分人数进行排序
val sortedPopularMoviesDF = popularMoviesDF.orderBy($"numRatings".desc)
// 过滤出评分人数最多的电影名称
val mostPopularMovieDF = sortedPopularMoviesDF.join(moviesDF, "movieId")
.select("title")
.limit(1)
mostPopularMovieDF.show()
// 假设ratingsRDD和moviesRDD是已经加载的RDD
val ratingsRDD = spark.sparkContext.textFile("path_to_ratings.csv")
val moviesRDD = spark.sparkContext.textFile("path_to_movies.csv")
// 将RDD转换为DataFrame
val ratingsDF = ratingsRDD.map(line => {
val fields = line.split(",")
(fields(1).toInt, fields(2).toDouble)
}).toDF("movieId", "rating")
val moviesDF = moviesRDD.map(line => {
val fields = line.split(",")
(fields(0).toInt, fields(1))
}).toDF("movieId", "title")
// 创建临时视图
ratingsDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")
// 使用Spark SQL进行分析
val sqlResultDF = spark.sql(
"""
|SELECT m.title, AVG(r.rating) AS averageRating
|FROM ratings r JOIN movies m ON r.movieId = m.movieId
|GROUP BY m.title
""".stripMargin)
sqlResultDF.show()
请确保替换path_to_movies.csv和path_to_ratings.csv为你的实际文件路径。这些代码示例提供了一个基本的框架,你可以根据具体的需求进行调整和优化。