失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > hadoop入门到精通

hadoop入门到精通

时间:2024-03-18 07:38:32

相关推荐

hadoop入门到精通

1、判断文件是否存在(判断该目录下是否有文件存在)

hdfs dfs -test -e hdfs路径if [ $? -eq 0 ] ;thenecho 'exist'elseecho 'Error! Directory is not exist Or Zero bytes in size'fi

2、查看hdfs目录下各目录(文件)的大小: hadoop fs -du -h hdfs路径

3、client读取hdfs文件过程

1)调用FileSystem的get()方法(得到其子类),得到DistributedFileSystem对象

Configuration conf = new Configuration();FileSystem fs = FileSystem.get(conf);

2)调用DistributedFileSystem的open方法打开件,DistributedFileSystem使用 RPC方式调用了NameNode,NameNode 返回存有该副本DataNode地址,最终返回一个输入流对象(FSDataInputStream)。

Path file = new Path("demo.txt");FSDataInputStream inStream = fs.open(file);

3)循环调用输入流 FSDataInputStream.read( )方法从而将数据从 DataNode传输到客户端,完成读取。

关闭连接:即调用输入流:FSDataInputStream.close( )

4、client写数据到hdfs

1)调用FileSystem的get()(得到其子类),得到DistributedFileSystem

Configuration conf = new Configuration();FileSystem fs = FileSystem.get(conf);

2)调用DistributedFileSystem.create()方法创建文件,DistributedFileSystem用RPC调用namenode,在文件系统的命名空间中创建一个新的文件,namenode首先确定文件原来不存在,并且客户端有创建文件的权限,然后创建新文件。最终返回输出流DFSOutputStream

3)输出流 DFSOutputDtream 将数据分成一个个的数据包,并写入内部队列。

5、FileSystem是抽象类,其实现类是 DistributedFileSystem

6、基于ZK的hadoop HA实现

非HA弊端

HDFS集群的分布式存储是靠namenode节点(namenode负责响应客户端请求)来实现。在非HA集群中一旦namenode宕机,虽然元数据不会丢失,但整个集群将无法对外提供服务,导致HDFS服务的可靠性不高HA机制

已知导致服务可靠性不高的原因是namenode节点宕机,那么怎么才能避免这个namenode节点宕机呢?一个容易想到的解决方案是部署两台namenode节点,形成主备模式(active/standby模式),这样一旦active节点宕机,standby节点立即切换到active模式。事实上HA机制就是采取的这种方案。要想实现该机制,需要解决以下问题:

1、为什么选择主备模式,而不是主主模式(active/active模式),也即让两个namenode节点都响应客户端的请求?

一个显然的前提是,两台namenode节点需要保存一致的元数据。我们知道namenode节点是用来管理这些元数据的,响应客户端请求时(上传)需要增加元数据信息,如果使用主主模式,那么两个节点都将对元数据进行写操作,怎么同步是个很困难的问题。因此,只能有一台机器响应请求,也即处在active状态的节点(可称为主节点),而另一台namenode在主节点正常工作情况下仅用来同步active节点的元数据信息,这个namenode称为备用节点(处在standby状态),可见,要解决的问题主要是怎么同步active节点的元数据信息。

2、怎么同步两个namenode节点的元数据

响应客户端请求的是active节点,因此只有active节点保存了最新的元数据。元数据分为两部分,一部分是刚写入新的元数据(edits),另一部分是合并后的较旧的(fsimage)。

HA机制解决同步问题的方法是将active节点新写入的edits元数据放在zookeeper集群上(zookeeper集群主要功能是实现少量数据的分布式同步管理),standby节点在active节点正常情况下只需要将zookeeper集群上edits文件同步到自己的fsimage中就可以。hadoop框架为这个集群专门写了个分布式应用qjournal(依赖zookeeper实现),实现qjournal的节点称为journalnode。

3、怎么感知active节点是否宕机,并将standby节点快速切换到active状态?

解决方案是专门在namenode节点上启动一个监控进程,时刻监控namenode的状态。对于处在active状态的namenode,如果发现不正常就向zookeeper集群中写入一些数据。

对于处在standby状态的namenode,监控进程从zookeeper集群中读数据,从而感知到active节点是否正常。如果发现异常,监控进程负责将standby状态切换到active状态。这个监控进程在hadoop中叫做zkfc( ZooKeeperFailoverController,依赖zookeeper实现)。

4、如何在状态切换时避免brain split(脑裂)?

脑裂:active namenode工作不正常后,zkfc在zookeeper中写入一些数据,表明异常,这时standby namenode中的zkfc读到异常信息,并将standby节点置为active。但是,如果之前的active namenode并没有真的死掉,出现了假死,这样,就有两台namenode同时工作了。这种现象称为脑裂。

解决方案:standby namenode感知到主用节点出现异常后并不会立即切换状态,zkfc会首先通过ssh远程杀死active节点的 namenode进程(kill -9 进程号)。但是(这样还不行,惊讶),如果kill指令没有执行成功咋办?如果在一段时间内没有收到执行成功的回执,standby节点会执行一个自定义脚本,尽量保证不会出现脑裂问题!这个机制在hadoop中称为fencing(包括ssh发送kill指令,执行自定义脚本两道保障)

7、hadoop二次排序

MapReduce是按照key来进行排序的,那么如果有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序(即有时候需要对Key排序的同时还需要对Value进行排序),这就是传说中的二次排序。

1.自定义一个Key

2.分区函数类

3.比较函数类

4.分组函数类

8、reduce的个数怎么设置 ==》 这个决定我们的输出文件个数

reduce多 ==》 导致小文件多

reduce小 ==》 导致跑的慢

权衡

9、mr跑得慢的原因

集群资源不足,某些节点内存,cpu等性能不足io操作耗时: 1)数据倾斜 2)map和reduce设置不合理 3)map运行时间过长,导致reduce等待过久 4)小文件过多

5)大量不可分割的超大文件 6)spill次数过多 7)merge次数过多

10、mr优化可以从以下6个方面考虑

数据输入

合并小文件:执行mr任务前将小文件合并。因为大量的小文件会导致启动大量的map任务,而map任务装载很耗时。而且hdfs上每个文件都会在namenode上建一个索引(160byte),大量的索引文件不仅占用namenode的内存,而且影响索引速度。

MapReduce 框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,如果有大量小文件,就会产生大量的MapTask,处理小文件效率非常低。

(1) 最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,如利用hadoop,archive命令:

hadoop archive -archiveName ljy.har -p /user/oldpath/ -r file1.log file2.log file3.log /user/new

通过hadoop archive命令创建归档文件,-archiveName指定文件名, -p指定原文件路径,-r指定要归档的小文件,最后指定hdfs中归档文件存放路径,(2)补救措施:如果已经是大量小文件在HDFS中了,可以使用另一种InputFormat来做切片(CombineTextInputFormat),切片逻辑和TextFileInputFormat不同,可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask

job.setInputFormatClass(CombineTextInputFormat.class)CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4mCombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

map阶段

1)减少溢写(spill)次数:通过调整io.sort.mb及sort.spill.percent参数,增大触发spill的内存上限值,从而减少spill次数,即减少io

2)减少合并(merge)次数:通过调整io.sort.factor参数,增大merge合并的文件数,从而减少合并次数

3)在map之后,在不影响业务的前提下,先进行combinereduce阶段

1)合理设置map和reduce数

2)设置map和reduce共存:在mapred-site.xml配置文件中有一个参数mapreduce.job.pletedmaps,这个参数可以控制当map任务执行到哪个比例的时候就可以开始为reduce task申请资源。

3)尽量只使用map,规避reduceio传输

1)map输出结果压缩。因为map作业的输出会被写入磁盘并通过网络传输到reducer节点,所以如果使用LZO之类的快速压缩,能得到更好的性能,因为传输的数据量大大减少了。以下代码显示了启用map输出压缩和设置压缩格式的配置属性。

conf.setCompressMapOutput(true);

conf.setMapOutputCompressorClass(GzipCodec.class);

2)reduce输出结果压缩数据倾斜问题参数调优

11、集群增加新节点

在新节点中进行操作系统的配置,修改新节点主机名映射:vi /etc/hosts,在其中添加集群中各个节点的节名与ip的映射关系

关闭防火墙,网络,配置新节点和master的ssh免密登陆和安装JDK等

在所有的节点的/etc/hosts文件中增加新节点

修改master节点的slaves文件,增加该节点

单独启动该节点上的datanode和NodeManager

运行start-balancer.sh进行数据负载均衡操作

删除节点

修改master节点的hdfs-site.xml,增加dfs.hosts.exclude参数

修改master节点的yarn-site.xml,增加yarn.resourcemanager.nodes.exclude-path参数

修改master节点的mapred-site.xml,增加mapreduce.jobtracker.hosts.exclude.filename(选择性)

新建excludes文件,添加需要删除的主机名

执行refreshNodes使配置生效

12、master是节点,namenode是master上的一个进程

13、每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并即merge(最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只能有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge),生成最终的正式输出文件,然后等待reduce task来拉数据。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。

14、secondary namenode不是namenode的备份,而是负责帮namenode处理一些事情:把edits和fsimage文件合并……

15、hadoop 2.7的HA

Hadoop1.0版本利用了SecondaryNamenode做fsimage和edits文件的合并,但是这种机制达不到热备的效果。Hadoop1.0的namenode存在单点故障问题Hadoop 2.0官方提供了两种HDFS HA的解决方案,一种是NFS,另一种是QJM。这里我们使用简单的QJM。在该方案中,主备NameNode之间通过一组JournalNodes进程同步元数据信息,一条数据只要成功写入多数JournalNode即认为写入成功。通常配置奇数个JournalNode,这里还配置了一个Zookeeper集群,用于ZKFC故障转移,当Active NameNode挂掉了,会自动切换Standby NameNode为Active状态。

16、Hadoop1.0是按64MB切,BlockSize=64MB。

Hadoop2.0 BlockSize=128MB,2.0已经没有1.0的jobtracker和tasktracker这些东西,取而代之的是RM,AM,NM,container

17、分片大小范围可以在mapred-site.xml中设置:mapred.min.split.size和mapred.max.split.size,minSplitSize大小默认为1B,maxSplitSize大小默认为Long.MAX_VALUE = 9223372036854775807

而分片大小到底是多少呢?下面是算法

minSize=max{minSplitSize,mapred.min.split.size}

maxSize=mapred.max.split.size

splitSize=max{minSize,min{maxSize,blockSize}} //其实基本就是选maxSize和blockSize里面较小的

而默认情况下minSize<blockSize<maxSize 所以默认情况下输入分片大小就是blockSize

18、HDFS元数据按形式分为内存元数据和元数据文件两种,分别存在内存和磁盘上。

HDFS 磁盘上元数据文件分为两类,用于持久化存储:

fsimage 镜像文件:是元数据的一个持久化的检查点,包含 Hadoop 文件系统中的所有目录和文件元数据信息,但不包含文件块位置的信息。文件块位置信息只存储在内存中,是在 datanode 加入集群的时候,namenode 询问 datanode 得到的,并且间断的更新。

Edits 编辑日志:存放的是 Hadoop 文件系统的所有更改操作(文件创建,删除或修改)的日志,文件系统客户端执行的更改操作首先会被记录到 edits 文件中。

fsimage 和 edits 文件都是经过序列化的,在 NameNode 启动的时候,它会将 fsimage文件中的内容加载到内存中,之后再执行 edits 文件中的各项操作,使得内存中的元数据和实际的同步,存在内存中的元数据支持客户端的读操作,也是最完整的元数据。

当客户端对 HDFS 中的文件进行新增或者修改操作,操作记录首先被记入 edits 日志文件中,当客户端操作成功后,相应的元数据会更新到内存元数据中。因为 fsimage 文件一般都很大(GB 级别的很常见),如果所有的更新操作都往 fsimage 文件中添加,这样会导致系统运行的十分缓慢。

HDFS 这种设计实现着手于:一是内存中数据更新、查询快,极大缩短了操作响应时间;二是内存中元数据丢失风险颇高(断电等),因此辅佐元数据镜像文件(fsimage)+编辑日志文件(edits)的备份机制进行确保元数据的安全。

NameNode 维护整个文件系统元数据。因此,元数据的准确管理,影响着 HDFS 提供文件存储服务的能力。

总结:元数据的读写都在namenode内存中,然后元数据在磁盘上也会有一份,防止内存中数据丢失

19、CDH

简化了hadoop的部署,对于hadoop生态系统监控都非常方便。如果想安装hadoop生态系统,并能够监控起来,Cloudera Manager安装是一个不错的选择目Hadoop发行版非常多,有华为发行版、Intel发行版、Cloudera发行版(CDH)等,所有这些发行版均是基于Apache Hadoop衍生出来的,之所以有这么多的 版本,完全是由ApacheHadoop的开源协议决定的:任何人可以对其进行修改,并作为开源或商业产品发布/销售国内大部分公司也是用CDH免费版,省时省心。 原生hadoop的缺陷: 部署过程繁琐,比如有100台机器,怎么部署?做好一台然后分发?升级过程也很麻烦各组件之间兼容性差安全性低 用CDH能避免上面的所有问题

20、CDH安装

官方共给出了3种CDH安装方式:

方法一:必须要求所有机器都能连网,由于各种国外的网站被墙的厉害,尝试了几次各种超时错误,耽误时间不说,一旦失败,重装非常痛苦。方法二:下载很多包,不方便容易迷路。方法三:对系统侵入性最小,可实现全离线安装,而且重装非常方便。后期的集群统一包升级也非常好。(本篇采用该方法安装)1)安装jdk export JAVA_HOME=/usr/local/java/jdk1.8.0_65/ export

CLASSPATH=.: J A V A H O M E / l i b / d t . j a r : JAVA_HOME/lib/dt.jar: JAVAH​OME/lib/dt.jar:JAVA_HOME/lib/tools.jar export

PATH= J A V A H O M E / b i n : JAVA_HOME/bin: JAVAH​OME/bin:PATH2)修改hostname,比如改主机为master,从机为slaver

vi /etc/hostname3)将ip和hostname绑定(这里的hostname必须和上面的完完全全一致,否则后面安装会出错)

vi /etc/hosts

192.168.202.220 master4)关闭selinux

selinux是一款为了提高系统安全性的软件,配置起来也很复杂:对系统服务,文件权限,网络端口访问有极其严格的限制,例如 :如果对一个文件没有正确安全上下文配置, 甚至你是root用户,你也不能启动某服务 vi /etc/sysconfig/selinux

SELINUX=disabled5)关闭防火墙

集群其实现在没什么安全性考虑的,因为都是内网搭建的,对外还有一个服务器的,那个服务器有防火墙,由它来访问内网集群,如果内网内开启防火墙,内网集群通讯会出现很多问题。

systemctl stop firewalld 关闭防火墙

systemctl disable firewalld 禁止开机启动6)ntp服务器配置(使每个节点的时间相同,因为hadoop集群间需要保持时间一致)

1)yum -y install ntp

master配置

2)vi /etc/ntp.conf 注释掉所有server..*的指向,新添加一条可连接的ntp服务器 server .

3)配置好后,先设置开机启动ntp:chkconfig ntpd on(一定要配置,否则每次都要手动启动ntp) 再启动服务:service ntpd start

4) 5到10分钟后,查看是否启动成功:ntpstat

如果过了很长的时间,一直显示unsynchronised, 那么卸载ntp:yum remove -y ntp yum erase ntp ntpdate 然后重新安装ntp!!!7)修改slaver节点的ntp,将服务器设置为master

vi /etc/ntp.conf server master

修改完成后,先手动与master同步一下ntpdate -u master8)SSH免密登陆配置9)安装mysql(在主节点安装即可)(可选,如果不安装,则使用cloudera内置的postgreSQL数据库)10)安装cloudera manager

(1)每个节点创建/opt目录,然后将cloudera-manager-centos7-cm5.14.2_x86_64.tar.gz解压到该目录下(后面安装的时候会默认使用该目录)

(2)所有节点创建cloudera-scm用户 由于Cloudera Manager和Managed

Services默认使用cloudera-scm,所以需要创建此用户 useradd --system

–home=/opt/cloudera-manager/cm-5.7.2/run/cloudera-scm-server --no-create-home --shell=/bin/false --comment “Cloudera SCM User” cloudera-scm (3)配置所有节点cloudera-manger-agent指向主节点服务器 vi

/opt/cloudera-manager/cm-5.7.2/etc/cloudera-scm-agent/config.ini

将server_host改为CMS所在的主机名即master01 4)在主节点初始化CM5的数据库:

/opt/cm-5.1.3/share/cmf/schema/scm_prepare_database.sh mysql cm

-hlocalhost -uroot -pJayo01… --scm-host localhost scm scm Scm01 其中cm和第一个scm都是数据库,第二个scm是用户,Scm01是密码

(5)将CHD5相关的Parcel包(CDH安装包,通过CM安装CDH的时候就是通过这些包安装的)放到主节点的/opt/cloudera/parcel-repo/目录中,相关的文件如下:

● CDH-5.1.3-1.cdh5.1.3.p0.12-el6.parcel ●

CDH-5.1.3-1.cdh5.1.3.p0.12-el6.parcel.sha1 ● manifest.json

最后将CDH-5.1.3-1.cdh5.1.3.p0.12-el6.parcel.sha1重命名为CDH-5.1.3-1.cdh5.1.3.p0.12-el6.parcel.sha。否则,系统会重新下载CDH-5.1.3-1.cdh5.1.3.p0.12-el6.parcel文件

(6)主节点通过/opt/cm-5.1.3/etc/init.d/cloudera-scm-server start启动服务端。所有节点通过/opt/cm-5.1.3/etc/init.d/cloudera-scm-agent start启动Agent服务。 如果启动失败到/opt/cloudera-manager/cm-5.7.0/log目录下查看日志

(7)通过http://192.168.202.147:7180访问CM

(8)至此基本上CM的安装己经完成,接下来就可以通过clouderamanager的WEB界面进行安装CDH了!

后面的操作就很简单了,跟着页面提示,选择自己要安装的服务,一步一步点击就行了

21、Task的运行情况由对应的一个TaskInProgress对象来跟踪,它允许一个Task尝试运行多次,每次成为一个Task Attempt

Hadoop提供了两个可配置的参数:

mapred.map.max.attempts

mapred.reduce.max.attempts

hadoop提交作业时可通过设置这两个参数来设置Map Task和Reduce Task尝试运行最大次数。默认情况下,这两个值均为4。如果尝试4次之后仍旧为运行成功,则TaskInProgress对象将该任务运行状态标注成FAILED。未成功运行的Task Attempt分为两种,killed task 和failed task,其中killed task 是MapReduce框架主动杀死的Task Attempt,一般产生于一下3中场景。

1.人为杀死Task Attempt:用户使用命令hadoop job -kill-task task_id将一个Task Attempt杀死

2.磁盘空间或者内存不够:任务运行过程中,出现磁盘或者内存磁盘不足的情况,MapReduce框架需要采用一定策略杀次若干个Task已释放资源,其选择杀死Task的策略是优先选择Reduce Task,其实是进度最慢的Map Task。

3.TaskTracker丢失:如果TaskTracker在一定时间内未想JobTracker回报心跳,则JobTracker认为该TaskTracker已经死掉,它上面的任务将被杀掉。

Failed Task 和killed Task除了产生场景不同以外,还有以下两个重要区别:

调度策略:一个Task Attempt在某个节点运行失败之后,调度器变不会将同一个Task的Task Attempt分配给该节点。而Task Attempt 被人为杀死后,仍可能被调度到同一个节点上运行。

注意,上面这些都是针对于hadoop job里面的,包括hql翻译成的MR,会提供一个job_157586083_14029633可以去web ui查看具体的MR task信息

跟spark里面根据application_1584519092517_167990去web ui查看信息是不同的。spark里面没有taskid,也不能根据hadoop job -kill-task task_id去杀死task线程

对于hql中,如果某个task很慢,又没数据倾斜,可能是这个task所在节点性能不行,人为hadoop job -kill-task task_id杀掉task

22、小文件危害:

在HDFS中,任何一个文件、目录和block,在HDFS中都会被表示为一个object存储在Namenode的内存中,每一个object占用150bytes的内存空间。所以过多的小文件会占用namenode内存小文件过多可能会造成map数过多,占用过多计算资源

23、Hadoop在运行一个mapreduce job之前,需要估算这个job的maptask数和reducetask数

map数等于split个数,每个split的大小:splitsize=max(minSize,min(blockSize,maxSize))。如果没有设置minimumsize和maximumsize,splitsize的大小默认等于blocksize。

就是将上面的三个size进行一个排序(由大到小还是由小到大都可以),位于中间的size就是分片的大小了。

reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,由于mapreduce中没有办法直接控制map数量,所以只能曲线救国,通过设置每个map中处理的数据量进行设置;reduce是可以直接设置的。

默认情况下,切片大小跟块大小是一样大,切片大小跟块大小一样的好处:

如果我们定义splitSize是1M,那么一块128M,切成128个split,分发到网络上128个结点同时运行(可以一个结点运行多个切片,但是集群并发情况下,负载均衡,系统会自动分发给其它结点),浪费时间与资源。如果我的splitSize和块大小相同,直接就在本结点上运行了。

split 分片的计算跟 blockSize, minSize, maxSize 三个参数有关

假如 blockSize 设置128 M,文件大小 200M,那么splitSize 就是128M。

200/128=1.56>1.1 创建1个split (通过源码可以看出最右一个文件的大小在 0- 128+12.8M之间)

剩下72M在创建一个,就会产生2 个split。

对于一堆小文件,比如4个50M的hdfs文件,读的时候会合并成一个map处理,一般可能是如果几个小文件的大小总和小于230M左右,就都是合并成一个map处理。但是好像这个也跟文件数有关系

但是一个200M的文件,就会拆分成两个map,其实如果是128.2 M,也会分成两个map,一个map128M,另一个map 0.2M

24、hadoop资源调度器

FIFO: hadoop1.x使用的默认调度器就是FIFO。先入先出调度器,先进来的job,优先执行Capacity Scheduler(容量调度器):hadoop2.x使用的默认调度器是Capacity Scheduler。

25、Hadoop 2和3的区别

最低Java版本从7升级到8引入纠删码(Erasure Coding)

主要解决数据量大到一定程度磁盘空间存储能力不足的问题。HDFS中的默认3副本方案在存储空间中具有200%的额外开销。但是,对于I/O活动相对较少冷数据集,在正常操作期间很少访问其他块副本,但仍然会消耗与第一个副本相同的资源量。

纠删码能勾在不到50%数据冗余的情况下提供和3副本相同的容错能力,因此,使用纠删码作为副本机制的改进是自然而然,也是未来的趋势.2仅支持一个活跃NameNode和一个备用NameNode,2支持多个备用NameNode

26、Hadoop分布式复制(distcp)是Hadoop集群间复制大量数据的高效工具

如果觉得《hadoop入门到精通》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。