使用Scala的Spark RDD基础知识

在我刚刚完成的一门课程中(https://www.udemy.com/scalable-programming-with-scala-and-spark/learn/v4/overview),提到的一件事是Spark是数据分析的好工具“未知的未知数”。

为了解释这一点,我们必须了解:

  • “已知知识”我们知道的事情,我们知道。
  • “未知”我们知道我们不知道的事情。
  • ‘Unknown-Unknowns’我们不知道的事情我们也不知道。

数据分析中的未知-未知将是您不了解的关系和驱动程序。

Spark将REPL(读取-评估-打印-循环)环境与一个稳定且高性能的系统结合在一起。

Spark是用于数据处理和分析的通用引擎。 它是使用Scala构建的,但是您也可以通过API用其他语言(例如Python或Java)编写Spark代码。 Spark是Hadoop生态系统的一部分,因此其引擎能够进行分布式计算。

RDD(弹性分布式数据集)。 它们是Spark中的主要编程抽象。 它们是内存中对象,所有数据都使用它们进行处理。 它们具有弹性是因为它们生活在内存中,但被设计为容错的。

在继续之前,我已经创建了一个csv文件(books.csv),您可以从我的github下载该文件,并将其用于下面说明的示例。

eday69 / SparkRDD

SparkRDD –使用Spark RDD示例

github.com

回顾一下将文件移入HFS。

将文件从本地计算机复制到AWS服务器:

> scp -i file.pem /pathtofile/books.csv user @ myawsfileserver:/pathtofileinAWS/books.csv

> scp -i file.pem /pathtofile/publishers.csv user @ myawsfileserver:/pathtofileinAWS/publishers.csv

在HDFS(可选)中为文件创建目录:

>须藤hdfs dfs -mkdir sparkrdd

将文件复制到HDFS:

> sudo hdfs dfs -put books.csv / user / root / sparkrdd /

> sudo hdfs dfs -put Publishers.csv / user / root / sparkrdd /

RDD具有3个定义特征:分区,它们是只读的,并且知道其沿袭:

  • 分区:数据分为多个分区,并分配到多个计算机/节点。
  • 只读:不可变。 您只能做两件事:从中读取数据(操作),也可以将其转换为另一个RDD(转换)。
  • 血统:要了解血统,一个简单的例子就是家谱。 对于该家族树的特定成员,您可以追溯父亲,父亲的父亲(祖父)等。在RDD中,您可能会以相同的方式来思考:它仅包含元数据,有关如何操作的说明创建它,并且它知道其父级是谁。 知道父级是谁也可以重新创建该RDD,并且由于父级也是RDD,我们可以知道父级是谁(祖父),依此类推,直到我们将其追溯到信息的单一来源为止。 这使得RDD具有弹性(可以随时对其进行重构),并可以进行延迟评估。 数据被“物化”,直到在RDD上执行操作为止。

您可以使用RDD做什么?

  • 创建一个RDD
  • 应用转换
  • 应用动作以获取结果
  • 将RDD坚持到磁盘。

例子。

创建RDD

//来自数据源(磁盘)

val books = sc.textFile(“ scalardd / books.csv”)

//通过应用转换从另一个RDD

books.filter(lambda x:’Description’not in x)

转变

  • filter:根据条件(布尔值)从RDD中过滤记录
  • map:应用在每个记录上返回单个对象的函数
  • flatmap:应用任何可返回迭代器并将其转换为多个记录的函数

这些只是最“流行”的转换。

另外,我们还有其他适用于2个RDD的转换:

  • union:2个RDD的并集
  • 交集:2个RDD的交集
  • 减去:从另一个中删除1个RDD的内容
  • 笛卡尔:笛卡尔乘积:创建元组的RDD。

动作

collect:返回所有RDD元素。 出于这个原因,它使用不多:我们将处理大量数据,并且此函数返回所有数据。 通常,我们试图以某种方式对其进行过滤或使用部分信息。

  • 第一:返回第一行
  • 取:返回任意行
  • count:返回总行数
  • countByValue:计算值在RDD中出现的次数
  • reduce:它将以指定的方式组合RDD中的所有值。
  • 集合:类似于reduce,但允许返回不同类型的元素。

这些是一些常用的动作。

测试示例

好的,足够的理论知识,让我们尝试一些实时测试! 为此,我们将使用Spark-Shell:

>须藤火花壳

使用spark-shell的一大好处是,我们的SparkContext是自动创建的。 如果您阅读上图的文本,您会看到: “ Spark context available as sc”,这是我们的SparkContext变量,它表示与Spark Cluster的连接。 这是我们将数据加载到内存中的位置,这是创建RDD的要求。

请记住:我们所有的数据分析都将通过对RDD的操作进行。

以前,我们使用此命令创建了一个RDD,还记得吗?

> val books = sc.textFile(“ sparkrdd / books.csv”);

这样,我们创建了第一个RDD! 让我们对其进行一些操作:

> books.count();

→res0:长=38154。— — —我们有38,154条记录的RDD输出。

> books.first();

→res1:字符串= 9780781795807,21,“鲁宾的病理学评论”,44.95,35.0

在这里,它为我们提供了RDD的第一条记录。 你有发现奇怪的事吗? 它全部是一个字符串! 为了访问数据的不同字段,将这些字段分开将很方便。

books.map(_split(“,”));

“ _”字符充当每个记录的占位符。 但是,除非您想使用数组的RDD(这是上一条命令所能提供的),否则我们可以设置为按名称引用这些列。 从字符串本身将字段转换为相关的数据类型。

首先,我们将设置一个代表记录的类:

案例类Book(isbn:String,Publisher_id:Int,title:String,价格:Float,cost:Float)

案例类是Scala中的一种特殊类型的类。 案例类用于设置普通对象,这些普通对象仅用于保存具有特定结构的数据。 构造函数中的参数将自动成为公共成员变量。 构造函数可以像函数一样使用。

因此,知道了这一点,我们将编写一个函数,将RDD书中的每个字符串记录转换为此类。

这就是我们将如何在Scala中编写它:

现在,使用已解析的字段创建新的RDD:

> val booksParsed = books.map(解析)

→booksParsed:org.apache.spark.rdd.RDD [Book] = MapPartitionsRDD [2]在地图上的:33

如果我们调用第一条记录:

> booksParsed.first();

res0:书籍=书籍(9780781795807,21,“鲁宾的病理学评论”,44.95,35.0)

现在,我们得到一个Book对象作为记录,而不是字符串。 我们可以使用字段名称访问Book对象的值。

计算书籍的平均价格

要计算平均价格,我们需要知道所有价格的总和,然后将其除以图书数量,对吗? 要获取书籍总数,我们只需要计算RDD的记录即可。 对于所有价格的总和,我们需要其他东西,我们需要另一种方法。

> val totalPrice = booksParsed.map(_。price).reduce((x,y)=> x + y)

不幸的是,当我尝试执行最后一条命令时,我的hadoop服务器崩溃了。 在解决此问题的同时,我将发布该故事并在另一个故事中继续该主题。

希望这对您有用,就像对我一样。 一如既往,请拍手!

未完待续…