Hadoop笔记


Hadoop分布式框架

image-20220925210146307

一、Hadoop概述(了解)

1.1大数据的历史、现状与未来

1.1.1大数据的历史

大数据不是凭空产生的,它有自己的发展过程,大致分为三个阶段

1)第一个阶段:萌芽时期(20 世纪 90 年代至 21 世纪初)。

  • 1997 年,美国国家航空航天局武器研究中心的大卫•埃尔斯沃思和迈克尔•考克斯在研究数据可视化时首次使用了大数据的这一名词概念。

  • 1998 年,《Science》杂志发表了一篇题为《大数据科学的可视化》的文章,大数据作为一个专业名词正式出现在公共期刊上。

  • 2003 年 10 月,Google 发表 Google File System 论文,这篇论文解决了海量网页的存储问题。

  • 2004 年 10 月,Google 发表了 MapReduce 论文,这篇论文解决了海量网页的索引计算问题。

  • 道格·卡廷(Doug Cutting) 看到这两篇论文后,欣喜若望,很快地就仿照谷歌论文的思想,实现了 HDFS 和 MapReduce 的前身,并且从 Nutch 中剥离出来,成立了独立的开源项目 Hadoop。

    image-20220925210032878

    ​ Hadoop之父 道格·卡廷(Doug Cutting)

    总的来说,大数据萌芽时期以谷歌论文的发表和 Hadoop 项目的成立为标志,前者提供了理论支撑,后者提供了开源的解决方案。

2)第二个阶段:发展时期(21 世纪初至 2010 年)。

  • 2006 年后,Hadoop 得到快速发展,成为数据分析的主要技术。同年谷歌发表了 Big Table 论文,这最终激发了 HBase 的创建。

  • 2007 年,百度开始使用 Hadoop 做离线处理。

  • 2008 年,Hadoop 成为 Apache 的顶级项目,Yahoo 在 900 个节点上运行 1TB 数据排序仅需要 209 秒,成为世界最快。自此,Hadoop 作为软件开发领域的一颗明星冉冉升起。

  • 2010 年,美国信息技术顾问委员会发布了一份题为《规划数字化未来》的报告,详细描述了政府工作中大数据的收集和使用。

  • 随后,众多 Hadoop 周边产品开始出现,大数据生态体系逐渐形成,其中包括:专门将关系数据库中的数据导入导出到 Hadoop 平台的 Sqoop;针对大规模日志进行分布式收集、聚合和传输的 Flume;MapReduce 工作流调度引擎 Oozie 等。

    总的来说,这个阶段以 Hadoop 为代表的大数据技术在这段时间得到快速发展,形成了较为完整的技术生态。

3)第三个阶段:兴盛时期(2011 年至今)。

  • 2011 年,MGI 发布了《大数据前沿报告》,详细介绍了大数据在各个领域的应用,以及大数据的技术框架。
  • 2012 年,在瑞士举行的世界经济论坛讨论了一系列与大数据有关的问题,发表了题为《大数据,大影响》的报告,并正式宣布了大数据时代的到来。
  • 2012 年 3 月,奥巴马签署并发布“大数据研究发展创新计划”。
  • 2012 年 7 月,联合国发布白皮书《大数据促发展:挑战与机遇》,全球大数据研究进入前所未有的高潮期。
  • 2013 年 5 月,麦肯锡研究院发布研究报告《颠覆性技术:技术改变生活、商业和全球经济》并未列入大数据技术,其给出的解释是,大数据技术已成为其它技术的基石。
  • 2015 年 9 月,中国国务院颁发了《促进大数据发展行动纲要》。
  • 2016 年 3 月 17 日,国家“十三五”规划纲要发布,明确指出大数据发展相关事宜。

总的来说,2011 年之后,大数据的发展可以说进入了全面兴盛的时期,大数据渗透到各行各业之中,不断变革原有行业的技术和创造出新的技术,大数据的发展呈现出一片蓬勃之势。

image-20220925214212590

1.1.2大数据的现状

按照数据开发应用深入程度的不同,可将众多的大数据应用分为三个层次。

1)第一层,描述性分析应用。

是指从大数据中总结、抽取相关的信息和知识,帮助人们分析发生了什么,并呈现事物的发展历程。如美国的 DOMO 公司从其企业客户的各个信息系统中抽取、整合数据,再以统计图表等可视化形式,将数据蕴含的信息推送给不同岗位的业务人员和管理者,帮助其更好地了解企业现状,进而做出判断和决策。

2)第二层,预测性分析应用。

是指从大数据中分析事物之间的关联关系、发展模式等,并据此对事物发展的趋势进行预测。如微软公司纽约研究院研究员 David Rothschild 通过收集和分析赌博市场、好莱坞证券交易所、社交媒体用户发布的帖子等大量公开数据,建立预测模型,对多届奥斯卡奖项的归属进行预测。2014 和 2015 年,准确预测了奥斯卡共 24 个奖项中的 21 个,准确率达 87.5%。

3)第三层,指导性分析应用。

是指在前两个层次的基础上,分析不同决策将导致的后果,并对决策进行指导和优化。如无人驾驶汽车分析高精度地图数据和海量的激光雷达、摄像头等传感器的实时感知数据,对车辆不同驾驶行为的后果进行预判,并据此指导车辆的自动驾驶。

下面介绍几个大数据在相应行业中的应用。

电商行业

在使用电商APP时收集客户的信息(包括偏好、搜索历史、愿望清单和购物车等),从而可以预测客户更有可能购买什么商品。其算法考虑了注册客户的送货地址,可以选择最近的仓库送货,减少交货时间和相关成本。

image-20220925214459361

  • 更好地了解客户
  • 紧跟市场潮流
  • 设定最优价格
  • 仓储优化

农业

image-20220925215725931

金融行业

旅游行业

医疗行业

交通行业

......

1.1.3大数据的未来

大数据的发展非常迅速,未来的趋势含有以下几个特点:

  • 结合智能计算。

    包括大数据与神经计算、深度学习、语义计算以及人工智能其它相关技术结合。得益于以云计算、大数据为代表的计算技术的快速发展,使得信息处理速度和质量大为提高,能快速、并行处理海量数据。

  • 跨学科、跨领域。

    由于现有的大数据平台易用性差,而垂直应用行业的数据又涉及领域专家知识和领域建模,目前在大数据行业应用与通用的大数据技术之间存在很大的鸿沟,缺少相互的交叉融合。因此,迫切需要进行跨学科和跨领域的大数据技术和应用研究,促进和推动大数据在典型和重大行业中的应用和落地,尤其是与物联网、移动互联、云计算、社会计算等热点技术领域相互交叉融合。

  • 涉及安全和隐私。

    大数据时代,各网站均不同程度地开放其用户所产生的实时数据,一些监测数据的市场分析机构可通过人们在社交网站中写入的信息、智能手机显示的位置信息等多种数据组合进行分析挖掘。然而,大数据时代的数据不能保证个人信息不被其他组织非法使用,用户隐私安全问题的解决迫在眉睫。安全智能更加强调将过去分散的安全信息进行集成与关联,独立的分析方法和工具进行整合形成交互,最终实现智能化的安全分析与决策。

  • 结合可视化技术。

    在进行分析之前,需要对数据进行探索式地考查。在此过程中,可视化将发挥很大的作用。对大数据进行分析以后,为了方便用户理解结果,也需要把结果展示出来。尤其是可视化移动数据分析工具,能追踪用户行为,让应用开发者得以从用户角度评估自己的产品,通过观察用户与一款应用的互动方式,开发者将能理解用户为何执行某些特定行为,从而为自己完善和改进应用提供依据。

1.2大数据的概念、特征与价值

1.2.1大数据的概念

关于大数据这里参考马丁•希尔伯特的总结,今天我们常说的大数据其实是在 2000 年后,因为信息交换、信息存储、信息处理三个方面能力的大幅增长而产生的数据,如下图所示

image-20220925221800799mapreduce

hdfs

1.2.2大数据的特征(5V)

  • 数据量大(Volume)

    大数据中数据的采集、存储和计算的量都非常大。那么大数据究竟有多大呢?正常的计算机处理 4G 数据需要 4 分钟的时间,处理 1TB 需要 3 个小时的时间,而达到 1PB 的数据需要 4 个月零 3 天的时间

    计算机存储单位一般用 bit, Byte, KB, MB, GB, TB, PB, EB, ZB, BB来表示,
    我们经常将Byte简称为B,将KB简称问K。什么几K了,什么几B了。

    换算关系

    ​ 1B(Byte 字节)=8bit,iP 192.168.32.1 11111111 11111111 11111111 00000000    1KB (Kilobyte 千字节)=1024B,    1MB (Megabyte 兆字节 简称“兆”)=1024KB,    1GB (Gigabyte 吉字节 又称“千兆”)=1024MB,    1TB (Trillionbyte 万亿字节 太字节)=1024GB,其中1024=2^10 ( 2 的10次方),    1PB(Petabyte 千万亿字节 拍字节)=1024TB,    1EB(Exabyte 百亿亿字节 艾字节)=1024PB,    1ZB (Zettabyte 十万亿亿字节 泽字节)= 1024 EB,    1YB (Yottabyte 一亿亿亿字节 尧字节)= 1024 ZB,    1BB (Brontobyte 一千亿亿亿字节)= 1024 YB.

    image-20220925222525156

  • 增长、处理速度快(Velocity)

  • 种类和来源多(Variety)结构化数据(数据库中的数据)半结构化日志网络爬虫,非结构化数据

  • 价值密度低(Value)

  • 数据的准确性(Veracity)

1.2.3大数据的价值

大数据的核心价值,从业务角度出发,主要有如下的三点

  • 数据辅助决策。

    为企业提供基础的数据统计报表分析服务。分析师能够轻易获取数据,产出分析报告用于指导产品开发和运营,产品经理能够通过统计数据完善产品功能和改善用户体验,运营人员可以通过数据发现运营问题并确定运营的策略和方向,管理层可以通过数据掌握公司业务运营状况,从而进行一些战略决策。

  • 数据驱动业务。

    通过数据产品、数据挖掘模型实现企业产品和运营的智能化,从而极大地提高企业的整体效能产出。最常见的应用领域有基于个性化推荐技术的精准营销服务、广告服务、基于模型算法的风控反欺诈服务征信服务等。

  • 数据对外变现。

    通过对数据进行精心的包装,对外提供数据服务,从而获得现金收入。市面上比较常见有各大数据公司利用自己掌握的大数据,提供风控查询、验证、反欺诈服务,提供导客、导流、精准营销服务,提供数据开放平台服务等。

但在实践中,更多的人喜欢把数据的价值分为两个方面,一个方面是给企业创造营收,另一个方面就是给企业节省成本。

1.3大数据技术

image-20220925225644877

大数据技术是一系列技术的总称,它集合了数据采集与传输、数据存储、数据处理与分析、数据挖掘、数据可视化等技术,是一个庞大而复杂的技术体系。

1)数据源层(了解)

数据源层主要指大数据各个行业中多种多样的原始数据,如:互联网用户数据、web 服务器系统日志数据、企业数据库数据等

2)数据收集层(了解) etl工程师

大数据收集层主要采用了大数据采集技术,实现对数据的 ETL 操作,ETL,是英文 Extract-Transform-Load 的缩写,数据从数据来源端经过抽取(extract)、转换(transform)、加载(load)到目的端。用户从数据源抽取出所需的数据,经过数据清洗,最终按照预先定义好的数据模型,将数据加载到数据仓库中去,最后对数据仓库中的数据进行数据分析和处理。数据采集位于数据分析生命周期的重要一环,它通过传感器数据、社交网络数据、移动互联网数据等方式获得各种类型的结构化、半结构化及非结构化的海量数据。

3)数据存储层(了解)

当大量的数据收集完后,我们需要对大数据进行存储。数据的存储分为持久化存储和非持久化存储。持久化存储表示把数据存储在磁盘中,关机或断电后,数据依然不会丢失。非持久化存储表示把数据存储在内存中,读写速度快,但是关机或断电后,数据丢失。

4)数据处理层(了解)

当我们把数据收集好了、数据存储以及读写也都没有问题,我们手握着这一堆数据干嘛?除了保存原始数据,做好数据备份之外,我们还需要考虑到利用他们产生更大的价值。那么首先我们需要对这些数据进行处理。大数据处理分为两类,批量处理(离线处理)和实时处理(在线处理)。

5)数据建模层(了解)

数据收集、数据存储和数据处理是大数据架构的基础设置。一般情况下,完成以上三个层次的数据工作,已经将数据转化为基础数据,为上层的业务应用提供支撑。可以使用 R、Python 来进行数据分析,也可以使用 Mahout、Spark ML 根据算法模型、业务模型进行融合建模,挖掘有价值的信息,从而更好地为业务应用提供优质结果。

6)数据应用层(了解)

数据应用层是大数据技术应用的目标。通常提供查询、报表、数据可视化等功能。Lucene、Solr、Elasticsearch 和 Echart 、CK这样的开源项目为信息查询和数据可视化实现提供了可能。

1.4大数据的离线计算与实时计算

image-20220925231039376

1.4.1大数据离线计算

大数据离线计算,就是利用大数据的技术栈(主要是 Hadoop),在计算开始前准备好所有输入数据,该输入数据不会产生变化,且在解决一个问题后就要立即得到计算结果的计算模式。离线(offline)计算也可以理解为批处理(batch)计算,与其相对应的是在线(online)计算或实时(realtime)计算。

大数据离线计算的特点:

  • 数据量巨大,保存时间长。
  • 在大量数据上进行复杂的批量运算。
  • 数据在计算之前已经完全到位,不会发生变化。
  • 能够方便地查询计算结果。

1.4.2大数据实时计算

大数据实时计算指的是数据实时产生,产生后就立刻处理,这种计算方式倾向于把数据看作是 Streaming 即流来处理

大数据实时计算的特点:

  • 数据连续输入,不停计算。
  • 低延迟,响应时间要求为秒级。
  • 计算结果一般存储在内存数据库(Redis)中。
  • 查询结果一般是截止某个时刻的即时汇总数据。

大数据实时计算与离线计算的区别:

离线计算实时计算
存储HDFS消息队列,如Kafka
使用框架MapReduce|HiveSpark(DStream)|Storm |Flink
处理速度速度慢,延迟高速度快,延迟低
计算特点批量计算实时计算:7×24

二、Hadoop的组成

Hadoop是一个由Apache基金会所开发的分布式系统。用户可以在不了解分布式底层细节的情况下,开发分布式程序。

广义上Hadoop有一个更广泛的概念----->Hadoop生态圈-》分布式: 多台服务器做同一个事情

image-20220925232939679

​ Hadoop生态圈

狭义上Hadoop由分布式文件系统(HDFS)、分布式计算框架(MapReduce)、资源调度(YARN)组成,主要用于解决海量数据的存储与计算

2.1HDFS概述

HDFS全称为:Hadoop Distributed File System,源自于 Google 发表于 2003 年 10 月的 GFS 论文,HDFS 是 GFS 的克隆版,它是 Hadoop 体系中数据存储管理的基础,是面向普通硬件环境的分布式文件系统,它的作用是提供高可靠、高扩展、高吞吐的数据存储服务。

2.1.1NameNode(NN)

存储文件的元数据,如文件名称,文件属性(生成时间,副本数,文件权限),文件目录结构,每个文件块所对应的DataNode

image-20220926085522338

2.1.2DataNode(DN)

NameNode 下达命令,DataNode 执行操作并存储实际数据

image-20220926085349767

2.1.3Secondary NameNode(2NN)

每隔一段时间对NameNode元数据进行备份

2.2MapReduce概述

源自于 Google 发表于 2004 年 12 月的 MapReduce 论文,它是一种简化的分布式应用程序开发的编程模型,允许开发人员在不了解分布式系统底层细节和缺少并行应用开发经验的情况下,轻松地编写出分布式并行程序。

2.3YARN概述

YARN 是一个纯粹的资源管理和调度框架,它的作用是为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

ResourceManager

NodeManagetr

三、Hadoop集群安装

3.1Hadoop下载

官网地址:http://hadoop.apache.org/releases.html

下载地址:https://archive.apache.org/dist/hadoop/common/

3.2模板机准备

用我们上课的机器作为模板机,建议4G内存,40G硬盘,在/opt 目录下创建software用于存放安装包

[root@bigdata04 ~]# mkdir /opt/software

3.3克隆虚拟机

3.3.1通过模板机克隆三台虚拟机,node1,node2,node3

1)关闭正在运行的虚拟机,右键--》管理--》克隆

image-20220926094423681

2)选择完整克隆

image-20220926095031648

image-20220926095504383

3)修改node1,node2,node3的IP处于同一个网段

image-20220926100307539

[root@bigdata04 ~]# vi /etc/sysconfig/network-scripts/ifcfg-ens33
#添加IP地址和自己网段相同  老师是192.168.139网段

IPADDR=192.168.139.10
#添加网关 
GATEWAY=192.168.139.2   
#添加域名解析器
DNS1=192.168.139.2

#修改BOOTPROTO="dhpc" 为 static 
BOOTPROTO="static"
#修改ONBOOT="yes"
ONBOOT="yes"  #系统启动的时候网络接口是否有效(yes/no)

4)修改主机名和映射文件

[root@bigdata04 ~]# vi /etc/hostname
node1
[root@bigdata04 ~]# vi /etc/hosts
192.168.32.11 node1
192.168.32.12 node2
192.168.32.13 node3

[root@bigdata04 ~]# reboot

修改Window映射文件(hosts文件)

文件位置:C:\Windows\System32\drivers\etc\hosts

复制文件到桌面进行修改,添加如下内容

192.168.32.11 node1
192.168.32.12 node2
192.168.32.13 node3

将桌面hosts文件覆盖C:\Windows\System32\drivers\etc路径hosts文件

3.4安装JDK

1)下载JDK

下载地址 https://www.oracle.com/java/technologies/downloads/

2)上传JDK到 /opt/software中

image-20220926152043923

3)到上传目录下解压

[root@node1 software]# tar -zxvf jdk-8u341-linux-x64.tar.gz -C /opt/

4)配置环境变量

[root@node1 ~]# cd /etc/profile.d/
[root@node1 profile.d]# touch bigdata_env.sh
[root@node1 profile.d]# vim bigdata_env.sh

添加环境变量

#JAVA_HOME
export JAVA_HOME=/opt/jdk1.8.0_341
export PATH=$PATH:$JAVA_HOME/bin

5)测试是否安装成功

[root@node1 ~]# java -version
java version "1.8.0_341"

3.5安装Hadoop

1)下载hadoop

下载地址 https://archive.apache.org/dist/hadoop/common/

2)上传hadoop到/opt/software下

和3.4JDK上传步骤一样

3)解压hadoop

[root@node1 software]# tar -zxvf hadoop-3.1.3.tar.gz -C /opt/

4)配置环境变量

[root@node1 software]# vim /etc/profile.d/bigdata_env.sh

添加环境变量

#HADOOP_HOME
export HADOOP_HOME=/opt/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

5)检验是否安装成功

[root@node1 ~]# hadoop version
Hadoop 3.1.3

6)hadoop目录结构

[root@node1 hadoop-3.1.3]# ll
总用量 176
drwxr-xr-x. 2 root root    183 9月  12 2019 bin
drwxr-xr-x. 3 root root     20 9月  12 2019 etc
drwxr-xr-x. 2 root root    106 9月  12 2019 include
drwxr-xr-x. 3 root root     20 9月  12 2019 lib
drwxr-xr-x. 4 root root    288 9月  12 2019 libexec
-rw-rw-r--. 1 root root 147145 9月   4 2019 LICENSE.txt
-rw-rw-r--. 1 root root  21867 9月   4 2019 NOTICE.txt
-rw-rw-r--. 1 root root   1366 9月   4 2019 README.txt
drwxr-xr-x. 3 root root   4096 9月  12 2019 sbin
drwxr-xr-x. 4 root root     31 9月  12 2019 share

  • bin目录:存放对Hadoop相关服务(HDFS,YARN)进行操作的脚本
  • etc目录:Hadoop的配置文件目录,存放Hadoop的配置文件
  • lib目录:存放Hadoop的本地库(对数据进行压缩解压缩功能)
  • sbin目录:存放启动或停止Hadoop相关服务的脚本
  • share目录:存放Hadoop的依赖jar包、文档、和官方案例

7)Hadoop常用端口

(3)常用端口号说明

NameHadoop2Hadoop3
Hadoop HDFS NameNode8020 / 90009820
Hadoop HDFS NameNode HTTP UI500709870
Secondary NameNode500919869
Secondary NameNode HTTP UI500909868
Hadoop HDFS DataNode IPC500209867
Hadoop HDFS DataNode500109866
Hadoop HDFS DataNode HTTP UI500759864

3.6Hadoop运行模式

hadoop的运行模式有三种,分别为本地模式,伪分布式,完全分布式(公司使用)

3.6.1本地模式

这种模式在一台单机上运行,没有分布式文件系统,而是直接读写本地操作系统的文件系统

1)在hadoop下创建input文件夹用于存放输入文件

[root@node1 hadoop-3.1.3]# mkdir input
[root@node1 hadoop-3.1.3]# cd input

2)创建测试文件,test.txt

[root@node1 input]# vim test.txt
hello world
hello hadoop
hello hive

:wq保存退出

3)执行官网程序

[root@node1 hadoop-3.1.3]# hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount input output

4)查看执行结果

[root@node1 output]# ll
总用量 4
-rw-r--r--. 1 root root 32 9月  26 15:31 part-r-00000
-rw-r--r--. 1 root root  0 9月  26 15:31 _SUCCESS
[root@node1 output]# cat part-r-00000
hadoop	1
hello	3
hive	1
world	1

3.6.2伪分布式

这种模式也是在一台单机上运行,但用不同的Java进程模仿分布式运行中的各类结点: (NameNode,DataNode,SecondaryNameNode)

0.免密登录配置

生成公钥和私钥

[root@node1 ~] ssh-keygen -t rsa

敲三次回车,完成公钥和私钥生成

将公钥发送到需要免密登录的服务器上

[root@node1 ~] ssh-copy-id node1

1)修改配置文件

core-site.xml:核心配置项,包括 HDFS、MapReduce 和 YARN 常用的 I/O 设置等。

[root@node1 hadoop]# vim /opt/hadoop-3.1.3/etc/hadoop/core-site.xml
<configuration>
<!-- 指定NameNode的地址 -->
  <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
<!-- 指定hadoop数据的存储目录 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop-3.1.3/data</value>
</property>
<!-- 指定hadoop用户 -->
<property>
   <name>hadoop.http.staticuser.user</name>
   <value>root</value>
</property>
</configuration>

hdfs-site.xml:HDFS 相关进程的配置项,包括 NameNode、SecondaryNameNode、DataNode 等。

[root@node1 hadoop]# vim /opt/hadoop-3.1.3/etc/hadoop/hdfs-site.xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
   	</property>
</configuration>

yarn-site.xml:YARN 相关进程的配置项,包括 ResourceManager、NodeManager 等。

[root@node1 hadoop]# vim /opt/hadoop-3.1.3/etc/hadoop/yarn-site.xml
<configuration>
<!-- 指定MR走shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
        </property>
<!-- 指定ResourceManager的地址-->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>node1</value>
        </property>
</configuration>

在/opt/hadoop-3.1.3/sbin/start-dfs.sh,/opt/hadoop-3.1.3/sbin/stop-dfs.sh中添加:

HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root 

在/opt/hadoop-3.1.3/sbin/start-yarn.sh,/opt/hadoop-3.1.3/sbin/stop-yarn.sh中添加:

YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root

2)集群启动

​ (1)如果集群是第一次启动,需要在node1节点格式化NameNode(注意格式化NameNode,会产生新的集群id,导致NameNode和DataNode的集群id不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化NameNode的话,一定要先停止namenode和datanode进程,并且要删除所有机器的data和logs目录,然后再进行格式化。)

[root@node1 hadoop]# hdfs namenode -format

(2)启动HDFS

[root@node1 hadoop]# start-dfs.sh

(3)在配置了ResourceManager的节点启动YARN

[root@node1 hadoop]# start-yarn.sh

(4)Web端查看HDFS的NameNode

浏览器中输入:http://node1:9870

(5)Web端查看YARN的ResourceManager

浏览器中输入:http://node1:8088

3.6.3完全分布式

这种模式在多台机器上运行,是真正的分布式,也是公司中使用的

1)完成三台虚拟机器node1、node2、node3的网络配置(性能好的可以多弄几台)

2)rsync远程同步工具

三台服务器都需要安装

yum -y install rsync

rsync主要用于备份和镜像。具有速度快、避免复制相同内容和支持符号链接的优点

(1)基本语法

rsync -av $pdir/$fname $user@ip:$pdir/$fname

命令 选项参数 要拷贝的文件路径/名称 目的用户@主机:目的路径/名称

​ 选项参数说明

选项功能
-a归档拷贝
-v显示复制过程

(2)案例练习

案例1:把node1的/opt/software 同步到node2的/opt/software,选择yes,输入node2的密码

[root@node1 ~]# rsync -av /opt/software/ root@node2:/opt/software/

(3)远程同步脚本编写

[root@node1 ~]# vim rsyncAll.sh  
#!/bin/bash
#判断参数个数
if [ $# -lt 1 ]
then
  echo "请输入参数"
  exit;
fi
# 遍历集群所有机器
for host in node1 node2 node3
do
  echo --------------- $host ---------------
  # 遍历所有目录,挨个发送
  for file in $@
  do
    #判断文件是否存在
    if [ -e $file ]
    then
      #获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file 不存在!
    fi
  done
done

(4)修改权限并复制到/bin/下

[root@node1 ~]# chmod +x rsynsAll.sh
[root@node1 ~]# cp rsyncAll.sh /bin/

(5)同步/opt 下的JDK和Hadoop到node2,node3节点上

3)SSH免密登录配置

(1)基本语法

ssh另一台电脑的ip地址

[root@node1 ~]ssh node1

(2)原理

(3)生成公钥和私钥

[root@node1 ~] ssh-keygen -t rsa

敲三次回车,完成公钥和私钥生成

(4)将公钥发送到需要免密登录的服务器上

[root@node1 ~] ssh-copy-id node1
[root@node1 ~] ssh-copy-id node2
[root@node1 ~] ssh-copy-id node3

注意,需要在node1,node2,node3上都完成这个步骤

4)集群规划

node1node2node3
HDFSNameNode、DataNodeDataNodeSecondaryNameNode、DataNode
YARNNodeManagerResourceManager、NodeManagerNodeManager

5)修改配置文件

core-site.xml核心文件

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!-- 指定NameNode的地址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://node1:9820</value>
</property>
<!-- 指定hadoop数据的存储目录 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop-3.1.3/data</value>
</property>
<!-- 指定hadoop数据的用户 -->
 <property>
        <name>hadoop.http.staticuser.user</name>
        <value>root</value>
</property>

</configuration>

vim hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
	<!-- nn web端访问地址-->
	<property>
        <name>dfs.namenode.http-address</name>
        <value>node1:9870</value>
    </property>
	<!-- 2nn web端访问地址-->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>node3:9868</value>
    </property>
</configuration>

vim yarn-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
	<!-- 指定MR走shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
	</property>
<!-- 指定ResourceManager的地址-->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>node2</value>
	</property>
<!-- 环境变量的继承 -->
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
	</property>
<!-- yarn容器允许分配的最大最小内存 -->
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>512</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>4096</value>
</property>
<!-- yarn容器允许管理的物理内存大小 -->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>4096</value>
</property>
<!-- 关闭yarn对物理内存和虚拟内存的限制检查 -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>

vim mapred-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<!-- 指定MapReduce程序运行在Yarn上 -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

配置workers

[root@node1 hadoop]# vim /opt/hadoop-3.1.3/etc/hadoop/workers
node1
node2
node3

在/opt/hadoop-3.1.3/sbin/start-dfs.sh,/opt/hadoop-3.1.3/sbin/stop-dfs.sh中添加:

HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root 

在/opt/hadoop-3.1.3/sbin/start-yarn.sh,/opt/hadoop-3.1.3/sbin/stop-yarn.sh中添加

YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root

同步配置文件

[root@node1 hadoop]# rsyncAll /opt/hadoop-3.1.3/

6)集群启动

​ (1)如果集群是第一次启动,需要在node1节点格式化NameNode(注意格式化NameNode,会产生新的集群id,导致NameNode和DataNode的集群id不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化NameNode的话,一定要先停止namenode和datanode进程,并且要删除所有机器的data和logs目录,然后再进行格式化。)

[root@node1 hadoop]# hdfs namenode -format

(2)启动HDFS

[root@node1 hadoop]# start-dfs.sh

(3)在配置了ResourceManager的节点(node2)启动YARN

[root@node1 hadoop]# start-yarn.sh

(4)Web端查看HDFS的NameNode

浏览器中输入:http://node1:9870

(5)Web端查看YARN的ResourceManager

浏览器中输入:http://node2:8088

7)集群测试

创建文件夹

[root@node1 sbin]# hadoop fs -mkdir /input

上传文件

[root@node1 output]# hadoop fs -put /opt/hadoop-3.1.3/input/test.txt  /input

下载文件到本地当前目录

[root@node1 output]# hadoop fs -get /input/test.txt ./

wordcount程序运行

[root@node1 output]# hadoop jar /opt/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output

8)各个服务组件逐一启动/停止

​ 启动/停止HDFS组件

hdfs --daemon start/stop namenode/datanode/secondarynamenode

启动/停止YARN

yarn --daemon start/stop  resourcemanager/nodemanager

9)各个模块分开启动/停止(配置ssh是前提)

整体启动/停止HDFS

start-dfs.sh/stop-dfs.sh

整体启动/停止YARN

start-yarn.sh/stop-yarn.sh

四、分布式文件系统HDFS

4.1HDFS 概述

HDFS(Hadoop Distributed File System)是 Hadoop 分布式文件系统,是 Hadoop 三大核心之一,是针对谷歌文件系统 GFS(Google File System)的开源实现(The Google File System, 2003)。

HDFS 是一个具有高容错性的文件系统,适合部署在廉价的机器上,HDFS 能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。大数据处理框架如 MapReduce、Spark 等要处理的数据源大部分都存储在 HDFS 上,Hive、HBase 等框架的数据通常也存储在 HDFS 上。

简而言之,HDFS 为大数据的存储提供了保障。经过多年的发展,HDFS 自身已经十分成熟和稳定,且用户群愈加广泛,HDFS 逐渐成为分布式存储的事实标准。

4.2HDFS 文件系统的基本特征

  • 高容错性。

    把硬件出错看做一种常态,HDFS 设计了相应机制能够快速自动进行错误检测和恢复。

  • 数据容量大。

    目前普通计算机的硬盘容量可以达到 1T 甚至更多,而 HDFS 集群可以轻松拥有上百台节点甚至更大的规模,还可以不停地往集群中添加节点,以适应海量数据的需求。

  • 可扩展性。

    水平扩展性强,集群中的节点可以根据需要进行动态地添加和删除。

  • 高吞吐量。

    由于 Hadoop 集群采用的是分布式架构,可以并行地处理数据,从而大大提高了单位时间内的数据处理能力,提高了吞吐量。

4.3HDFS 应用场景

HDFS 被设计为适应如下的应用场景:

  • 需要存储非常大的文件:这里非常大指的是 GB、或者 TB 级别,需要高吞吐量,对延时没有要求。
  • 一次写入、多次读取:即历史数据没有频繁变化的需求。
  • 硬件成本预算低:期望由普通廉价机器完成存储任务。
  • 需要高容错性:数据自动保存多个副本,副本丢失后自动恢复。
  • 需要根据数据规模变化方便地扩容:HDFS 集群可以横向扩展,当集群增加新节点之后,NameNode 自动感知,将数据分发和备份到相应的节点上。

但它也有不适合的应用场景:

  • 低延时的数据访问:对延时要求在秒级别的应用,不适合采用 HDFS,因为 HDFS 是为高吞吐数据传输设计的。
  • 需要任意的文件修改:HDFS 采用追加(append-only)的方式写入数据。不支持文件任意 offset 的修改,且不支持多个写入器(writer)。
  • 大量小文件:由于 HDFS 的 NameNode 要记录文件的元数据,大量小文件将导致元数据规模很大。HDFS 会将文件切割成块(block),块的大小(blocksize)默认为 128M。存储一个大小恰好为 128M 的文件,HDFS 只会记录一个块的元数据;存储 128 个 1024K 大小的文件,HDFS 则会记录 128 个块的元数据(HDFS 不会对小文件进行合并)。文件总大小固定,用小文件(远小于 blocksize)比用大文件所导致的块的数量要多得多。文件及块的元数据保存在 NameNode 的内存中,太多的元数据将导致 NameNode 所在节点的内存难以支撑。

HDFS文件块大小:HDFS中的文件在物理存储上是分块(block)存储的,块的大小可以通过(dfs.blocksize)来设置。Hadoop后默认为128M(通过寻址时间和传输时间计算出来)

4.4HDFS 架构

HDFS 是一个主从(Mater-Slave)(leader-works)体系结构。由四个部分组成,HDFS Client、NameNode、DataNode 和 Secondary NameNode,如下图所示。

图片描述

  • HDFS Client,通常指 HDFS 集群下各种操作请求对应的客户端。其功能主要有以下几点:

    1. 文件上传到 HDFS 的时候,Client 将文件切分成一个一个的 数据块Block,然后进行存储。
    2. Client 与 NameNode 交互,获取文件的位置信息。
    3. Client 与 DataNode 交互,读取或者写入数据。
    4. Client 提供一些命令来管理和访问 HDFS,比如启动或者关闭 HDFS。
  • NameNode:集群老大,管理者

    1. 管理HDFS命令空间
    2. 管理副本策略
    3. 管理数据块(映射)
    4. 处理客户端请求
  • DataNode:集群小弟,实际干活的服务器

    1. 存储实际的数据块(bloke)
    2. 执行数据的读写操作
  • Secendary NameNode:辅助NameNode.

4.5HDFS Web

HDFS Web UI 提供服务器基础统计信息和文件系统运行状态的查看功能,从该页面上,可以查看当前文件系统中各个节点的分布信息,浏览 NameNode 上的存储、登录等日志,以及下载某个数据节点上某个文件的内容。HDFS 2.X Web UI 地址为 http://NameNodeIP:50070 HDFS 3.X Web UI 地址为 http://NameNodeIP:9870进入后可以看到当前 HDFS 文件系统的 Overview、Summary、NameNode Journal Status、NameNode Storage 等信息。首页中 Datanodes 选项卡下显示当前配置的 DataNode 节点基本信息,Datanode Volume Failures 选项卡下显示日志中已经记录的数据节点失败信息,Snapshot 选项卡下显示快照信息,Startup Progress 选项卡中显示系统启动进度信息例如元数据文件 FsImage、日志 EditLog 等的加载进度,在 Utilities 选项卡中包括了一些实用工具,例如文件系统查询和浏览功能。如下图所示:

图片描述

4.6HDFS的Shell操作(开发重点)

4.6.1 基本语法

hadoop fs 具体命令 or hdfs dfs 具体命令

4.6.2 上传命令

1)-put

[root@node1 testHDFS]# hadoop fs -put appendToFile.txt /

2)-appendToFile:追加一个文件到已经存在的文件末尾

[root@node1 testHDFS]# touch appendToFile.txt

[root@node1 testHDFS]# vim appendToFile.txt
666
[root@node1 testHDFS]# hadoop fs -appendToFile appendToFile.txt /bigdata03.txt

4.6.3下载命令

1)-get,就是从HDFS下载文件到本地

[root@node1 testHDFS]# hadoop fs -get /bbb.txt ./

2)-getmerge:合并下载多个文件,比如HDFS的目录 /test下有多个文件:log.1, log.2,log.3,...

[root@node1 testHDFS]# hadoop fs -getmerge /bigdata03/* ./merge.txt

4.6.4 HDFS直接操作命令

1)-ls: 显示目录信息

[root@node1 testHDFS]# hadoop fs -ls /

2)-mkdir:在HDFS上创建目录

[root@node1 testHDFS]# hadoop fs -mkdir -p /bigdata03/group1

3)-cat:显示文件内容

[root@node1 testHDFS]#$ hadoop fs -cat /a.txt

4)-chgrp 、-chmod、-chown:Linux文件系统中的用法一样,修改文件所属权限

[root@node1 testHDFS]# hadoop fs  -chmod  777  /

[root@node1 testHDFS]# hadoop fs  -chown  root:root  /

5)-cp :从HDFS的一个路径拷贝到HDFS的另一个路径

[root@node1 testHDFS]# hadoop fs -cp /bigdata03/group1/zangsan.txt  /

6)-mv:在HDFS目录中移动文件

[root@node1 testHDFS]# hadoop fs -mv /bigdata03/group1/zangsan.txt /bigdata03/group

7)-tail:显示一个文件的末尾1kb的数据

[root@node1 testHDFS]# hadoop fs -tail /bigdata03/group1/zangsan.txt

8)-rm:删除文件或文件夹

[root@node1 testHDFS]# hadoop fs -rm /user/atguigu/test/jinlian2.txt

9)-rmdir:删除空目录

[root@node1 testHDFS]# hadoop fs -mkdir /test

[root@node1 testHDFS]# hadoop fs -rmdir /test

10)-du统计文件夹的大小信息

[root@node1 testHDFS]# hadoop fs -du  -h /

4.7数据读取机制

HDFS 的真实数据分散存储在 DataNode 上,但是读取数据时需要先经过 NameNode。 HDFS 数据读取的基本过程为:首先客户端连接到 NameNode 询问某个文件的元数据信息, NameNode 返回给客户一个包含该文件各个块位置信息(存储在哪个 DataNode)的列表;然后,客户端直接连接对应 DataNode 来并行读取块数据;最后,当客户得到所有块后,再按照顺序进行组装,得到完整文件。为了提高物理传输速度,NameNode 在返回块的位置时,优先选择距离客户更近的 DataNode。

4.8数据写入机制

HDFS 的设计遵循“一次写入,多次读取”的原则,所有数据只能添加不能更新。数据会被划分为等尺寸的块写入不同的 DataNode 中。每个块通常保存指定数量的副本(默认 3 个)。HDFS 数据写入的基本过程为:客户端向 NameNode 发送文件写请求,NameNode 给客户分配写权限,并随机分配块的写入地址(DataNode 的 IP),假设副本数量是 3,则每个块会分配到三个不同的 DataNode,为了提高传输效率,客户端只会向其中一个 DataNode 复制一个副本,另外两个副本则由 DataNode 传输到相邻 DataNode。

五、分布式计算框架MapReduce

image-20221011194625331

5.1MapReduce 技术特点(了解)

单节点在处理海量数据时,会受到硬件条件的限制。具体地讲,每个计算机的 CPU、内存和硬盘等资源均有限制,无法在短时间内完成大量运算。一种有效的解决方案就是开发一套分布式系统,先将处理海量数据的任务拆分成多个子任务,然后由每个节点分别完成一个子任务,最后再将所有子任务进行汇总即可。然而,开发一套分布式系统无疑会大大增加程序的复杂性,从而增大开发成本。能否拥有一套现成的框架,我们只需编写各个子任务的业务逻辑,之后再将业务逻辑代码填入到这个框架中就能完成分布式运算呢?MapReduce 就是这样一个框架。

MapReduce 是由谷歌推出的一个编程模型,是一个能处理和生成超大数据集的算法模型,该架构能够在大量普通配置的计算机上实现并行化处理。MapReduce 在设计上具有以下主要的技术特征:

  • 向“外”横向扩展,而非向“上”纵向扩展

MapReduce 集群的构建完全选用价格便宜、易于扩展的低端服务器,而非价格昂贵、不易扩展的高端服务器。

  • 失效被认为是常态

MapReduce 集群中使用大量的低端服务器,因此,节点硬件失效和软件出错是常态,因而一个良好设计、具有高容错性的并行计算系统不能因为节点失效而影响计算服务的质量,任何节点失效都不应当导致结果的不一致或不确定;任何一个节点失效时,其它节点要能够无缝接管失效节点的计算任务;当失效节点恢复后应能自动加入集群,而不需要人工干预。

MapReduce 并行计算软件框架使用了多种有效的错误检测和恢复机制,如节点自动重启技术,使集群和计算框架具有健壮性。

  • 将计算向数据迁移

传统高性能计算系统往往将数据传输到计算节点上进行处理,这样在处理大规模数据时,存储节点上的文件 I/O 会成为制约系统性能的瓶颈。

为了减少大规模数据并行计算系统中的数据通信开销,考虑将计算与数据尽量靠拢,MapReduce 采用了数据|代码互定位技术,计算节点将首先尽量负责计算其本地存储的数据,以发挥数据本地化读写优势,仅当节点无法处理本地数据时,再采用就近原则寻找其它可用计算节点,并把数据传送到该可用计算节点。

  • 顺序处理数据、避免随机访问数据

大规模数据处理的特点决定了大量的数据记录难以全部存放在内存,而通常只能放在硬盘中进行处理。由于磁盘的顺序访问要远比随机访问快得多,因此 MapReduce 主要设计为面向顺序式大规模数据的磁盘访问处理。

  • 为应用开发者隐藏系统层细节

在编写并行程序时有很多困难,如需要考虑多线程中诸如同步等复杂繁琐的细节。由于并发执行中的不可预测性,程序的调试查错也十分困难;而且,大规模数据处理时程序员需要考虑诸如数据分布存储管理、数据分发、数据通信和同步、计算结果收集等诸多细节问题。

MapReduce 提供了一种抽象机制将程序与系统层细节隔离开来,程序员仅需描述需要计算什么,而具体怎么去计算就交由系统的执行框架处理,这样程序员就可从系统层细节中解放出来,而致力于问题的算法设计。

  • 性能的线性增长

理想的软件算法应当能随着数据规模的扩大而表现出持续的有效性,性能上的下降程度应与数据规模扩大的倍数相当;在集群规模上,要求算法的计算性能应能随着节点数的增加保持接近线性程度的增长。绝大多数现有的单机算法都达不到以上理想的要求;把中间结果数据维护在内存中的单机算法在大规模数据处理时很快失效;从单机到基于大规模集群的并行计算从根本上需要完全不同的算法设计。奇妙的是,MapReduce 在很多情形下能实现以上理想的扩展性特征。多项研究发现,对于很多计算问题,基于 MapReduce 的计算性能可随节点数目增长保持近似于线性的增长。

MapReduce是一个分布式运算程序的编程框架,其核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并行运行在一个Hadoop集群上。

图片描述

5.2官网MapReduce运行

1)启动hadoop集群

1.1伪分布式启动

[root@node1 ~]# start-dfs.sh
[root@node1 ~]# start-yarn.sh

1.2完全分布式启动

 [root@node1 ~]# start-dfs.sh
 [root@node2 ~]# start-yarn.sh

2)在分布式文件系统HDFS创建input文件夹

[root@node1 ~]# hadoop fs -mkdir /input

3)上传测试数据

[root@node1 ~]# vim test.txt
hadoop hadoop
hello hive
hive hive
[root@node1 ~]# hadoop fs -put test.txt /input

4)运行官网案例

[root@node1 ~]# hadoop jar /opt/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output

hadoop 是命令的名称,jar 是执行的文件类型,/opt/……/hadoop-mapreduce-examples-3.1.3.jarjar 包的完整路径,wordcount 是任务名称,/input 表示输入的数据目录,该目录下的所有文件都要计算,output为计算结果。该命令执行之后,会发现在 /output 目录下新生产了 2 个文件:

image-20221011195440288

后缀带 \_SUCCESS 的文件,表明这次运算成功执行。另一个名为 part-r-00000 的文件就是输出结果文件了。接下来,查看该输出文件的内容:

image-20221011195614542

5.3Hadoop数据类型

Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
NullNullWritable

5.4编程实现Wordcount

需求:假设现在有一个破旧的扑克牌回收工厂,需要算出各种花色的扑克牌有多少

0.依赖导入

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>
<build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

1.编写入口类

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1 获取配置信息以及获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 关联本Driver程序的jar
        job.setJarByClass(WordCountDriver.class);

        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

2.编写Map类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	Text k = new Text();
	IntWritable v = new IntWritable(1);
	
	@Override
	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {
		// 1 获取一行
		String line = value.toString();
		// 2 切割
		String[] words = line.split(" ");
		// 3 输出
		for (String word : words) {		
			k.set(word);
			context.write(k, v);
        //(红桃A,1)
		}
	}
}

3.编写Reduce类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    int sum;
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        sum=0;
        //相同key的V进行累加
        for (IntWritable count:values){
            sum=sum+count.get();
        }
        //输出
        v.set(sum);
        context.write(key,v);

    }
}

4.打包运行

//hadoop jar 包的位置 全类名 输入路径 输出路径
[root@hadoop test]# hadoop jar xxx.jar WordCountDriver wcinput/ wcoutput

5.查看结果

原文件

黑桃A 红桃3
黑桃A
黑桃K
红桃8
黑桃A 红桃3
黑桃A
......

输出结果

红桃3	114048
红桃4	38016
红桃6	38016
红桃8	76032
黑桃A	139392
黑桃K	76032

5.5数据清洗案例实现

MapReduce 在大数据领域有一个非常重要的功能:清洗数据。这里我们就以服务器的日志为例,选取一些常用的数据字段,并抽取部分数据片段如下:

211.191.131.84 - - [20/07/2020:10:42:30 +0800] "GET /el/book.jsp HTTP/1.1" 200 11452
166.179.223.239 - - [20/08/2020:10:42:31 +0800] "GET /el/index.jsp HTTP/1.1" 200 1336
105.163.58.172 - - [20/07/2020:10:42:33 +0800] "GET /el/register.jsp HTTP/1.1" 200 1336
86.104.131.213 - - [20/07/2020:10:44:59 +0800] "GET /el/index.jsp HTTP/1.1" 404 1022
55.249.94.97 - - [20/07/2020:10:45:09 +0800] "POST /el/book.jsp HTTP/1.1" 200 1301
0.45.116.44 - - [20/07/2020:10:46:45 +0800] "GET /el/music.jsp HTTP/1.1" 200 1375
177.200.147.39 - - [20/07/2020:10:46:45 +0800] "POST /favicon.ico HTTP/1.1" 200 21630
92.165.192.61 - - [20/07/2020:10:47:22 +0800] "GET /el/book.jsp HTTP/1.1" 200 1381
244.27.50.176 - - [20/07/2020:10:47:22 +0800] "POST /favicon.ico HTTP/1.1" 200 21630
219.7.147.24 - - [20/07/2020:10:50:49 +0800] "GET /el/hmm.png HTTP/1.1" 200 1360

这里的数据按照空格分隔,以第一条为例来说明:

  • 第 1 列 211.191.131.84 为访问的 IP 地址。
  • 第 2、3 列为 -,表示没有值。
  • 第 4 列 [20/07/2020:10:42:30 为日期时间。
  • 第 5 列 `+0800]`` 为与标准时区的时差。
  • 第 6 列 "GET 为请求方式。
  • 第 7 列 /el/book.jsp 为页面地址。
  • 第 8 列 `/HTTP/1.1"`` 为 HTTP 请求版本。
  • 第 9 列 200 为页面请求响应码,如 200 表示成功。
  • 第 10 列 11452 为请求的页面的字节大小。

10 列数据,有些数据我们不打算要,如第 2、3、5、8 列,有些列的格式不太友好,需要重新整理,如第 4 列改为 2020-05-20 10:42:30 这种格式,第 6"GET 去掉 ",需要把这些无效的访问行去掉。第 910 列保持原样。

编写Map类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    SimpleDateFormat sdf1 = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
    SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        //211.191.131.84 - - [20/07/2020:10:42:30 +0800] "GET /el/book.jsp HTTP/1.1" 200 11452
        //获取每行数据
        String line = value.toString();
        //按行切割
        String[] s = line.split(" ");
        //ip地址
        String ip=s[0];
        //访问时间
        String time=s[3];
        try {
            time=sdf2.format(sdf1.parse(time.substring(1)));
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
        //请求方式
        String method=s[5];
        method=method.substring(1);
        //url
        String url=s[6];
        //返回状态
        String status = s[8];//返回状态
        //页面字体大小
        String num = s[9];//返回页面字节大小

        //拼接数据
        StringBuilder sb=new StringBuilder();
        sb.append(ip).append("\t")
                .append(time).append("\t")
                .append(method).append("\t")
                .append(url).append("\t")
                .append(status).append("\t")
                .append(num);
        //写出数据
        context.write(new Text(sb.toString()),NullWritable.get());

    }
}

编写入口类

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ETLDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1 获取配置信息以及获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 关联本Driver程序的jar
        job.setJarByClass(ETLDriver.class);

        // 3 关联Mapper
        job.setMapperClass(ETLMapper.class);

//        // 4 设置Mapper输出的kv类型
//        job.setMapOutputKeyClass(Text.class);
//        job.setMapOutputValueClass(NullWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

打包运行

[root@node1 ~]# hadoop jar MapReduceTest-1.0-SNAPSHOT-jar-with-dependencies.jar ETLDriver /etl /etloutput

5.6MapReduce优缺点

优点缺点
“易于编程”不擅长实时计算
良好的扩展性每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO
良好的容错性
可支持上千台服务器集群并发工作

6.Yarn资源调度器

image-20221017165045381

6.1YARN 概述(了解)

YARN(Yet Another Resource Negotiator,另一种资源协调者)是 Hadoop2.0 的资源管理和调度框架,是一个通用的资源管理系统,在其上可以部署各种计算框架,它可为上层应用提供统一的资源管理和调度,它的引入为集群高可用性、可扩展性、资源利用率和数据共享等方面带来了很大好处。

YARN 的基本思想是将资源管理和任务调度与监控的功能分解为单独的守护进程。具体来说,ResourceManager 负责资源管理,ApplicationMaster 则负责任务调度与监控,而他们都需要 NodeManager(资源的真正宿主)的协作。YARN 的优势包括以下几个方面:

1. 可扩展性(Scalability)。

YARN 可以在大规模的集群上运行。YARN 利用 ResourceManager 和 ApplicationMaster 分离的架构优点,可以将集群轻松扩展到将近 10000 个节点和 100000 个任务。另外,YARN Federation 联邦机制会进一步增强集群的水平横向扩展性。

2. 高可用性(Availability)。

当 ResourceManager 进程失败时,备用的 ResourceManager 进程接管工作以便继续提供服务,从而可以获得高可用性(High Available)。一方面主备模式(Active/Standby--处于 Active 状态的节点提供客户端的各种服务,而处于 Standby 状态的节点暂时不提供客户端的各种服务,只进行数据同步,一旦 Active 状态的节点出现故障时,处于 Standby 状态的节点立刻切换为 Active 状态,保证集群随时都能高可用)保证了 YARN 高可用性,另一方面 ResourceManager Restart 特性保证了若 ResourceManager 发生单点故障,ResourceManager 能尽快自动重启。

3. 资源利用率高(Utilization)。

YARN 将资源管理和任务控制分离,使得资源管理更加合理。同时使用 Container 对资源进行抽象,它是一个动态资源划分单位,是根据任务的需求动态生成的,更加轻量级。YARN 中使用了容量调度器,支持多个调度队列同时运行,提高了资源的利用率。

4. 计算多样性(variety)。

在某种程度上可以说,YARN 最大的优点是向 MapReduce 以外的其它分布式计算框架开放了 Hadoop,MapReduce 仅是许多 YARN 应用中的一个,Spark、Tez、Storm 等计算框架也都可以运行在 YARN 上。另外,用户甚至可以在同一个 YARN 集群上运行不同版本的 MapReduce,这使得升级 MapReduce 更好管理。

6.2YARN 体系架构(了解)

和 HDFS 一样,YARN 也采用主从架构(Master/Slave),ResourceManager(可简写为 RM)是 Master,NodeManager(可简写为 NM)是 Slave,它们是守护进程;而 ApplicationMaster(可简写为 AppMstr)是与任务同生命周期的临时进程。在这 3 个核心组件之外,还有一个重要概念——Container(容器):Container 将内存、CPU、磁盘、网络等资源封装在一起,如规定“1 核 CPU+2G 内存”为 1 个 Container。Container 就是我们一直在提的“资源”,它存在于 NM 上。

RM 是集群中所有资源的老大,NM 是每台机器上 Containers 的代言人,监控它们的资源使用情况并将其报告给 RM。

AppMstr 是某个任务的实例,它与 RM 协商资源并与 NM 一起工作来执行和监视这些任务。

YARN 体系架构如下图所示。

图片描述

结合上图,我们进一步可知:

Client 负责任务的提交,RM 根据资源情况在某个 NM 上启动一个 Container 来运行 AppMstr;现在任务的管理权转移到了 AppMstr,它负责根据任务的情况,进一步向 RM 申请资源,RM 根据资源分配策略指令某些 NM 启动 Container 并分配给 AppMstr;接下来 Container 开始工作(运行各类 Task 如 MapTask 和 ReduceTask),NM 监视资源的使用情况并汇报给 RM,AppMstr 则监视应用的运行情况。

6.3YARN 调度器与调度策略(掌握)

Hadoop 集群在处理任务时,理想情况下,任务对 YARN 请求的资源应该立刻得到满足,但现实情况资源往往是有限的,特别是在一个很繁忙的集群上,一个应用资源的请求经常需要等待一段时间才能得到满足。为此,YARN 提供了多种调度器和可配置的策略供我们选择。

在 YARN 中有三种调度器可以选择:FIFO 调度器,Capacity 调度器和 Fair 调度器。

1. FIFO 调度器(先入先出调度器)。

FIFO 调度器是 Hadoop1.x 默认的调度器。

FIFO 调度器把任务按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最前面的任务分配资源,待最前面的任务需求满足后再给下一个分配,以此类推。

FIFO 调度器是最简单也是最容易理解的调度器,也不需要任何配置,但是现实中任务有大有小,大的任务可能会占用所有集群资源,这就导致其它任务被阻塞。此时,可能更适合采用 Capacity 调度器或 Fair 调度器,这两个调度器都允许大任务和小任务同时获得一定的系统资源。

image-20221017233733146

2. Capacity 调度器(容量调度器)。

Capacity Scheduler是多用户调度器

Capacity 调度器是 Hadoop2.x 默认的调度器。其原理如下:

支持多个队列,每个队列可配置一定量的资源,每个队列又采用 FIFO 的方式调度。为了防止同一个用户的任务独占队列中的资源,调度器会对同一用户提交的任务所占资源进行限制。分配新的任务时,首先计算每个队列中正在运行任务的个数与其队列实际分配的资源量做比值,然后选择比值最小的队列。假设有 3 个队列 A、B 和 C。其中 A 队列占用 20% 的资源,目前已经有 15 个任务正在运行,B 队列占用 50% 的资源,目前已经有 25 个任务正在运行,C 队列占用 30% 的资源,目前已经有 25个任务正在运行。那么 3 个比值依次为:15/20%=75,25/50%=50,25/30%=83.33。所以选择最小 值队列 B。选定队列后,然后按照队列中的任务的优先级和时间顺序,同时要考虑到用户的资源量和内存的限制,对队列中的任务进行排序执行。多个队列同时并行执行,每个队列按照任务队列内的先后顺序依次执行。

image-20221017233422865

3. Fair 调度器(公平调度器)。

支持多个队列,每个队列可以配置一定的资源,每个队列中的任务公平共享其所在队列的所有资源。队列中的任务都是按照优先级分配资源,优先级越高分配的资源越多,但是为了确保公平,每个任务都会分配到一定资源。优先级是根据每个任务的理想获取资源量减去实际获取资源量的差值决定的,差值越大优先级越高。

image-20221018075813471

默认Yarn的配置下,容量调度器只有一条Default队列。在capacity-scheduler.xml中可以配置多条队列,并降低default队列资源占比.

hadoop

评论