Spark中的数据读写

Spark 支持多种文件格式的读写,包括

  • 本地文本文件:Json、SequenceFile 等文件格式
  • 文件系统:HDFS、Amazon S3
  • 数据库:MySQL、HBase、Hive

本地文件读写

文本文件

使用以下语句从文件系统中读写文件

1
2
3
4
5
6
7
val text = sc.textFile("file:///home/jerrysheh/word.txt")

// .first() 是一个"action"
text.first()

// 从RDD写回文件系统,saveAsTextFile是一个action
text.saveAsTextFile("file:///home/jerrysheh/wordWriteBack")

spark的惰性机制使得在“转换”操作时,即使遇到错误也不会立即报错,直到”行动(action)“操作时才开始真正的计算,这时候如果有错误才会报错。

wordWriteBack 是一个文件夹,写回后存放在该文件夹里,里面有part-00000 和 _SUCCESS 两个文件。part-00000 里面的内容就是写会的内容。

当我们想把输出的结果再次加载到RDD中,只要在textFile()中定位到 wordWriteBack 这个目录即可。

1
val text = sc.textFile("file:///home/jerrysheh/wordWriteBack")

json文件

1
2
3
4
5
// jsonStr的类型是:org.apache.spark.rdd.RDD[String]
val jsonStr = sc.textFile("file:///home/jerrysheh/people.json")

// 使用 foreach 遍历
jsonStr.foreach(println)

输出:

1
2
3
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

可以用 scala 自带的 JSON 库 —— scala.util.parsing.json.JSON 进行解析。


从HDFS读写

跟本地文件类似,只不过把 file:// 换成 hdfs://

1
val textFile = sc.textFile("hdfs://localhost:9000/user/jerrysheh/word.txt")

Scala中的类与对象

定义类

1
2
3
4
5
6
7
class Counter {
private var value = 0

//大括号和返回类型可以省略
def increment(): Unit = { value += 1}
def current(): Int = {value}
}
  • 字段没有 private 修饰的默认为 public
  • def来声明方法
  • Unitincrement()方法的返回类型
  • 方法的最后一条表达式就是返回值,不需要写return
阅读更多

Scala中的函数式编程

Scala是一门多范式编程语言,混合了 面向对象编程函数式编程 的风格。

在大数据处理中为什么要函数式编程?

函数式编程的一个重要特性就是值不可变性,这对于编写可扩展的并发程序而言可以带来巨大好处。因为它避免了对公共的可变状态进行同步访问控制的复杂问题。简单地说,我们完全不需要进行传统并发编程里面的同步、加锁等操作。

阅读更多

Spark SQL 和 DataFrame

RDD 和 DataFrame 的区别

RDD 是弹性分布式数据集,其本质是 Dataset。Dataset 可以从 JVM 对象中构建 (例如 rating 对象,即 javabean ),然后通过 map、flatMap、filter 等方法转换来操作。

为了更好地读写数据以及使用类似SQL语句一样简单地操作,Spark SQL 提供了 DataFrame (其前身是SchemaRDD)。

sparksql

DataFrame 能够让你知道数据集中的每一行和列。这个概念跟关系型数据库中的表(table)类似,但是比表更强大。如下图:

dataframes

DataFrame 可以从结构化的数据文件(structured data files)、Hive中的表、外部数据库或者已存在的RDD中构建。

在 Java 中,使用 Dataset<Row> 来表示 DataFrame。

阅读更多

Scala 入门

Scala是一门现代的多范式编程语言,平滑地集成了 面向对象函数式语言 的特性,旨在以简练、优雅的方式来表达常用编程模式。

Scala运行于JVM上,并兼容现有的Java程序,Scala代码可以调用Java方法,访问Java字段,继承Java类和实现Java接口。

在面向对象方面,Scala是一门非常纯粹的面向对象编程语言,也就是说,在Scala中,每个值都是对象,每个操作都是方法调用。 在函数式方面,函数是一等公民,可以像操作其他数据类型那样操作函数。

vczh

阅读更多

使用 Spark Streaming 进行实时流计算(二)

Spark Streaming 除了可以从三个基本数据源(文件、TCP套接字、RDD)读取数据,还能从高级的数据源中读取。这里的高级数据源,指的是 Kafka 或 Flume。

Kafka 简介

Kafka 是 LinkedIn 开发的一种高吞吐量的分布式发布订阅消息系统。 相比更擅长批量离线处理的 Flume 和 Scribe, Kafka 的优点是可以同时满足在线实时处理和批量离线处理。

阅读更多

Spark编程入门(二)RDD编程

RDD 简介

弹性分布式数据集(Resilient Distributed Dataset,RDD),是分布式内存的一个抽象概念,它提供了一种高度受限的内存模型。本质上,RDD是一个只读的分区记录集合。一个RDD可以分成多个分区,每个分区就是一个数据集片段。不同的分区可以保存在集群的不同节点上,从而可以进行并行计算。

RDD是只读的,不能直接修改。但是RDD提供了 转换行动 两种操作。转换操作返回一个新的RDD,行动操作用于执行计算并指定输出,返回非RDD。

典型的RDD执行过程如下:

  • RDD读入外部数据源(或内存中的集合),进行创建。
  • RDD经过一系列的转换操作,每一次都产生不同的RDD,给下一次转换使用。
  • 最后一个RDD经行动操作进行处理,输出到外部数据源。

需要注意的是,RDD采用了惰性调用,即转换操作并不会真的进行转换,只是记录。直到行动操作的时候,才从头开始进行一系列计算。


RDD的依赖关系

RDD的依赖关系分为 窄依赖(Narrow Dependency)宽依赖(Wide Dependency)。窄依赖即一个或多个父RDD的分区对应一个子RDD的分区,典型的例子有map、filter、union、join,而宽依赖则是一个父RDD的分区对应多个字RDD的分区,典型的例子有groupByKey、join。

spark_dep

为什么要这样设计依赖关系?

为了提高容错性,以加快 Spark 的执行速度。因为 RDD 通过“血缘关系”记住了它是如何从其它 RDD 演变过来的,血缘关系记录的是粗颗粒度的转换操作行为,当这个RDD的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,由此带来了性能的提升。

阅读更多

Spark编程入门(一) QuickStart

Spark 基本概念

在实际应用中,大数据处理主要包括:

  • 复杂的批量数据处理(数十分钟 - 几小时)
  • 基于历史数据的交互式查询(数十秒 - 几分钟)
  • 基于实时流的数据处理 (数百毫秒 - 几秒)

Spark 的设计遵循“一个软件栈满足不同的应用场景”,有一套完整的生态系统。包括内存计算框架、SQL即时查询、实时流式计算、机器学习和图计算等。Spark可以部署在 YARN 资源管理器上,提供一站式的大数据解决方案。

阅读更多

Hadoop 配置遇到的一些坑

温馨提示:点击页面右下角按钮,打开目录。

一、 Connection refused

根据官方文档 Hadoop 3.0 配置,在

1
sbin/start-dfs.sh

的时候报错,

1
pdsh@ubuntu: localhost: connect: Connection refused

原因是pdsh默认采用的是rsh登录,修改成ssh登录即可,在环境变量/etc/profile里加入:

1
export PDSH_RCMD_TYPE=ssh

然后source /etc/profile 之后重新sbin/start-dfs.sh


二、jps 没有 namenode 或者 datanode

每次开机都得重新格式化一下namenode才可以看到namenode

格式化了namenode,datanode又没有了

其实问题就出在tmp文件,默认的tmp文件每次重新开机会被清空,与此同时namenode的格式化信息就会丢失

于是我们得重新配置一个tmp文件目录

首先在home目录下建立一个hadoop_tmp目录

1
sudo mkdir ~/hadoop_tmp

然后修改hadoop-3.0.0/etc/hadoop目录里面的core-site.xml文件,加入以下节点:

1
2
3
4
5
<property>
<name>hadoop.tmp.dir</name>
<value>/home/jerrysheh/hadoop_tmp</value>
<description>A base for other temporary directories.</description>
</property>

注意:我的用户是jerrysheh,所以目录是/home/jerrysheh/hadoop_tmp

OK了,重新格式化Namenode

1
hadoop namenode -format

然后启动

1
hadoopstart-all.sh

然后输入 jps, namenode 和 datanode 应该都出来了


三、关于 HDFS 文件夹位置

根据 官方文档

第一次运行,启动 start-dfs.sh 之后,要先创建用户

1
2
bin/hdfs dfs -mkdir /user
bin/hdfs dfs -mkdir /user/<username>

然后创建一个 input 文件夹,用来放数据

1
bin/hdfs dfs -mkdir input

dfs

发现了吗? -mkdir /user 和 -mkdir /user/<username> 是在根目录下创建 user 文件夹,然后在 user 文件夹里创建 username 文件夹, 这没有问题。

但是创建 input 文件夹的时候, 前面没有 /

意味着,是在默认的 username 文件夹里面创建了这个 input !

也就是, input 的实际位置在 /user/<username>/input

打开 web UI ( 127.0.0.1:8088 )看一下

dfs

果然如此

另,

1
2
hdfs dfs -ls .   /*表示当前用户目录*/
hdfs dfs -ls / /*表示根目录*/
  • 使用 hdfs dfs -ls . 的时候, HDFS 上的 username 必须和你本地linux系统的 username 一致!否则会显示没有该目录或文件。

四、Windows 环境下 JAVA_HOME 路径不对

配置好环境以后,执行格式化

1
hdfs namenode -format

然后报错

1
2
Error: JAVA_HOME is incorrectly set.
Please update F:\hadoop\conf\hadoop-env.cmd

原因是蛋疼的微软, Program Files文件夹有一个空格,导致不能被 Hadoop 识别。

解决办法:

  • 方法1:用路径替代符

    1
    C:\PROGRA~1\Java\jdk1.8.0_91

    PROGRA~1 ===== C:\Program Files 目录的dos文件名模式下的缩写
    长于8个字符的文件名和文件夹名,都被简化成前面6个有效字符,后面1,有重名的就 ~2,3,

  • 方法2:用引号括起来

    1
    "C:\Program Files"\Java\jdk1.8.0_91

五、IDEA 报 java.lang.ClassNotFoundException 问题

在 IDEA 本地环境运行 hadoop 程序时, IDEA报错

1
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

原因:

在 maven 中, 把依赖项的 provided 标签删掉即可 。因为加上provided标签意味着 the scope tag tells Maven that you’re using this dependency for building, but it indicates that the dependency will be provided during runtime, so you’ll either need to remove this tag or … (具体可到 StackOverFlow 看)

感谢 StackOverFlow 解决困扰了我一天一夜的问题

1
2
3
4
5
6
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
<!--<scope>provided</scope>-->
</dependency>

六、IDEA Hadoop程序 单机运行 和 在本地伪分布式 运行

单机运行

新建 Java Maven 工程,pom.xml添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>

依赖去哪里找? 有一个 mavenrepository 仓库,地址是:mavenrepository

然后在上面搜索 hadoop ,一些常见的依赖在上面,点击,选择版本,然后复制它的maven代码, 在pom.xml粘贴。 当然这里有坑! 看上面第五!!

然后右下角 import, maven 会自动帮我们下载依赖包

点 Run -> Edit configuration, Program argument填入

1
input/ output/

Main class 填你的main程序所在的 class ,可以输入前几个字母然后 IDEA 会自动帮我们检索

然后在项目目录下,新建一个文件夹 input , 往里面放你输入的数据(比如 mydata.log),可以放多个文件

然后运行即可

本地伪分布式运行

本地Hadoop环境先配起来,具体看这篇

http://blog.csdn.net/songhaifengshuaige/article/details/79575308

环境变量

HADOOP_HOME, 设置为 C:\hadoop-3.0.0 (根据你的目录)
Path, 添加%HADOOP_HOME%\bin%HADOOP_HOME%\sbin

然后先启动 start-dfs.cmdstart-yarn.cmd

输入jps命令,看看 datanode 和 namenode 启动没,确保集群环境启动了

然后把 C:\hadoop-3.0.0\etc\hadoop\ 里面的配置文件 (几个dfs、core、mapred、yarn相关的 xml 文件),放到 项目 src/main/resource 里面

然后运行。DONE!


七、Spark standalone 模式集群配置

记得关防火墙

1
2
service iptables status
service iptables stop

有一个WARN,提示你本地ip是127.0.0.1,应该该到 172.x.x.x 或 192.x.x.x ,否则局域网机器访问不到。

1
2
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
vim ./conf/spark-env.sh

添加

1
2
3
SPARK_LOCAL_IP=172.x.x.x
SPARK_MASTER_HOST=172.x.x.x
SPARK_EXECUTOR__MEMORY=16G

Hadoop 笔记(一)初识

前言

当数据量变大的时候,一台机器完成一个问题要计算好久好久。这时候就需要多台机器并行运算。然而,每台机器不能用单台机器运行的算法,自己算自己的。而是要有不同的分工,联合起来共同算完这个问题。

Hadoop就是这样的一个大数据处理框架。其中包括很多开源的处理框架,比如:

  • 文件存储:Hadoop HDFS、Tachyon、KFS
  • 离线计算:Hadoop MapReduce、Spark
  • 流式、实时计算:Storm、Spark Streaming、S4、Heron
  • K-V、NOSQL数据库:HBase、Redis、MongoDB
  • 资源管理:YARN、Mesos
  • 日志收集:Flume、Scribe、Logstash、Kibana
  • 消息系统:Kafka、StormMQ、ZeroMQ、RabbitMQ
  • 查询分析:Hive、Impala、Pig、Presto、Phoenix、SparkSQL、Drill、Flink、Kylin、Druid
  • 分布式协调服务:Zookeeper
  • 集群管理与监控:Ambari、Ganglia、Nagios、Cloudera Manager
  • 数据挖掘、机器学习:Mahout、Spark MLLib
  • 数据同步:Sqoop
  • 任务调度:Oozie

那这么多,要怎么学呢?吴军博士在《数学之美》中提到:

分治算法是计算机科学中最漂亮的工具之一,我称为“各个击破”法。

我们就来各个击破。当然,先挑重点的学习。


MapReduce

假设我们要统计一本10000页的书里面,”apple”、”banana”、”orange”这三个单词出现的次数。由于规模很大,用一台机器来算,要算很久。我们能不能把规模缩小,交给多台机器去算呢?我们容易想到,可以拿4台服务器,假设为1,2,3,4,每台服务器计算2500页,各自算各自的。

好了,现在每台服务器把各自负责的2500页统计完了。但我们关心的是 10000 页这个总量里面单词出现的次数,而不是4个独立的2500页。这 4 个 2500 页的结果分别保存在1,2,3,4四台服务器上。我们现在要想办法合并结果。

于是我们找来另外三台服务器,假设为A,B,C:

  • 让 A 计算在机器1,2,3,4上面 “apple” 单词出现的总次数。
  • 让 B 计算在机器1,2,3,4上面 “banana” 单词出现的总次数。
  • 让 C 计算在机器1,2,3,4上面 “orange” 单词出现的总次数。

这样,我们就知道每个单词出现的总次数了。

阅读更多
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×