【Python+Spark2.0+Hadoop机器学习与大数据实战】实践课

这本书浅显易懂,虽然代码中的错误还是比较多,问题不大,但是就会执行不了。


更新历史

  • 2020.02.18:重新上线
  • 2019.05.21:完成阅读与读后感(还差最后四章)
  • 2019.05.20:开始阅读

读后感

这本书浅显易懂,虽然代码中的错误还是比较多,问题不大,但是就会执行不了。有一些就比较要命,比如 AUC 的定义都有错误。总体来说属于官方文档+教程的翻译+扩展,初学者值得一看。

阅读笔记

第 1 章 基本概念

关于机器学习

  • 机器学习训练用的数据是由 Feature, Label 组成的
  • 机器学习分为训练、预测两个阶段
  • 机器学习分类:二元、多元、回归、聚类

关于 Spark

  • Spark 的核心是 RDD(Resilient Distributed Dataset) 弹性分布式数据集
  • Spark 在内存中运行程序
  • Spark 2.0 主要功能:Spark SQL(DataFrame), Spark Streaming, Mllib(ML), GraphX

Spark 数据处理方式主要有三种:

  • RDD:不必定义 Schema,必须有 Map/Reduce 的概念
  • DataFrame:必须定义 Schema
  • Spark SQL:由 DataFrame 衍生出来,需要先建立 DataFrame
  • 易用程度:Spark SQL > DataFrame > RDD
  • 用 python 操作 DataFrames 性能与 Scala 几乎相同,而如果是 RDD,则会比 Scala 慢很多

Spark 机器学习主要有两个 API:

  1. Spark MLlib:RDD-based 机器学习 API
  2. Spark ML pipeline:Dataframes-based 机器学习 API
    1. Spark DataFrame 与 Pandas DataFrame 可以互相转换
    2. Spark DataFrame 可以轻松读取大数据中的各种数据源

关于 MapReduce

  • Map 将任务分割成更小任务,由每台服务器分别运行
  • Reduce 将所有服务器的运算结果汇总整理,返回最后的结果
  • YARN 是 MapReduce 的效率更高的资源管理核心

第 2、3、4、5 章是安装配置,略过

第 6 章 Hadoop HDFS 命令

下面列出常用的命令:

hadoop fs -mkdir
hadoop fs -ls
hadoop fs -copyFromLocal
hadoop fs -put
hadoop fs -cat
hadoop fs -copyToLocal
hadoop fs -get
hadoop fs -cp
hadoop fs -rm

第 7 章 Hadoop MapReduce

用的是 WordCount.java 的例子。Hadoop MapReduce 的缺点为:

  • 程序设计模式不容易使用,开发者效率很低
  • 中间数据保存到硬盘,有读写延迟,运行效率不高
  • 不支持实时处理,原始设计就是以批处理为主

第 8 章 Python Spark 介绍

  • SparkContext 通过 Cluster Manager 管理整个集群,集群中包含多个 Worker Node,在每个 WorkerNode 中都有 Executor 负责执行任务
  • 命令行中输入 pyspark 可以进入交互式界面
  • 通过修改 log4j.propertiesrootCategory=WARN 来减少无用的输出,位于 /Users/dawang/spark-2.4.3-bin-hadoop2.7/conf
  • 在本地运行 pyspark 程序 pyspark --master local[4] 表示用 4 个线程
  • 读取本地的文件 textFile=sc.textFile("file:/usr/local/xxxx")
  • 显示项数 textFile.count()
  • 在 YARN 上运行 HADOOP_CONF_DIR=xxxxx pyspark --master yarn -deploy-mode client
  • 查看 Master sc.master

第 9 章是在 IPythonNotebook 中运行 Spark,略

第 10 章 Python Spark RDD

RDD 上可以施加 3 种类型的运算:

  1. 转换 Transformation:产生另一个 RDD,具有 lazy 特性,不会立刻执行
  2. 动作 Action:不会产生另一个 RDD,而是会产生数值、数组或写入文件系统;会立即执行(与之前的"转换"运算一起)
  3. 持久化 Persistence:对于重复使用的 RDD,可以持久化在内存中

RDD 本身具有 Lineage 机制,会记录每个 RDD 与其父代 RDD 之间的关联。RDD 本身具有不可变特性,再加上 Lineage 机制,使得 Spark 具备容错的特性。

单个 RDD 转换运算

  • Map 运算:通过传入的函数将每一个元素经过函数运算产生另外一个 RDD
  • Filter 运算:对 RDD 内的每一个元素进行筛选,并产生另外的 RDD
  • Distinct 运算:删除重复元素
  • RandomSplit 运算:将整个集合元素以随机数的方式按照比例分为多个 RDD
  • GroupBy 运算:按照传入的函数规则将数据分为多个 List

直接上代码

# 创建 RDD 可以使用 parallelize 方法
intRDD = sc.parallelize([3, 1, 2, 5, 5])
intRDD.collect()

stringRDD = sc.parallelize(["Apple", "Orange", "Banana", "Grape", "Apple"])
stringRDD.collect()

# ========================
# Map 运算:通过传入的函数将每一个元素经过函数运算产生另外一个 RDD
def addOne(x):
  return (x+1)

intRDD.map(addOne).collect()
# 用匿名函数可以更简单
intRDD.map(lambda x:x+1).collect()
# 简单的功能使用匿名函数,复杂的功能使用具名函数

# 同样也可以对 String 进行处理
stringRDD.map(lambda x:"fruit:"+x).collect()

# ========================
# Filter 运算:对 RDD 内的每一个元素进行筛选,并产生另外的 RDD

# 筛选小于 3 的数字
intRDD.filter(lambda x:x<3).collect()
# 筛选含有 ra 的字符串
stringRDD.filter(lambda x:"ra" in x).collect()

# ========================
# Distinct 运算:删除重复元素
intRDD.distinct().collect()
stringRDD.distinct().collect()

# ========================
# RandomSplit 运算:将整个集合元素以随机数的方式按照比例分为多个 RDD
# 四六开拆分
sRDD = intRDD.randomSplit([0.4, 0.6])
sRDD[0].collect()
sRDD[1].collect()

# ========================
# GroupBy 运算:按照传入的函数规则将数据分为多个 List
# 按照奇偶拆分
gRDD = intRDD.groupBy(lambda x: "even" if (x % 2 == 0) else "odd").collect()
print(gRDD[0][0], sorted(gRDD[0][1]))
print(gRDD[1][0], sorted(gRDD[1][1]))

多个 RDD 转换运算

  • Union 并集运算
  • Intersection 交集运算
  • Subtract 差集运算
  • Cartesian 笛卡尔乘积运算

具体看代码

# 先创建多个 RDD
intRDD1 = sc.parallelize([3, 1, 2, 5, 5])
intRDD2 = sc.parallelize([5, 6])
intRDD3 = sc.parallelize([2, 7])

# 并集运算
intRDD1.union(intRDD2).union(intRDD3).collect()

# 交集运算
intRDD1.intersection(intRDD2).collect()

# 差集运算
intRDD1.subtract(intRDD2).collect()

# 笛卡尔乘积运算
print(intRDD1.cartesian(intRDD2).collect())

动作运算

  • 读取元素
  • 统计功能

具体参考代码

# 读取元素
# ========================
# 取出第一个
intRDD.first()
# 取出前两个
intRDD.take(2)
# 从小到大排序,取出前 3 个
intRDD.takeOrdered(3)
# 从大到小排序,取出前 3 个
intRDD.takeOrdered(3, key=lambda x:-x)

# 统计元素
# ========================
# 统计
intRDD.stats()
# 最小
intRDD.min()
# 最大
intRDD.max()
# 标准差
intRDD.stdev()
# 计数
intRDD.count()
# 总和
intRDD.sum()
# 平均
intRDD.mean()

单个 Key-Value RDD 转换运算

Key-Value 运算也是 Map/Reduce 的基础

  • Filter 运算:筛选元素,产生另一个 RDD
  • Map 运算:对每一组 kv 进行运算,产生另一个 RDD
  • SortByKey 运算:按照 key 排序
  • ReduceByKey 运算:按照 key 进行 Reduce 运算

具体参考代码

# 创建 KV RDD
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])

# 列出全部 key
kvRDD1.keys().collect()
# 列出全部 value
kvRDD1.values().collect()

# 筛选 key 小于 5 的
kvRDD1.filter(lambda keyValue: keyValue[0]<5).collect()
# 筛选 value 小于 5 的
kvRDD1.filter(lambda keyValue: keyValue[1]<5).collect()

# 将 value 每一个值进行平方
kvRDD1.mapValues(lambda x: x*x).collect()

# 从小到大按照 key 排序
kvRDD1.sortByKey(ascending=True).collect()
# 从大到小按照 key 排序
kvRDD1.sortByKey(ascending=False).collect()

# 将具有相同 key 值的数据合并
kvRDD1.reduceByKey(lambda x,y: x+y).collect()

多个 Key-Value RDD 转换运算

  • Join 运算:将两个 RDD 按照相同的 key 值 join 起来
  • LeftOuterJoin 运算:从左边集合对应到右边集合,如果没有,则显示 None
  • RightOuterJoin 运算:从右边集合对应到左边集合
  • SubtractByKey 运算:删除相同 key 值的数据

具体参考代码

# 创建 KV RDD
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD2 = sc.parallelize([(3,8)])

# join 运算
kvRDD1.join(kvRDD2).collect()
# [(3, (4, 8)), (3, (6, 8))],这里 3 是相同的 key,所以 4 和 8 一组,然后 6 和 8 一组

# LeftOuterJoin 运算
kvRDD1.leftOuterJoin(kvRDD2).collect()
# [(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]

# RightOuterJoin 运算
kvRDD1.rightOuterJoin(kvRDD2).collect()
# [(3, (4, 8)), (3, (6, 8))]

# SubtractByKey 运算
kvRDD1.subtractByKey(kvRDD2).collect()
# [(1, 2), (5, 6)]

Key-Value 动作运算

具体参考代码

# 获取第一项数据
kvRDD1.first()

# 获取前两项数据
kvRDD1.take(2)

# 计算每一个 Key 值的项数
kvRDD1.countByKey()

# 创建 KV 字典 collectAsMap,如果有多个相同的 key,只会保留最后一个
KV=kvRDD1.collectAsMap()
print(type(KV))
print(KV[3])

# 寻找特定 key 的值,lookup
kvRDD1.lookup(3)

Broadcast 广播变量

使用规则:

  1. 使用 SparkContext.broadcast([初始值]) 创建
  2. 使用 .value 的方法来读取广播变量的值
  3. Broadcast 广播变量被创建后不能修改

优势:减少常用的变量向 WorkerNode 传递的次数

具体参考代码

kvFruit = sc.parallelize([(1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")])
# 创建 fruitMap 字典
fruitMap = kvFruit.collectAsMap()
print("对照表:" + str(fruitMap))
# 将 fruitMap 字典转换为广播变量
bcFruitMap = sc.broadcast(fruitMap)

# 创建 fruitIds
fruitIds = sc.parallelize([2,4,1,3])
print("水果编号:" + str(fruitIds.collect()))

# 使用广播字典进行转换
fruitNames = fruitIds.map(lambda x:bcFruitMap.value[x]).collect()
print("水果名称:"+str(fruitNames))

Accumulator 累加器

计算总和是 MapReduce 常用的运算,为此 Spark 提供了 accumulator 累加器共享变量,使用规则如下:

  1. 使用 SparkContext.accumulator([初始值]) 来创建
  2. 使用 .add() 进行累加
  3. 在 task 中,例如 foreach 训练中,不能读取累加器的值
  4. 只有驱动程序,也就是循环外,才可以使用 .value 来读取累加器的值

具体参考代码:

intRDD1 = sc.parallelize([3, 1, 2, 5, 5])
# 创建 total 累加器,Double 类型
total = sc.accumulator(0.0)
# 创建 num 累加器,Int 类型
num = sc.accumulator(0)
# 使用 foreach 遍历操作
intRDD.foreach(lambda i: [total.add(i), num.add(1)])
# 计算平均并显示
avg = total.value / num.value
print("total="+str(total.value)+",num="+str(num.value)+",avg="+str(avg))

RDD Persistence 持久化

使用方法:

  • RDD.persist(存储等级),默认是 MEMORY_ONLY
  • RDD.unpersist(),取消持久化

持久化存储等级如下:

  • MEMORY_ONLY:以 Java 对象反序列化在 JVM 内存中。如果 RDD 太大,则不会缓存在内存中,需要时重新计算
  • MEMORY_AND_DISK:以 Java 对象反序列化在 JVM 内存中。如果 RDD 太大,会将多余的 RDD paritions 存储在硬盘中,需要时再从硬盘读取
  • MEMORY_ONLY_SER:与 MEMORY_ONLY 类似,但是以 Java 对象序列化来存储,需要再进行反序列化才能使用,但是比较省内存存储空间。如果 RDD 太大,则不会缓存在内存中,需要时重新计算
  • MEMORY_AND_DISK_SER:与 MEMORY_AND_DISK 类似,如果 RDD 太大,会将多余的 RDD paritions 存储在硬盘中,需要时再从硬盘读取
  • DISK_ONLY:把 RDD 存储在硬盘上
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2:和前面意思一样,但是每一个 RDD partitions 都复制到两个节点

Spark WordCount

/Users/dawang/Documents/GitHub/computational-advertising-note/spark 目录下启动 pyspark,输入下面的代码

# 读取文本文件
textFile = sc.textFile("data/basic.txt")
# 使用 flatMap 空格符分隔单词,读取每个单词
stringRDD = textFile.flatMap(lambda line : line.split(" "))
# 通过 map reduce 计算每一个单词出现的次数
countsRDD = stringRDD.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x+y)
# 保存计算结果
countsRDD.saveAsTextFile("data/basic_count")

第 11 章是如何用 Eclipse Scala IDE 的,比较落伍,略

第 12 章 Python Spark 创建推荐引擎

Spark 支持 ALS(Alternating Least Squares) 推荐算法,是机器学习的协同过滤推荐算法。机器学习的协同过滤推荐算范通过观察所有用户给产品的评价来推断每个用户的喜好,并向用户推荐适合的多个产品,也可以把某一个产品推荐给多个用户。

代码参考 computational-advertising-note/spark/1-movielens-recommend.py

常见推荐算法

  • 基于关系型规则的推荐(Association Rule)
    • 消费者购买产品 A,那么他有多大机会购买产品 B
    • 购物车分析(啤酒和尿布)
  • 基于内容的推荐(Content-based)
    • 分析网页内容自动分类,再将用户自动分类
    • 将新进已分类的网页推荐给对该群感兴趣的用户
  • 人工统计式的推荐(Demographic)
    • 将用户以个人属性(性别、年龄、教育背景、居住地、语言)作为分类的指标
    • 以此类作为推荐的基础
  • 协同过滤式的推荐(Collaborative Filtering)
    • 通过观察所有用户对产品的评分来推断用户的喜好
    • 找出对产品评分相近的其他用户,他们喜欢的产品当前用户多半也会喜欢

协同过滤式推荐的优缺点:

  • 优点
    • 个性化推荐
    • 不需要内容分析
    • 可以发现用户新的兴趣点
    • 自动化程度高
  • 缺点
    • 冷启动问题:如果没有历史数据就没办法分析
    • 新用户问题:新用户没有评分,就不知道他的喜好

数据集

采用 Movielens 的 ml-100k 数据集,保存到 /Users/dawang/Documents/GitHub/computational-advertising-note/spark/data 中,解压后会后很多文件,我们主要使用:

  • u.dat 用户评分数据:包含四个字段,userid, itemid, rating, timestamp
  • u.item 电影的数据:主要使用 movieid, movietitle

训练模型

ALS.train 可以分为显式评分训练(ALS.train)与隐式评分训练(ALS.trainImplicit),均返回 MatrixFactorizationModel。这里我们使用显式。

第 13 章 Python Spark MLlib 决策树二分类

决策树模型的应用广泛,优点是条理清晰、方法简单、易于理解、适用范围广。本章将使用决策树分类 StumbleUpon 数据集 ,预测网页是暂时的(ephemeral)还是长青的(evergreen)。

决策树不能无限成长,我们必须设置如下参数:

  • maxBins:决策树每一个节点最大分支数
  • maxDepth:决策树最大深度
  • Impurity:决策树分裂节点时的方法
    • 基尼指数(Gini):选择分裂后最小的基尼指数方式
    • 熵(Entropy):选择分裂后最小熵的方式(也就是最不混乱的方式)

数据集

  • 我们用第 3~25 个字段作为特征,比如网页分类、链接数目、图像的比例等
  • 第 26 个字段(因为是从 0 开始的,所以 index 是 25)是 label
    • 1 表示长青 evergreen,会持续让用户感兴趣
    • 0 表示 ephemeral,是一个暂时的网页
  • 这里我们主要用 test.tsvtrain.tsv

评估准确率

采用 AUC 评估二元分类模型

  • True Positives:预测 1,实际 1
  • False Positives:预测 1,实际 0
  • True Negatives:预测 0,实际 0
  • False Negatives:预测 0, 实际 1
  • TPR:所有实际为 1 的样本中被判断为 1 的比例 = TP/(TP+FN)
  • FPR:所有实际为 0 的样本中被错误判断为 1 的比例 = FP/(FP+TN)
  • TPR(y轴) 和 FPR(x 轴) 可以绘制出 ROC 曲线,ROC 曲线下的面积就是 AUC
  • 坐标轴上的值表示的是如果阈值为 x,对应的 TPR 和 FPR
  • AUC 在 0.5~1 之间才优于随机猜测,有预测价值

后面的内容都在在这个数据集上进行机器学习,就不再单独列出章节:

  • 第 14 章 Python Spark MLlib 逻辑回归二分类:在线性回归中因变量是连续变量。例如,年龄与疾病的关系是线性回归,随着年龄增长,得到某疾病的概率也会增加。然而,如果因变量不是连续变量,而是二分变量(例如某个年龄是否得到某疾病),就必须使用逻辑回归 logistic Regression.
  • 第 15 章 Python Spark MLlib SVM 二分类:有监督的学习方法,广泛应用于机器学习分类
  • 第 16 章 Python Spark MLlib 朴素贝叶斯二分类:简单且实用的分类方法

第 17 章 Python Spark MLlib 决策树多分类

数据集

采用 UCI Covertype 数据集 ,目标是根据特征判断某个地方最适合什么植被类型,多分类。下载 covtype.data.gz

  • 最后一个字段是 label 标签字段,其他是 feature 特征字段
  • 字段 1~10 是数值特征字段,比如海拔、方位、斜率、距离水源的垂直距离、距离水源的水平距离、9 点时阴影等
  • 字段 11~14 是荒野分类特征字段,OneHotEncoding,一共有 4 种,所以长度为 4
  • 字段 15~54 是土壤分类的分类特征字段,OneHotEncoding,一共有 40 种,所以长度为 40

第 18 章 Python Spark MLlib 决策树回归分析

预测在不同的情况下(季节、月份、时间、假日、星期、工作日、天气、温度、体感温度、湿度、风速)下,每一小时的租用量

数据集

采用 UCI Bike Sharing 数据集 ,我们主要使用 hour.csv 并忽略部分无关字段

TODO 后面章节与工作相关度较小,暂略,有空再看