头歌spark实训
Spark环境搭建和使用
词频统计
任务描述
读取文件 /data/bigfiles/example.txt 中的内容,完成 WordCount 词频统计,其中单词之间的间隔符为:一个空格符。最终输出结果按大小写字母升序排列,使用 \t 作为字段输出间隔符,如下所示:
A 10
B 3
a 5
b 10
c 1
d 9
代码补充完成后,打开右侧命令行窗口,使用 maven 打包项目,并使用 spark-submit 进行手动提交,将打包提交运行的结果保存到 /root/result.txt 文件中。
代码编写
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
/******************* Begin *******************/
// 定义读取的文件路径
val filePath = "/data/bigfiles/example.txt"
// 创建 SparkConf 对象
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
// 创建 SparkContext 对象
val sc = new SparkContext(conf)
// 读取文件
val lines = sc.textFile(filePath)
// 将每行拆分为单词
val words = lines.flatMap(line => line.split(" "))
// 将每个单词映射为 (word, 1) 的键值对
val wordCounts = words.map(word => (word, 1))
// 对每个单词进行计数
val result = wordCounts.reduceByKey(_ + _)
// 按照频率进行排序
val sortedResult = result.sortByKey()
// 输出结果
sortedResult.zipWithIndex().foreach { case ((word, count), index) =>
println(s"${word}\t$count")
}
/******************* End *******************/
}
}
控制台
切换目录到项目,maven打包后,提交到spark执行,执行结果写入到txt文件 一行脚本:
cd /data/workspace/myshixun/project/step1/work && mvn package && spark-submit --class WordCount /data/workspace/myshixun/project/step1/work/target/work-1.0-SNAPSHOT.jar > /root/result.txt
读取文件(循环输出)
编程要求
**目前已经构建好了 Spark 的应用程序根目录,存储在
/data/workspace/myshixun/project/step1/work
目录下,其程序相关的子文件以及pom.xml
文件都已经构建完成,无需再进行操作。
请你完善右侧代码文件窗口中的程序,读取文件 /data/bigfiles/example.txt
中的内容,循环遍历输出其中的所有内容。
代码补充完成后,打开右侧命令行窗口,使用
maven
打包项目,并使用spark-submit
进行手动提交,将打包提交运行的结果保存到/root/result.txt
文件中。**
代码编写
import org.apache.spark.{SparkConf, SparkContext}
object First_Question {
def main(args: Array[String]): Unit = {
/******************* Begin *******************/
// 定义文件存储路径
val filePath = "/data/bigfiles/example.txt"
// 创建 SparkConf 对象
val conf = new SparkConf().setAppName("First_Question").setMaster("local")
// 创建 SparkContext 对象
val sc = new SparkContext(conf)
// 读取文件内容,遍历输出
val lines = sc.textFile(filePath)
lines.foreach(line => println(line))
/******************* End *******************/
}
}
控制台
cd /data/workspace/myshixun/project/step1/work && mvn package && spark-submit --class WordCount /data/workspace/myshixun/project/step1/work/target/work-1.0-SNAPSHOT.jar > /root/result.txt
spark-shell过滤输出
编程要求
读取文件
/data/bigfiles/example.txt
中的内容,使用 Spark-Shell 将所有以大写字母L
开头的行内容保存到/root/result
目录下。
控制台
# 进入spark-shell交互编程环境
spark-shell
# 读取文件,逐行拦截L开头的,保存到文件
sc.textFile("/data/bigfiles/example.txt").filter(line => line.startsWith("L")).saveAsTextFile("/root/result")
Spark Standalone模式部署
编程要求
完成 Spark Standalone 模式部署。
Spark 压缩包存储路径:/data/bigfiles/spark-2.1.0-bin-hadoop2.7.tgz
Hadoop 存储目录:/usr/local/hadoop
JDK 存储目录:/usr/lib/jvm/jdk1.8.0_111
控制台
# 创建目录
mkdir /usr/local/spark
# 解压文件
tar -xvf /data/bigfiles/spark-2.1.0-bin-hadoop2.7.tgz
# 移动文件
mv spark-2.1.0-bin-hadoop2.7/* /usr/local/spark
#配置文件
cat /usr/local/spark/conf/spark-env.sh.template > /usr/local/spark/conf/spark-env.sh
cat /usr/local/spark/conf/spark-defaults.conf.template > /usr/local/spark/conf/spark-defaults.conf
cat /usr/local/spark/conf/slaves.template > /usr/local/spark/conf/slaves.conf
echo "export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_111" >> /usr/local/spark/conf/spark-env.sh
#启动集群
/usr/local/spark/sbin/start-all.sh
RDD编程
数据分析
编程要求
使用 spark-shell 读取数据集 /data/bigfiles/Data01.txt 中的内容,完成交互式编程任务。
该数据集包含了某大学计算机系的成绩,数据格式如下所示:
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……
请根据给定的实验数据集,在 spark-shell 中通过编程来计算以下内容:
- 统计该系总共有多少学生,将统计结果存储到 /root/result.txt 文件中(只保存统计的数值)。
- 统计该系共开设来多少门课程,将统计结果存储到 /root/result2.txt 文件中(只保存统计的数值)。
- Tom 同学的总成绩平均分是多少,将统计结果存储到 /root/result3.txt 文件中(只保存统计的数值)。
- 求每名同学的选修的课程门数,将统计结果存储到 /root/result4 目录下(按姓名进行升序排列),输出示例如下:
(Bartholomew,5)
(Lennon,4)
(Joshua,4)
...
- 统计该系 DataBase 课程共有多少人选修,将统计结果存储到 /root/result5.txt 文件中(只保存统计的数值)。
- 统计各门课程的平均分是多少,将统计结果存储到 /root/result6 目录下(按课程名进行升序排列),输出示例如下:
(Algorithm,48)
(CLanguage,50)
(Software,50)
...
- 使用累加器计算共有多少人选了 DataBase 这门课,将统计结果存储到 /root/result7.txt 文件中(只保存统计的数值)。
代码实现
#读取数据
rdd = sc.textFile("file:///usr/local/spark/mycode/rdd/data01.txt")
rdd1 = rdd.map(lambda x:x.split(","))
#①多少名学生
rdd1.map(lambda x:(x[0],1)).groupByKey().count()
#②多少门课程
rdd1.map(lambda x:(x[1],1)).groupByKey().count()
#③Tom总成绩平均分
Tom_score = rdd1.filter(lambda x:(x[0]=='Tom')).map(lambda x:(int(x[2]))).sum()
Tom_num = rdd1.filter(lambda x:(x[0]=='Tom')).map(lambda x:(int(x[2]))).count()
Tom_score/Tom_num
#④每名同学的选修的课程数量
rdd1.map(lambda x:(x[0],1)).groupByKey().map(lambda x:(x[0],sum(x[1]))).collect()
#⑤该系DataBase课程共有多少人选修
rdd1.filter(lambda x:(x[1]=='DataBase')).count()
#⑥各门课程平均分是多少
total_score = rdd1.map(lambda x:(x[1],int(x[2]))).groupByKey().map(lambda x:(x[0],sum(x[1])))
total_num = rdd1.map(lambda x:(x[1],1)).groupByKey().map(lambda x:(x[0],sum(x[1])))
total_score.join(total_num).map(lambda x:(x[0],round(x[1][0]/x[1][1],2))).collect()
#reduceByKey方式
rdd1.map(lambda x:(x[1],(int(x[2]),1))). \
reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])). \
map(lambda x:(x[0],round(x[1][0]/x[1][1],2))).collect()
#⑦使用累加器计算共有多少人选修DataBase这门课
accum=sc.accumulator(0)
rdd2 = rdd1.filter(lambda x:(x[1]=='DataBase'))
rdd2.foreach(lambda x:accum.add(1))
accum.value
控制台
spark
val data = sc.textFile("/data/bigfiles/Data01.txt")
val result4 = data.map(line => {
val fields = line.split(",")
(fields(0), fields(1))
}).groupBy(_._1).mapValues(_.size).sortByKey().collect()
sc.parallelize(result4).saveAsTextFile("/root/result4")
val result6 = data.map(line => {
val fields = line.split(",")
(fields(1), fields(2).toDouble)
}).groupBy(_._1).mapValues(values => {
val scores = values.map(_._2)
(scores.sum / scores.size).toInt
}).sortByKey().collect()
sc.parallelize(result6).saveAsTextFile("/root/result6")