跳至主要內容

头歌spark实训

fatSheep大约 5 分钟

Spark环境搭建和使用

词频统计

任务描述

读取文件 /data/bigfiles/example.txt 中的内容,完成 WordCount 词频统计,其中单词之间的间隔符为:一个空格符。最终输出结果按大小写字母升序排列,使用 \t 作为字段输出间隔符,如下所示:

A    10
B    3
a    5
b    10
c    1
d    9

代码补充完成后,打开右侧命令行窗口,使用 maven 打包项目,并使用 spark-submit 进行手动提交,将打包提交运行的结果保存到 /root/result.txt 文件中。

代码编写

import org.apache.spark.{SparkConf, SparkContext}

  

object WordCount {

  

def main(args: Array[String]): Unit = {

  

/******************* Begin *******************/

  

// 定义读取的文件路径

val filePath = "/data/bigfiles/example.txt"

  

// 创建 SparkConf 对象

val conf = new SparkConf().setAppName("WordCount").setMaster("local")

  

// 创建 SparkContext 对象

val sc = new SparkContext(conf)

  

// 读取文件

val lines = sc.textFile(filePath)

  

// 将每行拆分为单词

val words = lines.flatMap(line => line.split(" "))

  

// 将每个单词映射为 (word, 1) 的键值对

val wordCounts = words.map(word => (word, 1))

  

// 对每个单词进行计数

val result = wordCounts.reduceByKey(_ + _)

  

// 按照频率进行排序

val sortedResult = result.sortByKey()

  

// 输出结果

sortedResult.zipWithIndex().foreach { case ((word, count), index) =>

println(s"${word}\t$count")

}

  

/******************* End *******************/

  

}

}

控制台

切换目录到项目,maven打包后,提交到spark执行,执行结果写入到txt文件 一行脚本:

cd /data/workspace/myshixun/project/step1/work && mvn package && spark-submit --class WordCount /data/workspace/myshixun/project/step1/work/target/work-1.0-SNAPSHOT.jar > /root/result.txt

读取文件(循环输出)

编程要求

**目前已经构建好了 Spark 的应用程序根目录,存储在 /data/workspace/myshixun/project/step1/work 目录下,其程序相关的子文件以及 pom.xml 文件都已经构建完成,无需再进行操作。

请你完善右侧代码文件窗口中的程序,读取文件 /data/bigfiles/example.txt 中的内容,循环遍历输出其中的所有内容。

代码补充完成后,打开右侧命令行窗口,使用 maven 打包项目,并使用 spark-submit 进行手动提交,将打包提交运行的结果保存到 /root/result.txt 文件中。**

代码编写

import org.apache.spark.{SparkConf, SparkContext}

  

object First_Question {

  

def main(args: Array[String]): Unit = {

  

/******************* Begin *******************/

  

// 定义文件存储路径

val filePath = "/data/bigfiles/example.txt"

  

// 创建 SparkConf 对象

val conf = new SparkConf().setAppName("First_Question").setMaster("local")

  

// 创建 SparkContext 对象

val sc = new SparkContext(conf)

  

// 读取文件内容,遍历输出

val lines = sc.textFile(filePath)

lines.foreach(line => println(line))

  

/******************* End *******************/

  

}

  

}

控制台

cd /data/workspace/myshixun/project/step1/work && mvn package && spark-submit --class WordCount /data/workspace/myshixun/project/step1/work/target/work-1.0-SNAPSHOT.jar > /root/result.txt

spark-shell过滤输出

编程要求

读取文件 /data/bigfiles/example.txt 中的内容,使用 Spark-Shell 将所有以大写字母 L 开头的行内容保存到 /root/result 目录下。

控制台

# 进入spark-shell交互编程环境
spark-shell

# 读取文件,逐行拦截L开头的,保存到文件
sc.textFile("/data/bigfiles/example.txt").filter(line => line.startsWith("L")).saveAsTextFile("/root/result")

Spark Standalone模式部署

编程要求

完成 Spark Standalone 模式部署。

Spark 压缩包存储路径:/data/bigfiles/spark-2.1.0-bin-hadoop2.7.tgz

Hadoop 存储目录:/usr/local/hadoop

JDK 存储目录:/usr/lib/jvm/jdk1.8.0_111

控制台


# 创建目录

mkdir /usr/local/spark

# 解压文件

tar -xvf /data/bigfiles/spark-2.1.0-bin-hadoop2.7.tgz

# 移动文件

mv spark-2.1.0-bin-hadoop2.7/* /usr/local/spark

#配置文件

cat /usr/local/spark/conf/spark-env.sh.template > /usr/local/spark/conf/spark-env.sh

cat /usr/local/spark/conf/spark-defaults.conf.template > /usr/local/spark/conf/spark-defaults.conf

cat /usr/local/spark/conf/slaves.template > /usr/local/spark/conf/slaves.conf

  

echo "export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_111" >> /usr/local/spark/conf/spark-env.sh

  

#启动集群

/usr/local/spark/sbin/start-all.sh

RDD编程

数据分析

编程要求

使用 spark-shell 读取数据集 /data/bigfiles/Data01.txt 中的内容,完成交互式编程任务。

该数据集包含了某大学计算机系的成绩,数据格式如下所示:

Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……

请根据给定的实验数据集,在 spark-shell 中通过编程来计算以下内容:

  • 统计该系总共有多少学生,将统计结果存储到 /root/result.txt 文件中(只保存统计的数值)。
  • 统计该系共开设来多少门课程,将统计结果存储到 /root/result2.txt 文件中(只保存统计的数值)。
  • Tom 同学的总成绩平均分是多少,将统计结果存储到 /root/result3.txt 文件中(只保存统计的数值)。
  • 求每名同学的选修的课程门数,将统计结果存储到 /root/result4 目录下(按姓名进行升序排列),输出示例如下:
(Bartholomew,5)
(Lennon,4)
(Joshua,4)
...
  • 统计该系 DataBase 课程共有多少人选修,将统计结果存储到 /root/result5.txt 文件中(只保存统计的数值)。
  • 统计各门课程的平均分是多少,将统计结果存储到 /root/result6 目录下(按课程名进行升序排列),输出示例如下:
(Algorithm,48)
(CLanguage,50)
(Software,50)
...
  • 使用累加器计算共有多少人选了 DataBase 这门课,将统计结果存储到 /root/result7.txt 文件中(只保存统计的数值)。

代码实现

#读取数据
rdd = sc.textFile("file:///usr/local/spark/mycode/rdd/data01.txt")
rdd1 = rdd.map(lambda x:x.split(","))

#①多少名学生
rdd1.map(lambda x:(x[0],1)).groupByKey().count()

#②多少门课程
rdd1.map(lambda x:(x[1],1)).groupByKey().count()

#③Tom总成绩平均分
Tom_score = rdd1.filter(lambda x:(x[0]=='Tom')).map(lambda x:(int(x[2]))).sum()
Tom_num = rdd1.filter(lambda x:(x[0]=='Tom')).map(lambda x:(int(x[2]))).count()
Tom_score/Tom_num

#④每名同学的选修的课程数量
rdd1.map(lambda x:(x[0],1)).groupByKey().map(lambda x:(x[0],sum(x[1]))).collect()

#⑤该系DataBase课程共有多少人选修
rdd1.filter(lambda x:(x[1]=='DataBase')).count()

#⑥各门课程平均分是多少
total_score = rdd1.map(lambda x:(x[1],int(x[2]))).groupByKey().map(lambda x:(x[0],sum(x[1])))
total_num = rdd1.map(lambda x:(x[1],1)).groupByKey().map(lambda x:(x[0],sum(x[1])))
total_score.join(total_num).map(lambda x:(x[0],round(x[1][0]/x[1][1],2))).collect()
#reduceByKey方式
rdd1.map(lambda x:(x[1],(int(x[2]),1))). \
         reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])). \
          map(lambda x:(x[0],round(x[1][0]/x[1][1],2))).collect()

#⑦使用累加器计算共有多少人选修DataBase这门课
accum=sc.accumulator(0)
rdd2 = rdd1.filter(lambda x:(x[1]=='DataBase'))
rdd2.foreach(lambda x:accum.add(1))
accum.value

控制台

spark

val data = sc.textFile("/data/bigfiles/Data01.txt")

val result4 = data.map(line => {
  val fields = line.split(",")
  (fields(0), fields(1))
}).groupBy(_._1).mapValues(_.size).sortByKey().collect()

sc.parallelize(result4).saveAsTextFile("/root/result4")

val result6 = data.map(line => {
  val fields = line.split(",")
  (fields(1), fields(2).toDouble)
}).groupBy(_._1).mapValues(values => {
  val scores = values.map(_._2)
  (scores.sum / scores.size).toInt
}).sortByKey().collect()


sc.parallelize(result6).saveAsTextFile("/root/result6")