在我刚刚完成的一门课程中(https://www.udemy.com/scalable-programming-with-scala-and-spark/learn/v4/overview),提到的一件事是Spark是数据分析的好工具“未知的未知数”。
为了解释这一点,我们必须了解:
- “已知知识”我们知道的事情,我们知道。
- “未知”我们知道我们不知道的事情。
- ‘Unknown-Unknowns’我们不知道的事情我们也不知道。
数据分析中的未知-未知将是您不了解的关系和驱动程序。
Spark将REPL(读取-评估-打印-循环)环境与一个稳定且高性能的系统结合在一起。
Spark是用于数据处理和分析的通用引擎。 它是使用Scala构建的,但是您也可以通过API用其他语言(例如Python或Java)编写Spark代码。 Spark是Hadoop生态系统的一部分,因此其引擎能够进行分布式计算。
RDD(弹性分布式数据集)。 它们是Spark中的主要编程抽象。 它们是内存中对象,所有数据都使用它们进行处理。 它们具有弹性是因为它们生活在内存中,但被设计为容错的。
在继续之前,我已经创建了一个csv文件(books.csv),您可以从我的github下载该文件,并将其用于下面说明的示例。
eday69 / SparkRDD
SparkRDD –使用Spark RDD示例
github.com
回顾一下将文件移入HFS。
将文件从本地计算机复制到AWS服务器:
> scp -i file.pem /pathtofile/books.csv user @ myawsfileserver:/pathtofileinAWS/books.csv
> scp -i file.pem /pathtofile/publishers.csv user @ myawsfileserver:/pathtofileinAWS/publishers.csv
在HDFS(可选)中为文件创建目录:
>须藤hdfs dfs -mkdir sparkrdd
将文件复制到HDFS:
> sudo hdfs dfs -put books.csv / user / root / sparkrdd /
> sudo hdfs dfs -put Publishers.csv / user / root / sparkrdd /
RDD具有3个定义特征:分区,它们是只读的,并且知道其沿袭:
- 分区:数据分为多个分区,并分配到多个计算机/节点。
- 只读:不可变。 您只能做两件事:从中读取数据(操作),也可以将其转换为另一个RDD(转换)。
- 血统:要了解血统,一个简单的例子就是家谱。 对于该家族树的特定成员,您可以追溯父亲,父亲的父亲(祖父)等。在RDD中,您可能会以相同的方式来思考:它仅包含元数据,有关如何操作的说明创建它,并且它知道其父级是谁。 知道父级是谁也可以重新创建该RDD,并且由于父级也是RDD,我们可以知道父级是谁(祖父),依此类推,直到我们将其追溯到信息的单一来源为止。 这使得RDD具有弹性(可以随时对其进行重构),并可以进行延迟评估。 数据被“物化”,直到在RDD上执行操作为止。
您可以使用RDD做什么?
- 创建一个RDD
- 应用转换
- 应用动作以获取结果
- 将RDD坚持到磁盘。
例子。
创建RDD
//来自数据源(磁盘)
val books = sc.textFile(“ scalardd / books.csv”)
//通过应用转换从另一个RDD
books.filter(lambda x:’Description’not in x)
转变
- filter:根据条件(布尔值)从RDD中过滤记录
- map:应用在每个记录上返回单个对象的函数
- flatmap:应用任何可返回迭代器并将其转换为多个记录的函数
这些只是最“流行”的转换。
另外,我们还有其他适用于2个RDD的转换:
- union:2个RDD的并集
- 交集:2个RDD的交集
- 减去:从另一个中删除1个RDD的内容
- 笛卡尔:笛卡尔乘积:创建元组的RDD。
动作
collect:返回所有RDD元素。 出于这个原因,它使用不多:我们将处理大量数据,并且此函数返回所有数据。 通常,我们试图以某种方式对其进行过滤或使用部分信息。
- 第一:返回第一行
- 取:返回任意行
- count:返回总行数
- countByValue:计算值在RDD中出现的次数
- reduce:它将以指定的方式组合RDD中的所有值。
- 集合:类似于reduce,但允许返回不同类型的元素。
这些是一些常用的动作。
测试示例
好的,足够的理论知识,让我们尝试一些实时测试! 为此,我们将使用Spark-Shell:
>须藤火花壳


使用spark-shell的一大好处是,我们的SparkContext是自动创建的。 如果您阅读上图的文本,您会看到: “ Spark context available as sc”,这是我们的SparkContext变量,它表示与Spark Cluster的连接。 这是我们将数据加载到内存中的位置,这是创建RDD的要求。
请记住:我们所有的数据分析都将通过对RDD的操作进行。
以前,我们使用此命令创建了一个RDD,还记得吗?
> val books = sc.textFile(“ sparkrdd / books.csv”);
这样,我们创建了第一个RDD! 让我们对其进行一些操作:
> books.count();
→res0:长=38154。— — —我们有38,154条记录的RDD输出。
> books.first();
→res1:字符串= 9780781795807,21,“鲁宾的病理学评论”,44.95,35.0
在这里,它为我们提供了RDD的第一条记录。 你有发现奇怪的事吗? 它全部是一个字符串! 为了访问数据的不同字段,将这些字段分开将很方便。
books.map(_split(“,”));
“ _”字符充当每个记录的占位符。 但是,除非您想使用数组的RDD(这是上一条命令所能提供的),否则我们可以设置为按名称引用这些列。 从字符串本身将字段转换为相关的数据类型。
首先,我们将设置一个代表记录的类:
案例类Book(isbn:String,Publisher_id:Int,title:String,价格:Float,cost:Float)
案例类是Scala中的一种特殊类型的类。 案例类用于设置普通对象,这些普通对象仅用于保存具有特定结构的数据。 构造函数中的参数将自动成为公共成员变量。 构造函数可以像函数一样使用。
因此,知道了这一点,我们将编写一个函数,将RDD书中的每个字符串记录转换为此类。
这就是我们将如何在Scala中编写它:


现在,使用已解析的字段创建新的RDD:
> val booksParsed = books.map(解析)
→booksParsed:org.apache.spark.rdd.RDD [Book] = MapPartitionsRDD [2]在地图上的:33
如果我们调用第一条记录:
> booksParsed.first();
res0:书籍=书籍(9780781795807,21,“鲁宾的病理学评论”,44.95,35.0)
现在,我们得到一个Book对象作为记录,而不是字符串。 我们可以使用字段名称访问Book对象的值。
计算书籍的平均价格
要计算平均价格,我们需要知道所有价格的总和,然后将其除以图书数量,对吗? 要获取书籍总数,我们只需要计算RDD的记录即可。 对于所有价格的总和,我们需要其他东西,我们需要另一种方法。
> val totalPrice = booksParsed.map(_。price).reduce((x,y)=> x + y)
不幸的是,当我尝试执行最后一条命令时,我的hadoop服务器崩溃了。 在解决此问题的同时,我将发布该故事并在另一个故事中继续该主题。
希望这对您有用,就像对我一样。 一如既往,请拍手!
未完待续…