Hadoop简介
Hadoop(Hdfs\MR\Yarn\ZK)
一、Hadoop简介
1.1 hadoop简介
作用:海量数据的存储,海量数据的分析
组件:hdfs--------数据的存储
MapReduce---数据的的分析
yarn--------资源的调度
comments----工具
优点:成本低--普通电脑就可以
易扩容--扩容机制
可靠----通过多个地方保存副本
高效----分布式存储
一键开启命令:start-dfs.sh
一键关闭命令:stop-dfs.sh
访问文件系统页面:linux01:8020

1.2 hadoop分布式文件管理系统核心原理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-On5zOgPg-1657697962243)(img\image-20210620141336631.png)]
二、Hdfs
2.1 hdfs 介绍
hdfs 是一种分布式文件管理系统,可以操作多台机器,目前在学习中,不可能每个人搞几台电脑,来组建一个集群,于是通过linux虚拟机创建三台虚拟机,每台虚拟机的内存是2G磁盘是20G,20G的磁盘最终还是写到本机上。
2.2 hadoop集群的搭建
1、在linux01中下载安装hadoop系统,然后修改配置文件,最后分发给linux02和linux03
注:在配置文件中,指定了三台虚拟机各自的角色,linux01作为主节点和从节点,而2和3作为从节点,当把配置好的hadoop包分发给linux02和linux03的时候,linux02和linux03便接受了这个配置,作为从节点,而且三台虚拟机的连接是通过提前设置好的免密协议。有了免密协议,有了配置文件,此时初始化namenode,由于从节点要跟主节点进行连接,要向主节点进行汇报,所以必须先开启主节点,之后再开启从节点。(因为麻烦,所以后面设置了一键全部开启)。都开启之后,hadoop集群便搭建成功,此时三台虚拟机便构成了一个hadoop集群,其中linux01是主节点,linux02和linux03是从节点。有什么操作在主节点上进行操作,具体的执行由从节点来执行。
2、开启hadoop分布式文件管理系统后与不开启之前三台虚拟机的区别:
在未开启hadoop的时候,三台虚拟机只是三台可以免密传输的虚拟机,各自执行着各自的任务
开启hadoop之后,此三台虚拟机不仅是三台可以操作的虚拟机,而且形成了一个hadoop集群,有了各自的角色;当我在linux01中上传了一个文件的时候,会自动在三个从节点上保存三个副本,也就是linux01、02、03都存有此文件。
3、数据在存入的时候存到了哪?
1、namenode存储元数据(数据存储位置信息)的位置是/opt/hdpdata/name
2、datanode存储数据的位置是/opt/hdpdata/data 后面有个很深的目录,数据就存放在那,
而且更改了名字,nameNode提供的访问目录是虚拟目录
数据存储的最终目的地还是落在了本地磁盘上。

2.3 hdfs 组成结构图
)]
2.4 shell命令–hdfs的操作命令
shell命令本来是在bin目录下才能执行,配置环境变量后,便可以在任意位置执行。
上传:hdfs dfs -put 本地文件全路径 /hdfs路径
下载:hdfs dfs -get /hdfs路径 /本地路径
创建文件夹:hdfs dfs -mkdir -p /a/b/c
移动:hdfs dfs -mv /x /y
改名:hdfs dfs -mv /y /yy
删除:hdfs dfs -rm -r /aa
hdfs dfs -rm /a/a.txt
查看:hdfs dfs -ls /a
hdfs dfs -ls -a /
查看内存:hdfs dfs -df -h
hdfs dfs -du -h /aa
查看文档: hdfs dfs -tail/head /a.txt
修改权限:hdfs dfs -chmod 750 /aa

2.5 maven使用–java中操作hdfs
1、maven是一个工具,通过它 可以在java上写shall命令、即可以在java上操作hdfs
2、首先在java中配置maven的运行环境,和一些命令,以及在本地配置hadoop的运行环境
3、要想在java中操作hdfs 首先要获得hdfs的操作对象,然后通过这个对象 便可以执行hdfs的操作,这些操作跟shall命令相似,下载 上传 查看 删除 创建 等命令
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
URI uri = new URI("hdfs://linux01:8020");
String userName="root";
FileSystem fs = FileSystem.newInstance(uri, conf, userName);
//创建文件夹
boolean b = fs.mkdirs(new Path("/a/b/c"));
//删除--可以递归删除--删除前可以判断文件是否存在
boolean b1 = fs.delete(new Path("/a/b"), true);
//移动 改名
boolean b2 = fs.rename(new Path("/a/b"), new Path("/a/c"));
//查看
RemoteIterator<LocatedFileStatus> ite = fs.listFiles(new Path("/a/b"), true);
while(ite.hasNext()){
LocatedFileStatus next = ite.next();
//通过next可以获得很多信息
next.getPath();
next.getBlockSize();
}
FileStatus[] fss = fs.listStatus(new Path("/a/b"));
for (FileStatus file : fss) {
//可以判断是文件还是文件夹
boolean b3 = file.isFile();
}
//读操作 读一个字节
FSDataInputStream inputStream = fs.open(new Path("/wc/wc.txt"));
int read = inputStream.read();
//读操作 读一行
BufferedReader buff = new BufferedReader(new InputStreamReader(inputStream));
String line = buff.readLine();
//写操作--追加写
FSDataOutputStream append = fs.append(new Path("/wc/wc.txt"));
append.writeUTF("aaa");
}

2.6 原理加强
2.6 .1 数据存储位置
数据通过物理切块,分别存储在不同的从节点中,默认每个物理切块保存三个副本。
每个从节点都有统一的数据存储目录。
当从节点向主节点注册的时候,主节点会给从节点分配:统一的集群id、统一的数据存储目录、从节点的专属uuid 。
复制
.png)]
2.6.2 上传数据流程
1、客户端请求上传数据
2、nameNode接收请求,检查从节点资源,返回可以接收数据的从节点
3、客户端向返回的可接收数据的从节点进行发送数据,发送的时候以数据包的形式进行发送。
4、从节点之间建立连接,一方面从节点接收客户端发来的数据,并写到本地磁盘中;
另一方面,在接收数据的同时向其他从节点发送数据,其他从节点也是接收数据并写到本地磁盘, 并且在接收数据的同时向其他从节点发送数据。
5、从节点向客户端反馈第一块数据传送成功
6、主节点更新元数据信息。
7、开始传送第二块数据,重复上面2-7步骤。
(对本地的传输时通过本地输入输出来读写,其他的是通过网络输入输出流来读写)
2.6.3 下载数据流程
1、客户端请求下载数据
2、nameNode接收到请求,返回元数据信息(返回数据存储的位置)
3、客户端拿着元数据信息,去找相应的从节点进行下载第一块数据
4、从节点与客户端进行数据的传输
5、开始进行第二块数据的传输,重复以上步骤。
2.6.4 主从节点之间的通信
主从节点直接有着实时的通信。
1、从节点向主节点注册,主节点向从节点返回 统一的集群id、统一的数据存储路径、从节点的唯一id
2、从节点汇报自己的存储资源
3、心跳机制:工作状态、存储资源、领取任务等。
(心跳机制:主从节点之间每3秒进行一次通信,若从节点10次没有汇报,则主节点会记录它有问题,若5分钟还是没有汇报,则会提出此从节点)
2.6.5
简单来讲:1、nameNode记录元数据信息,不断更新的元数据会生成日志。
2、nameNode将日志发送给SecondNameNode,SecondNameNode将日志信息记录到本地,并 且不断下载namenode记录元数据信息的镜像文件,并且将不断更新的元数据信息和日志 拼到一起再发送给nameNode。
3、删除老的日志文件
三、MR
3.1 MR程序设计思想
3.2 MR程序注意事项
MR:对数据进行处理分析
mapper阶段:对数据进行一行一行的处理
reducer阶段:主要是对数据进行聚合
注: 1、map方法一行执行一次;reduce方法一个key执行一次
2、mapper阶段,setup方法会在map方法之前执行
reducer阶段,cleanup会在reduce方法全部执行完后再执行--可用于对最终数据再进行处理
3、可以只有mapper阶段,而无reducer阶段
4、当一个MR程序无法得到最终结果时,可以使用多个MR程序--MR串联。这也暴露出了MR的弊端: MR程序模式单一,不适合做迭代式的数据处理,顶多要求两次MR就要最终结果,否则太耗资源
5、可以自己设置排序规则、分区规则、分组合并规则
6、输出的数据类型可以改为Sequence类型,能够更快的读取数据
7、由于MR中间有网络传输,所以传送的数据要实现序列化,为了简化序列化的内容,可以使用 hadoop的序列化方式。

3.3 MR案例
MovieBean:
//{"movie":"661","rate":"3","timeStamp":"978302109","uid":"1"}
//写一个movieBean实现hadoop的序列化---实现Writable接口---目的是简化序列化的内容,去掉一些不重要的内容
public class MovieWritable implements Writable {
private String movie;
private double rate;
private String timeStamp;
private int uid;
//hadoop的序列化
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(movie);
dataOutput.writeDouble(rate);
dataOutput.writeUTF(timeStamp);
dataOutput.writeInt(uid);
}
//hadoop的反序列化
@Override
public void readFields(DataInput dataInput) throws IOException {
movie = dataInput.readUTF();
rate = dataInput.readDouble();
timeStamp=dataInput.readUTF();
uid=dataInput.readInt();
}
@Override
public String toString() {
return "MovieWritable{" +
"movie='" + movie + '\'' +
", rate=" + rate +
", timeStamp='" + timeStamp + '\'' +
", uid=" + uid +
'}';
}
public String getMovie() {
return movie;
}
public void setMovie(String movie) {
this.movie = movie;
}
public double getRate() {
return rate;
}
public void setRate(double rate) {
this.rate = rate;
}
public String getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(String timeStamp) {
this.timeStamp = timeStamp;
}
public int getUid() {
return uid;
}
public void setUid(int uid) {
this.uid = uid;
}
public MovieWritable() {
}
public MovieWritable(String movie, double rate, String timeStamp, int uid) {
this.movie = movie;
this.rate = rate;
this.timeStamp = timeStamp;
this.uid = uid;
}
}

MR程序:
/*
* 案例:统计每部电影评分最高的前5条评论的评分
* mapper阶段 key为 电影id value为MovieBean
* reducer阶段 key为movieBean,value为null
* 将movieBean存到集合中,然后集合按照评分降序排列,取前五条进行输出
* main方法 执行
*
* */
public class Work2_movie2 {
//Mapper阶段
static class Movie2_Mapper extends Mapper<LongWritable, Text,Text, MovieWritable>{
Text k=new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String s = value.toString();
MovieWritable movieWritable = JSON.parseObject(s, MovieWritable.class);
String mid = movieWritable.getMovie();
k.set(mid);
context.write(k,movieWritable);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//Reducer阶段
static class Movie2_Reducer extends Reducer<Text,MovieWritable,MovieWritable, NullWritable>{
@Override
protected void reduce(Text key, Iterable<MovieWritable> values, Context context) throws IOException, InterruptedException {
//这个集合要放在reducer方法里,每一个key都对应一个自己的集合,然后对集合排 序,最后将前五输出。这样的话得到的结果就是每个电影的前五评论
ArrayList<MovieWritable> list=new ArrayList<>();
for (MovieWritable value : values) {
/*
这里不可以直接list.add(value)
因为迭代器进行遍历数据的时候,使用的是一个对象来遍历所有的数据,所以,若 直接add则加的是同一个对象,只不过这个对象的数据随着循环遍历而值进行不断更 改而已,最后添加到集合中的
全都是最后一次更新的数据
*/
MovieWritable mb=new MovieWritable();
mb.setMovie(value.getMovie());
mb.setRate(value.getRate());
mb.setTimeStamp(value.getTimeStamp());
mb.setUid(value.getUid());
list.add(mb);
}
Collections.sort(list, new Comparator<MovieWritable>() {
@Override
public int compare(MovieWritable o1, MovieWritable o2) {
return Double.compare(o2.getRate(),o1.getRate());
}
});
for (int i = 0; i <Math.min(5,list.size()); i++) {
MovieWritable movieWritable = list.get(i);
context.write(movieWritable,NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Movie2");
job.setMapperClass(Movie2_Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MovieWritable.class);
job.setReducerClass(Movie2_Reducer.class);
job.setOutputKeyClass(MovieWritable.class);
job.setOutputValueClass(NullWritable.class);
//job.setNumReduceTasks(2); 这里可以设置由几个reducer任务来执行,有几个reducer执行,便会输出几个结果文件
//这里FileInputFormat 导包的时候选最长的 带lib的 setInputPaths要选以path来代替路径File的
FileInputFormat.setInputPaths(job,new Path("E:\\work2\\mrdata\\movie\\input"));
FileOutputFormat.setOutputPath(job,new Path("E:\\work2\\mrdata\\movie\\output_m_2"));
//提交任务 等待完成 成功则返回true
boolean b = job.waitForCompletion(true);
System.out.println(b);
}
}

3.4 MR原理
3.4.1 MR处理数据原理简述
1、根据文件个数、文件大小、切片的大小来划分任务切片,一个任务切片执行一个mapperTask
2、mapperTask进行读取数据,然后处理数据,最后发送出去。全程数据都是以K-V的形式进行处理。
map方法一行数据执行一次、一行数据生成一个K-V
3、数据在mapper阶段,执行完map方法后,数据要经过分区器,通过hash值取模来进行分区,然后区内 排序。
4、reducer阶段,拉取属于自己区的数据,然后进行分组合并排序,一组执行一次reduce方法,最终将 结果输出
3.4.2 MR处理数据原理详解
1、根据文件个数、文件大小、切片的大小来划分任务切片,一个任务切片执行一个mapperTask
2、MT读取数据,以KV的形式进行读取,也是以KV的形式进行处理,一行执行一次map方法
3、map方法将处理后的数据KV进行输出到reducer。中间要经过分区器,对数据进行分区,生成分区编号
4、分区后的数据会进到环形缓冲区,待缓冲区存到一定量的时候便会将数据溢出到磁盘,溢出到磁盘的时候进行排序、分区存储。
5、环形缓冲区假设存到80%开始溢出,此时一遍数据接着存入,另一边溢出,待存到快100%的时候还没溢出完便会阻塞等待溢出完。
6、由于溢出会有多次溢出,为了减少Reducer拉取数据的次数,磁盘会对多次溢出的数据再次按照分区进行组合、排序。
7、Reducer拉取属于自己区的数据,然后进行分组、排序,放到迭代器里
8、迭代器进行遍历数据、聚合
9、最终将结果发送到hdfs中。
在MR程序中会进行三次排序:
1、从环形缓冲区中溢出的时候会进行排序,将同一个区的数据放到一起并排序
0区:a a a a b b c 1区:a a a a b b c
0区:a a a a b b c 1区:a a a a b b c
2、多次溢出之后,会再次进行区内排序,将同一个分区的数据放到一起并排序
0区:a a a a a a a a b b b b c c 1区:a a a a a a a a b b b b c c
3、reducer抓取数据后会进行分组、合并、聚合
0区:a a a a a a a a b b b b c c
1区:a a a a a a a a b b b b c c
==>0区:a-8 b-4 c-2
==>1区:a-8 b-4 c-2

3.5 数据倾斜问题
由于数据本身就有倾斜的问题,所以要对数据做一定的处理,解决数据倾斜的问题
一般三种方法:
一、增加Reducer模块。
二、改变分区规则
三、将数据打散(key拼上随机数),这样便一定程度解决了倾斜问题,但要多写一个MR程序进 行聚合结果
以将数据打散的方法为例
/*
*数据倾斜的处理方式
* 方式一:
* 将数据打散:将每个数据再拼一个随机数->随机数的设置要根据reducerTask的个数来确定
* 这样的话,便可以根据任务量来进行分区,优点是使任务量更加均衡,解决了数据倾斜的问题
* 缺点是,不能直接得到最终结果,需要再做一次MR程序,将上面的结果再聚合一下
*
* 方式二:多增加一些ReducerTask的个数
*方式三:改变默认的分区规则--默认是根据哈希值取模来分区
* 案例:统计单词的个数
* 文件中a a a a v v b b f s s 明显a很多 而f就一个 显然有数据倾斜问题,所以要是用上面的解决方案来进行解决数据倾斜的问题。
* */
public class Demo2 {
static class Word_Mapper extends Mapper<LongWritable, Text,Text, IntWritable>{
Random r=new Random();
int number=0;
Text k=new Text();
IntWritable v=new IntWritable(1);
//获得reducerTast的个数
@Override
protected void setup(Context context) throws IOException, InterruptedException {
number = context.getNumReduceTasks();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\\s+");
for (String word : words) {
//根据reducerTast的个数来确定随机数的范围
int i = r.nextInt(number);
String word2=word+" "+i;
k.set(word2);
context.write(k,v);
}
}
}
static class Word_Reducer extends Reducer<Text,IntWritable,Text,IntWritable>{
IntWritable v=new IntWritable();
Text k=new Text();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count=0;
for (IntWritable value : values) {
count++;
}
v.set(count);
String[] split = key.toString().split("\\s");
String word = split[0];
k.set(word);
context.write(k,v);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Word2_new");
job.setMapperClass(Word_Mapper.class);
job.setReducerClass(Word_Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(2);
FileInputFormat.setInputPaths(job,new Path("E:\\work2\\mrdata\\skew\\input"));
FileOutputFormat.setOutputPath(job,new Path("E:\\work2\\mrdata\\skew\\output3"));
boolean b = job.waitForCompletion(true);
System.out.println(b);
}
}

public class Demo2_2 {
static class Word2_Mapper extends Mapper<LongWritable, Text,Text, IntWritable>{
Text k=new Text();
IntWritable v=new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\\s+");
String word = split[0];
String count = split[1];
k.set(word);
v.set(Integer.parseInt(count));
context.write(k,v);
}
}
static class Word2_Reducer extends Reducer<Text, IntWritable,Text, IntWritable>{
IntWritable v=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum=0;
for (IntWritable value : values) {
int i = Integer.parseInt(value.toString());
sum+=i;
}
v.set(sum);
context.write(key,v);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word2");
job.setMapperClass(Word2_Mapper.class);
job.setReducerClass(Word2_Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job,new Path("E:\\work2\\mrdata\\skew\\output4_1"));
FileOutputFormat.setOutputPath(job,new Path("E:\\work2\\mrdata\\skew\\output4_2"));
boolean b = job.waitForCompletion(true);
System.out.println(b);
}
}

四、yarn 资源调度
4.1 yarn的简述
1、管理集群的运算资源:所有的从节点向主节点汇报自己的运算资源
2、根据机器的运算资源合理分配任务 , 进行任务的监控
去别于hdfs,Hdfs是管理集群的存储资源;yarn是管理集群的运算资源。
复制
4.2 yarn的安装
启动命令: start-yarn.sh stop-yarn.sh
1、在etc/hadoop/目录下 yarn-site.xml文件 写入一些配置,包括resourceManager和 nodeManager,以及各个从节点的可使用内存情况和计算机核数情况。
2、在hadoop的启动sbin中 添加指令
3、将配置文件发送给linux02和03
4.3 yarn的原理
1 提交job,ResourceManager生成appmaster接收这个任务,分析是什么类型的job,然后分配资源, 将任务分配给不同的子节点nodeManager
2 这个nodeManager接收任务并开启容器启动程序 MRMaster
3 MRAppMaster根据作业计算出需要的容器数量,向ResourceManager申请容器
4 ResourceManager选择NodeManager 创建容器启动YarnChild,运行不同的task任务
5 任务结束以后nodemanager会汇报自己容器的任务进度 , 当所有的任务结束以后 释放容器
提交job,ResourceManager接收任务,通过appMaster分析是什么任务类型,然后计算节点的运算资源,分配任务给nodeManager;nodeManager接收任务,启动MRMaster容器启动程序,MRMaster计算任务量需要多少容器,然后向ResourceManager申请容器,ResourceManager返回ok,然后启动yarnChild开始执行任务,然后节点会向ResourceManager进行任务进度汇报。
1、客户端client向YARN主节点ResourceManager提交作业job
2、主节点ResourceManager在某个DataNode从节点上启动一个Container运行
applicationMaster(运行应用的管理者)
注:Container是一个容器 其中放了内存和硬盘空间
applicationMaster是临时的作业管理者, 相当于项目经理 。applicationMaster根据作业的大 小等情况,可以向ResourceManager主节点申请内存空间、cpu以及硬盘等资源。
3、applicationMaster向ResourceManager请求资源,为了运行MapReduce中所有的任务;
ResourceManager将分配nodeManager上的资源,并告知applicationMaste申请的资源都在那台机 器上。
4、applicationMaster联系nodeManager,启动对应Container中的相关task(map task,reduce task)。
5、运行的task会实时的向applicationMaster进行汇报(心跳机制),用于监听整个应用。
6、当所有task(reduceTask)运行完成,applicationMaste告知ResourceManager,销毁applicationMaste。

4.4 yarn实例
/**
* 提交Job
* 1) 修改运行模式 yarn
* 2) 配置resourcemanage的位置
* 3) 将项目打包
* 4) HDFS文件系统
* 5) 用户名
* 6) 跨平台参数 true
*
*/
步骤:1、在MR程序中,main方法中修改一些配置-->
// 1 设置默认的运行模式 yarn---conf.set("mapreduce.framework.name", "yarn");
// 2 RM机器的位置conf.set("yarn.resourcemanager.hostname", "linux01");
// 3 执行项目的jar包位置--本类名.class-->job.setJarByClass(Work2_word.class);
// 4 输入输出路径全改为hdfs集群上的路径-->输入路径改为hdfs原因是读取资源的时候就 在本机器上,更加快捷
FileInputFormat.setInputPaths(job ,new Path("/wc/"));
FileOutputFormat.setOutputPath(job,new Path("/wc_res3"));
2、将程序打包成jar:点maven栏的Lifesycle中的package将项目打包,并且复制到d:\\work
3、开启hdfs集群-->start-dfs.sh 开启yarn集群-->start-yarn.sh
4、将jar包上传到hdfs中 /wc.jar
5、执行运行程序的命令:hadoop jar /wc.jar com.doit05.demo.Work1_word(src.java
后的类的全路径名)
6、程序便在yarn集群上运行。

/*
* 在本项目中有个文件word.txt
* 统计这个文件中每个单词的个数
*
* */
public class Work2_word {
static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
@Override
protected void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException {
String lineStr = line.toString();
String[] words = lineStr.split("\\s+");
for (String word : words) {
Text kout = new Text(word);
IntWritable vout = new IntWritable(1);
context.write(kout,vout);
}
}
}
/**
* 继承Reducer类
*/
static class WordCountReducer extends Reducer<Text, IntWritable ,Text , IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int cnt = 0 ;
for (IntWritable value : values) {
cnt++ ;
}
context.write(key , new IntWritable(cnt));
}
}
// 3 运行程序
public static void main(String[] args) throws Exception {
/*
* 设置用户名--默认操作的文件系统--跨平台参数--都可以省去,因为虚拟机上有一些配置文件
* 当new Configuration()的·时候,虚拟机会寻找这个conf配置文件,所以已经配置的和默认的配置就不用再写了
* jar包的位置不要写死
* */
Configuration conf = new Configuration();
// 1 设置默认的运行模式 yarn
conf.set("mapreduce.framework.name", "yarn");
// 2 RM机器的位置
conf.set("yarn.resourcemanager.hostname", "linux01");
Job job = Job.getInstance(conf, "wordcount");
// 执行项目的jar包位置--本类名.class
job.setJarByClass(Work2_word.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);
//输入输出路径全改为hdfs集群上的路径
FileInputFormat.setInputPaths(job ,new Path("/wc/"));
FileOutputFormat.setOutputPath(job,new Path("/wc_res3"));
boolean b = job.waitForCompletion(true);
System.out.println(b);
}
}

五、Zookeeper分布式协调工具
5.1 zk介绍
ZooKeeper是分布式应用系统协调工具,简而言之是集群的管理者。监视集群中各个节点的状态,提供上下线感知,根据状态反馈执行相应的操作。
作用有:读数据、写数据、监控通知
zk的作用很简单,就是提供协调服务:帮使用者存储一些信息(存储各节点的状态信息)
帮使用者读取一些信息(读取状态信息)
帮使用者监控节点状态变化,并将状态变化反馈给使用者
应用有:
分布式系统中的主节点选举! 比如hbase中的老大的产生 Hmaster(HA)
分布式系统中的主从节点感知!
分布式系统中的配置文件同步!
系统服务器的动态上下线感知!!!
再hadoop集群中,你需要配置,配置namenode datanode,配置主从节点,以及副本个数等一些配置
在yarn中,yarn是进行资源的管理。需要设置ResourceManager和NodeManager,及各个节点的可使用资源情况,还有一些日志的存放位置,以及存留时间等
而zookeeper是集群的管理者,比如说hadoop,主节点宕机后就会无法工作,所以使用zookeeper来进行资源的管理,会启动备用主节点。zookeeper本身也是一个集群,有leader和folwer,leader挂掉之后,会重新选举leader,所以这样一来保证了集群的安全性。
复制

zookeeper:集群的管理者。当集群中的主节点宕机后,系统便无法工作,所以,有了zookeeper后,主节点宕机,zk会感知到,然后启动备用主节点,解决了主节点宕机的问题。而且可以监控集群中各个节点的状态变化,并且zookeeper本身也是一个集群,有一个leader和follwer,能够保证数据的可靠性。
复制
5.2 zookeeper的角色以及原理
zk中有一个leader和其他的follower
leader通过‘选举’产生-->zk有自己的选举机制
原理:
1、所有请求由leader接收(比如创建节点)-->leader将事物请求转换成一个Proposal,并给他生成唯一的事物id-->leader将proposal放入每个follower
2、follower接收到proposal后首先以事物日志的方式写入磁盘,成功后返回leader一个ack响应
3、leader只要收到过半的follower的ack响应,就会进行广播,进行提交proposal,同时leader也会完成proposal的提交。
5.3 zk的安装
上传-->解压-->修改配置文件-->分发-->写入一键启动脚本-->启动
配置了环境变量 可以在任意位置执行启动命令:
zk.sh start
zk.sh status
zk.sh stop
(主节点):leader
(从节点):follower
hdfs的启动和停止命令:
start-dfs.sh
stop-dfs.sh
主节点:nameNode
从节点:dataNode
yarn的启动和停止命令:
start-yarn.sh
stop-yarn.sh
主节点:resourceManager
从节点:nodeManager
HBase的启动和停止命令:
start-hbase.sh
stop-hbase.sh
(主节点):HMaster
(从节点):HRegionserver

5.4 shell客户端
进入客户端:在bin目录下:bin/zkCli.sh
1、创建节点
创建节点 一定要给值(value)
create -s -e -p /node1 value
-s 有序节点
-e 临时节点
-p 永久节点
create -s -e /node1 name1
2、查看节点
ls / 查看/下所有的字节点
get /node1 查看node1节点的信息
3、删除和更新节点
rmr /node1 删除节点
set /node1 new_value 更新节点

5.5 zokeeper --java端操作
首先在java中添加一些依赖
要先获得zk的对象实例,然后便可以执行一些操作
复制
1、zk基本命令在java中的操作
创建节点-------create
查看节点名字---getChildren
查看节点值-----getData
更新节点值-----set
删除节点-------delete
判断节点存在----exists
public class Demo1 {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("linux01:2181,linux02:2181,linux03:2181", 2000, null);
create(zk);
//get(zk);
// ls(zk);
//exists(zk);
//delete(zk);
zk.close();
}
public static void create(ZooKeeper zk) throws Exception {
//创建永久无序节点--节点名为dns_1--节点值为new1
zk.create("/dns_1","new1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public static void get(ZooKeeper zk) throws Exception {
//获取节点的信息
byte[] data = zk.getData("/dns_1", null, null);
System.out.println(new String(data));
}
public static void ls(ZooKeeper zk) throws Exception {
//获取根目录下所有的子节点
List<String> list = zk.getChildren("/", null);
System.out.println(list);
}
public static void exists(ZooKeeper zk) throws Exception {
//判断节点是否存在
Stat stat = zk.exists("/dns_1", true);
if (stat==null){
System.out.println("节点不存在");
}else{
System.out.println("节点存在");
}
}
public static void delete(ZooKeeper zk) throws Exception {
//删除节点 -1代表是当前版本 delete是递归删除
zk.delete("/dns_1",-1);
}
}

2、java端模拟zk的监控功能
--监控子节点的上下线感知
--监控子节点的状态-->上线、下线、更新
public class Demo2 {
static ZooKeeper zk=null;
public static void getConnect() throws Exception{
zk = new ZooKeeper("linux01:2181,linux02:2181,linux03:2181", 2000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("发现一个客户.....");
}
});
}
public static void getServerList() throws Exception{
//获得根目录下所有的子节点 以及子节点的值---并且做监控,当子节点的个数或者值发生改变时立马做出回应
List<String> list = zk.getChildren("/", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
List<String> list = null;
try {
list = zk.getChildren("/", this);
System.out.println("当前节点状态信息发生了变化...");
for (String name : list) {
byte[] data = zk.getData("/" + name, null, null);
System.out.println("变化后的正在工作的节点为:"+name+"--"+new String(data));
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
for (String name : list) {
byte[] data = zk.getData("/" + name, null, null);
System.out.println("当前正在工作的节点为:"+name+"--"+new String(data));
}
}
public static void server() throws InterruptedException {
System.out.println("当前节点正在运行......");
Thread.sleep(Integer.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
getConnect();
getServerList();
server();
}
}

六、集群之间的对比
1、页面的访问端口
linux01:16010 hbase的页面访问端口
linux01:9870 hadoop的页面访问端口 hdfs的页面访问
linux01:8088 yarn的页面访问端口
2181 zookeeper的端口
8020 JDBC连接sql的端口