Spark中的数据读写

Spark 支持多种文件格式的读写,包括

  • 本地文本文件:Json、SequenceFile 等文件格式
  • 文件系统:HDFS、Amazon S3
  • 数据库:MySQL、HBase、Hive

本地文件读写

文本文件

使用以下语句从文件系统中读写文件

1
2
3
4
5
6
7
val text = sc.textFile("file:///home/jerrysheh/word.txt")

// .first() 是一个"action"
text.first()

// 从RDD写回文件系统,saveAsTextFile是一个action
text.saveAsTextFile("file:///home/jerrysheh/wordWriteBack")

spark的惰性机制使得在“转换”操作时,即使遇到错误也不会立即报错,直到”行动(action)“操作时才开始真正的计算,这时候如果有错误才会报错。

wordWriteBack 是一个文件夹,写回后存放在该文件夹里,里面有part-00000 和 _SUCCESS 两个文件。part-00000 里面的内容就是写会的内容。

当我们想把输出的结果再次加载到RDD中,只要在textFile()中定位到 wordWriteBack 这个目录即可。

1
val text = sc.textFile("file:///home/jerrysheh/wordWriteBack")

json文件

1
2
3
4
5
// jsonStr的类型是:org.apache.spark.rdd.RDD[String]
val jsonStr = sc.textFile("file:///home/jerrysheh/people.json")

// 使用 foreach 遍历
jsonStr.foreach(println)

输出:

1
2
3
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

可以用 scala 自带的 JSON 库 —— scala.util.parsing.json.JSON 进行解析。


从HDFS读写

跟本地文件类似,只不过把 file:// 换成 hdfs://

1
val textFile = sc.textFile("hdfs://localhost:9000/user/jerrysheh/word.txt")

Spark SQL 和 DataFrame

RDD 和 DataFrame 的区别

RDD 是弹性分布式数据集,其本质是 Dataset。Dataset 可以从 JVM 对象中构建 (例如 rating 对象,即 javabean ),然后通过 map、flatMap、filter 等方法转换来操作。

为了更好地读写数据以及使用类似SQL语句一样简单地操作,Spark SQL 提供了 DataFrame (其前身是SchemaRDD)。

sparksql

DataFrame 能够让你知道数据集中的每一行和列。这个概念跟关系型数据库中的表(table)类似,但是比表更强大。如下图:

dataframes

DataFrame 可以从结构化的数据文件(structured data files)、Hive中的表、外部数据库或者已存在的RDD中构建。

在 Java 中,使用 Dataset<Row> 来表示 DataFrame。

阅读更多

使用 Spark Streaming 进行实时流计算(二)

Spark Streaming 除了可以从三个基本数据源(文件、TCP套接字、RDD)读取数据,还能从高级的数据源中读取。这里的高级数据源,指的是 Kafka 或 Flume。

Kafka 简介

Kafka 是 LinkedIn 开发的一种高吞吐量的分布式发布订阅消息系统。 相比更擅长批量离线处理的 Flume 和 Scribe, Kafka 的优点是可以同时满足在线实时处理和批量离线处理。

阅读更多

Spark编程入门(二)RDD编程

RDD 简介

弹性分布式数据集(Resilient Distributed Dataset,RDD),是分布式内存的一个抽象概念,它提供了一种高度受限的内存模型。本质上,RDD是一个只读的分区记录集合。一个RDD可以分成多个分区,每个分区就是一个数据集片段。不同的分区可以保存在集群的不同节点上,从而可以进行并行计算。

RDD是只读的,不能直接修改。但是RDD提供了 转换行动 两种操作。转换操作返回一个新的RDD,行动操作用于执行计算并指定输出,返回非RDD。

典型的RDD执行过程如下:

  • RDD读入外部数据源(或内存中的集合),进行创建。
  • RDD经过一系列的转换操作,每一次都产生不同的RDD,给下一次转换使用。
  • 最后一个RDD经行动操作进行处理,输出到外部数据源。

需要注意的是,RDD采用了惰性调用,即转换操作并不会真的进行转换,只是记录。直到行动操作的时候,才从头开始进行一系列计算。


RDD的依赖关系

RDD的依赖关系分为 窄依赖(Narrow Dependency)宽依赖(Wide Dependency)。窄依赖即一个或多个父RDD的分区对应一个子RDD的分区,典型的例子有map、filter、union、join,而宽依赖则是一个父RDD的分区对应多个字RDD的分区,典型的例子有groupByKey、join。

spark_dep

为什么要这样设计依赖关系?

为了提高容错性,以加快 Spark 的执行速度。因为 RDD 通过“血缘关系”记住了它是如何从其它 RDD 演变过来的,血缘关系记录的是粗颗粒度的转换操作行为,当这个RDD的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,由此带来了性能的提升。

阅读更多

Spark编程入门(一) QuickStart

Spark 基本概念

在实际应用中,大数据处理主要包括:

  • 复杂的批量数据处理(数十分钟 - 几小时)
  • 基于历史数据的交互式查询(数十秒 - 几分钟)
  • 基于实时流的数据处理 (数百毫秒 - 几秒)

Spark 的设计遵循“一个软件栈满足不同的应用场景”,有一套完整的生态系统。包括内存计算框架、SQL即时查询、实时流式计算、机器学习和图计算等。Spark可以部署在 YARN 资源管理器上,提供一站式的大数据解决方案。

阅读更多
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×