Hadoop简介

营长技术分享976

Hadoop(Hdfs\MR\Yarn\ZK)

一、Hadoop简介

1.1 hadoop简介

作用:海量数据的存储,海量数据的分析

组件:hdfs--------数据的存储

  MapReduce---数据的的分析

  yarn--------资源的调度

  comments----工具

优点:成本低--普通电脑就可以

  易扩容--扩容机制

  可靠----通过多个地方保存副本

  高效----分布式存储

一键开启命令:start-dfs.sh

一键关闭命令:stop-dfs.sh

访问文件系统页面:linux01:8020


1.2 hadoop分布式文件管理系统核心原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-On5zOgPg-1657697962243)(img\image-20210620141336631.png)]


二、Hdfs

2.1 hdfs 介绍

hdfs 是一种分布式文件管理系统,可以操作多台机器,目前在学习中,不可能每个人搞几台电脑,来组建一个集群,于是通过linux虚拟机创建三台虚拟机,每台虚拟机的内存是2G磁盘是20G,20G的磁盘最终还是写到本机上。


2.2 hadoop集群的搭建

1、在linux01中下载安装hadoop系统,然后修改配置文件,最后分发给linux02和linux03


注:在配置文件中,指定了三台虚拟机各自的角色,linux01作为主节点和从节点,而2和3作为从节点,当把配置好的hadoop包分发给linux02和linux03的时候,linux02和linux03便接受了这个配置,作为从节点,而且三台虚拟机的连接是通过提前设置好的免密协议。有了免密协议,有了配置文件,此时初始化namenode,由于从节点要跟主节点进行连接,要向主节点进行汇报,所以必须先开启主节点,之后再开启从节点。(因为麻烦,所以后面设置了一键全部开启)。都开启之后,hadoop集群便搭建成功,此时三台虚拟机便构成了一个hadoop集群,其中linux01是主节点,linux02和linux03是从节点。有什么操作在主节点上进行操作,具体的执行由从节点来执行。


2、开启hadoop分布式文件管理系统后与不开启之前三台虚拟机的区别:

在未开启hadoop的时候,三台虚拟机只是三台可以免密传输的虚拟机,各自执行着各自的任务

开启hadoop之后,此三台虚拟机不仅是三台可以操作的虚拟机,而且形成了一个hadoop集群,有了各自的角色;当我在linux01中上传了一个文件的时候,会自动在三个从节点上保存三个副本,也就是linux01、02、03都存有此文件。


3、数据在存入的时候存到了哪?

 1、namenode存储元数据(数据存储位置信息)的位置是/opt/hdpdata/name

 2、datanode存储数据的位置是/opt/hdpdata/data 后面有个很深的目录,数据就存放在那,

  而且更改了名字,nameNode提供的访问目录是虚拟目录

 数据存储的最终目的地还是落在了本地磁盘上。


2.3 hdfs 组成结构图


)]

2.4 shell命令–hdfs的操作命令

shell命令本来是在bin目录下才能执行,配置环境变量后,便可以在任意位置执行。

上传:hdfs dfs -put 本地文件全路径  /hdfs路径

下载:hdfs dfs -get /hdfs路径 /本地路径

创建文件夹:hdfs dfs -mkdir -p /a/b/c

移动:hdfs dfs -mv /x /y

改名:hdfs dfs -mv /y /yy

删除:hdfs dfs -rm -r /aa

  hdfs dfs -rm /a/a.txt

查看:hdfs dfs -ls /a

  hdfs dfs -ls -a /

查看内存:hdfs dfs -df -h

  hdfs dfs -du -h /aa

查看文档: hdfs dfs -tail/head /a.txt

修改权限:hdfs dfs -chmod 750 /aa


2.5 maven使用–java中操作hdfs

1、maven是一个工具,通过它 可以在java上写shall命令、即可以在java上操作hdfs


2、首先在java中配置maven的运行环境,和一些命令,以及在本地配置hadoop的运行环境


3、要想在java中操作hdfs 首先要获得hdfs的操作对象,然后通过这个对象 便可以执行hdfs的操作,这些操作跟shall命令相似,下载 上传 查看 删除 创建 等命令

public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        URI uri = new URI("hdfs://linux01:8020");

        String userName="root";

        FileSystem fs = FileSystem.newInstance(uri, conf, userName);


        //创建文件夹

        boolean b = fs.mkdirs(new Path("/a/b/c"));


        //删除--可以递归删除--删除前可以判断文件是否存在

        boolean b1 = fs.delete(new Path("/a/b"), true);


        //移动 改名

        boolean b2 = fs.rename(new Path("/a/b"), new Path("/a/c"));

        

        //查看

        RemoteIterator<LocatedFileStatus> ite = fs.listFiles(new Path("/a/b"), true);

        while(ite.hasNext()){

            LocatedFileStatus next = ite.next();

            //通过next可以获得很多信息

            next.getPath();

            next.getBlockSize();

        }

        FileStatus[] fss = fs.listStatus(new Path("/a/b"));

        for (FileStatus file : fss) {

            //可以判断是文件还是文件夹

            boolean b3 = file.isFile();

        }


        //读操作 读一个字节

        FSDataInputStream inputStream = fs.open(new Path("/wc/wc.txt"));

        int read = inputStream.read();

        //读操作 读一行

        BufferedReader buff = new BufferedReader(new InputStreamReader(inputStream));

        String line = buff.readLine();


        //写操作--追加写

        FSDataOutputStream append = fs.append(new Path("/wc/wc.txt"));

        append.writeUTF("aaa");

    } 


2.6 原理加强

2.6 .1 数据存储位置

数据通过物理切块,分别存储在不同的从节点中,默认每个物理切块保存三个副本。

每个从节点都有统一的数据存储目录。

当从节点向主节点注册的时候,主节点会给从节点分配:统一的集群id、统一的数据存储目录、从节点的专属uuid 。


复制


.png)]

2.6.2 上传数据流程

1、客户端请求上传数据

2、nameNode接收请求,检查从节点资源,返回可以接收数据的从节点

3、客户端向返回的可接收数据的从节点进行发送数据,发送的时候以数据包的形式进行发送。

4、从节点之间建立连接,一方面从节点接收客户端发来的数据,并写到本地磁盘中;

 另一方面,在接收数据的同时向其他从节点发送数据,其他从节点也是接收数据并写到本地磁盘,   并且在接收数据的同时向其他从节点发送数据。

5、从节点向客户端反馈第一块数据传送成功

6、主节点更新元数据信息。

7、开始传送第二块数据,重复上面2-7步骤。

(对本地的传输时通过本地输入输出来读写,其他的是通过网络输入输出流来读写)


2.6.3 下载数据流程

1、客户端请求下载数据

2、nameNode接收到请求,返回元数据信息(返回数据存储的位置)

3、客户端拿着元数据信息,去找相应的从节点进行下载第一块数据

4、从节点与客户端进行数据的传输

5、开始进行第二块数据的传输,重复以上步骤。



2.6.4 主从节点之间的通信

主从节点直接有着实时的通信。

1、从节点向主节点注册,主节点向从节点返回 统一的集群id、统一的数据存储路径、从节点的唯一id

2、从节点汇报自己的存储资源

3、心跳机制:工作状态、存储资源、领取任务等。

(心跳机制:主从节点之间每3秒进行一次通信,若从节点10次没有汇报,则主节点会记录它有问题,若5分钟还是没有汇报,则会提出此从节点)


2.6.5

简单来讲:1、nameNode记录元数据信息,不断更新的元数据会生成日志。

  2、nameNode将日志发送给SecondNameNode,SecondNameNode将日志信息记录到本地,并 且不断下载namenode记录元数据信息的镜像文件,并且将不断更新的元数据信息和日志 拼到一起再发送给nameNode。

  3、删除老的日志文件


三、MR

3.1 MR程序设计思想

3.2 MR程序注意事项

MR:对数据进行处理分析

 mapper阶段:对数据进行一行一行的处理

 reducer阶段:主要是对数据进行聚合

注: 1、map方法一行执行一次;reduce方法一个key执行一次


    2、mapper阶段,setup方法会在map方法之前执行

       reducer阶段,cleanup会在reduce方法全部执行完后再执行--可用于对最终数据再进行处理

       

    3、可以只有mapper阶段,而无reducer阶段

    

    4、当一个MR程序无法得到最终结果时,可以使用多个MR程序--MR串联。这也暴露出了MR的弊端: MR程序模式单一,不适合做迭代式的数据处理,顶多要求两次MR就要最终结果,否则太耗资源

    

    5、可以自己设置排序规则、分区规则、分组合并规则

    

    6、输出的数据类型可以改为Sequence类型,能够更快的读取数据

    

    7、由于MR中间有网络传输,所以传送的数据要实现序列化,为了简化序列化的内容,可以使用   hadoop的序列化方式。

   


3.3 MR案例

MovieBean:



//{"movie":"661","rate":"3","timeStamp":"978302109","uid":"1"}

//写一个movieBean实现hadoop的序列化---实现Writable接口---目的是简化序列化的内容,去掉一些不重要的内容

public class MovieWritable implements Writable {

    private String movie;

    private  double rate;

    private String timeStamp;

    private int uid;



        //hadoop的序列化

    @Override

    public void write(DataOutput dataOutput) throws IOException {

        dataOutput.writeUTF(movie);

        dataOutput.writeDouble(rate);

        dataOutput.writeUTF(timeStamp);

        dataOutput.writeInt(uid);

    }


    //hadoop的反序列化

    @Override

    public void readFields(DataInput dataInput) throws IOException {

        movie = dataInput.readUTF();

        rate = dataInput.readDouble();

        timeStamp=dataInput.readUTF();

        uid=dataInput.readInt();


    }





    @Override

    public String toString() {

        return "MovieWritable{" +

                "movie='" + movie + '\'' +

                ", rate=" + rate +

                ", timeStamp='" + timeStamp + '\'' +

                ", uid=" + uid +

                '}';

    }


    public String getMovie() {

        return movie;

    }


    public void setMovie(String movie) {

        this.movie = movie;

    }


    public double getRate() {

        return rate;

    }


    public void setRate(double rate) {

        this.rate = rate;

    }


    public String getTimeStamp() {

        return timeStamp;

    }


    public void setTimeStamp(String timeStamp) {

        this.timeStamp = timeStamp;

    }


    public int getUid() {

        return uid;

    }


    public void setUid(int uid) {

        this.uid = uid;

    }


    public MovieWritable() {

    }


    public MovieWritable(String movie, double rate, String timeStamp, int uid) {

        this.movie = movie;

        this.rate = rate;

        this.timeStamp = timeStamp;

        this.uid = uid;

    }


}


MR程序:



/*

* 案例:统计每部电影评分最高的前5条评论的评分

* mapper阶段 key为 电影id value为MovieBean

* reducer阶段 key为movieBean,value为null

*   将movieBean存到集合中,然后集合按照评分降序排列,取前五条进行输出

* main方法 执行

*

* */

public class Work2_movie2 {

    //Mapper阶段

    static  class Movie2_Mapper extends Mapper<LongWritable, Text,Text, MovieWritable>{

        Text k=new Text();

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            try {

                String s = value.toString();

                MovieWritable movieWritable = JSON.parseObject(s, MovieWritable.class);

                String mid = movieWritable.getMovie();

                k.set(mid);

                context.write(k,movieWritable);

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

    }


    //Reducer阶段

    static class Movie2_Reducer extends Reducer<Text,MovieWritable,MovieWritable, NullWritable>{


        @Override

        protected void reduce(Text key, Iterable<MovieWritable> values, Context context) throws IOException, InterruptedException {

            //这个集合要放在reducer方法里,每一个key都对应一个自己的集合,然后对集合排 序,最后将前五输出。这样的话得到的结果就是每个电影的前五评论

            ArrayList<MovieWritable> list=new ArrayList<>();

            for (MovieWritable value : values) {

                /*

                这里不可以直接list.add(value)

                因为迭代器进行遍历数据的时候,使用的是一个对象来遍历所有的数据,所以,若   直接add则加的是同一个对象,只不过这个对象的数据随着循环遍历而值进行不断更 改而已,最后添加到集合中的

                全都是最后一次更新的数据

                */

                MovieWritable mb=new MovieWritable();

                mb.setMovie(value.getMovie());

                mb.setRate(value.getRate());

                mb.setTimeStamp(value.getTimeStamp());

                mb.setUid(value.getUid());

                list.add(mb);

            }


            Collections.sort(list, new Comparator<MovieWritable>() {

                @Override

                public int compare(MovieWritable o1, MovieWritable o2) {

                    return Double.compare(o2.getRate(),o1.getRate());

                }

            });


            for (int i = 0; i <Math.min(5,list.size()); i++) {

                MovieWritable movieWritable = list.get(i);

                context.write(movieWritable,NullWritable.get());

            }

        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Movie2");


        job.setMapperClass(Movie2_Mapper.class);

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(MovieWritable.class);


        job.setReducerClass(Movie2_Reducer.class);

        job.setOutputKeyClass(MovieWritable.class);

        job.setOutputValueClass(NullWritable.class);


        //job.setNumReduceTasks(2); 这里可以设置由几个reducer任务来执行,有几个reducer执行,便会输出几个结果文件


        //这里FileInputFormat 导包的时候选最长的 带lib的 setInputPaths要选以path来代替路径File的

        FileInputFormat.setInputPaths(job,new Path("E:\\work2\\mrdata\\movie\\input"));

        FileOutputFormat.setOutputPath(job,new Path("E:\\work2\\mrdata\\movie\\output_m_2"));


        //提交任务 等待完成 成功则返回true

        boolean b = job.waitForCompletion(true);

        System.out.println(b);


    }

}


3.4 MR原理

3.4.1 MR处理数据原理简述

1、根据文件个数、文件大小、切片的大小来划分任务切片,一个任务切片执行一个mapperTask

2、mapperTask进行读取数据,然后处理数据,最后发送出去。全程数据都是以K-V的形式进行处理。

 map方法一行数据执行一次、一行数据生成一个K-V

3、数据在mapper阶段,执行完map方法后,数据要经过分区器,通过hash值取模来进行分区,然后区内 排序。

4、reducer阶段,拉取属于自己区的数据,然后进行分组合并排序,一组执行一次reduce方法,最终将 结果输出



3.4.2 MR处理数据原理详解



1、根据文件个数、文件大小、切片的大小来划分任务切片,一个任务切片执行一个mapperTask

2、MT读取数据,以KV的形式进行读取,也是以KV的形式进行处理,一行执行一次map方法

3、map方法将处理后的数据KV进行输出到reducer。中间要经过分区器,对数据进行分区,生成分区编号

4、分区后的数据会进到环形缓冲区,待缓冲区存到一定量的时候便会将数据溢出到磁盘,溢出到磁盘的时候进行排序、分区存储。

5、环形缓冲区假设存到80%开始溢出,此时一遍数据接着存入,另一边溢出,待存到快100%的时候还没溢出完便会阻塞等待溢出完。

6、由于溢出会有多次溢出,为了减少Reducer拉取数据的次数,磁盘会对多次溢出的数据再次按照分区进行组合、排序。

7、Reducer拉取属于自己区的数据,然后进行分组、排序,放到迭代器里

8、迭代器进行遍历数据、聚合

9、最终将结果发送到hdfs中。


在MR程序中会进行三次排序:

1、从环形缓冲区中溢出的时候会进行排序,将同一个区的数据放到一起并排序

 0区:a a a a b b c 1区:a a a a b b c

 0区:a a a a b b c 1区:a a a a b b c

2、多次溢出之后,会再次进行区内排序,将同一个分区的数据放到一起并排序

 0区:a a a a a a a a b b b b c c  1区:a a a a a a a a b b b b c c

3、reducer抓取数据后会进行分组、合并、聚合

 0区:a a a a a a a a b b b b c c

 1区:a a a a a a a a b b b b c c

 ==>0区:a-8 b-4 c-2

 ==>1区:a-8 b-4 c-2


3.5 数据倾斜问题


 由于数据本身就有倾斜的问题,所以要对数据做一定的处理,解决数据倾斜的问题

 一般三种方法:

  一、增加Reducer模块。

  二、改变分区规则

  三、将数据打散(key拼上随机数),这样便一定程度解决了倾斜问题,但要多写一个MR程序进 行聚合结果


以将数据打散的方法为例

/*

 *数据倾斜的处理方式

 * 方式一:

 *   将数据打散:将每个数据再拼一个随机数->随机数的设置要根据reducerTask的个数来确定

 *   这样的话,便可以根据任务量来进行分区,优点是使任务量更加均衡,解决了数据倾斜的问题

 *   缺点是,不能直接得到最终结果,需要再做一次MR程序,将上面的结果再聚合一下

 *

 * 方式二:多增加一些ReducerTask的个数

 *方式三:改变默认的分区规则--默认是根据哈希值取模来分区

 * 案例:统计单词的个数

 *   文件中a a a a v v b b f  s s 明显a很多 而f就一个 显然有数据倾斜问题,所以要是用上面的解决方案来进行解决数据倾斜的问题。

 * */

 

 

public class Demo2 {

    static class Word_Mapper extends Mapper<LongWritable, Text,Text, IntWritable>{

        Random r=new Random();

        int number=0;

        Text k=new Text();

        IntWritable v=new IntWritable(1);

        //获得reducerTast的个数

        @Override

        protected void setup(Context context) throws IOException, InterruptedException {

            number = context.getNumReduceTasks();

        }


        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] words = value.toString().split("\\s+");

            for (String word : words) {

                //根据reducerTast的个数来确定随机数的范围

                int i = r.nextInt(number);

                String word2=word+" "+i;

                k.set(word2);

                context.write(k,v);

            }

        }

    }


    static class Word_Reducer extends Reducer<Text,IntWritable,Text,IntWritable>{

        IntWritable v=new IntWritable();

        Text k=new Text();

        @Override

        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int count=0;

            for (IntWritable value : values) {

                count++;

            }

            v.set(count);

            String[] split = key.toString().split("\\s");

            String word = split[0];

            k.set(word);

            context.write(k,v);

        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Word2_new");


        job.setMapperClass(Word_Mapper.class);

        job.setReducerClass(Word_Reducer.class);


        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);


        job.setNumReduceTasks(2);


        FileInputFormat.setInputPaths(job,new Path("E:\\work2\\mrdata\\skew\\input"));

        FileOutputFormat.setOutputPath(job,new Path("E:\\work2\\mrdata\\skew\\output3"));


        boolean b = job.waitForCompletion(true);

        System.out.println(b);

    }

}




public class Demo2_2 {

    static class Word2_Mapper extends Mapper<LongWritable, Text,Text, IntWritable>{

        Text k=new Text();

        IntWritable v=new IntWritable();

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split("\\s+");

            String word = split[0];

            String count = split[1];

            k.set(word);

            v.set(Integer.parseInt(count));

            context.write(k,v);

        }

    }


    static class Word2_Reducer extends Reducer<Text, IntWritable,Text, IntWritable>{

        IntWritable v=new IntWritable();

        @Override

        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int sum=0;

            for (IntWritable value : values) {

                int i = Integer.parseInt(value.toString());

                sum+=i;

            }

            v.set(sum);

            context.write(key,v);

        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "word2");


        job.setMapperClass(Word2_Mapper.class);

        job.setReducerClass(Word2_Reducer.class);


        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);


        job.setNumReduceTasks(1);


        FileInputFormat.setInputPaths(job,new Path("E:\\work2\\mrdata\\skew\\output4_1"));

        FileOutputFormat.setOutputPath(job,new Path("E:\\work2\\mrdata\\skew\\output4_2"));


        boolean b = job.waitForCompletion(true);

        System.out.println(b);

    }

}


四、yarn 资源调度

4.1 yarn的简述

 1、管理集群的运算资源:所有的从节点向主节点汇报自己的运算资源

 2、根据机器的运算资源合理分配任务 , 进行任务的监控

 去别于hdfs,Hdfs是管理集群的存储资源;yarn是管理集群的运算资源。

复制

4.2 yarn的安装

 启动命令: start-yarn.sh   stop-yarn.sh

 1、在etc/hadoop/目录下   yarn-site.xml文件 写入一些配置,包括resourceManager和 nodeManager,以及各个从节点的可使用内存情况和计算机核数情况。

 2、在hadoop的启动sbin中 添加指令

 3、将配置文件发送给linux02和03


4.3 yarn的原理

1 提交job,ResourceManager生成appmaster接收这个任务,分析是什么类型的job,然后分配资源, 将任务分配给不同的子节点nodeManager

2 这个nodeManager接收任务并开启容器启动程序 MRMaster

3 MRAppMaster根据作业计算出需要的容器数量,向ResourceManager申请容器

4 ResourceManager选择NodeManager 创建容器启动YarnChild,运行不同的task任务

5 任务结束以后nodemanager会汇报自己容器的任务进度 , 当所有的任务结束以后 释放容器

 提交job,ResourceManager接收任务,通过appMaster分析是什么任务类型,然后计算节点的运算资源,分配任务给nodeManager;nodeManager接收任务,启动MRMaster容器启动程序,MRMaster计算任务量需要多少容器,然后向ResourceManager申请容器,ResourceManager返回ok,然后启动yarnChild开始执行任务,然后节点会向ResourceManager进行任务进度汇报。


1、客户端client向YARN主节点ResourceManager提交作业job 


2、主节点ResourceManager在某个DataNode从节点上启动一个Container运行    

   applicationMaster(运行应用的管理者)

   注:Container是一个容器 其中放了内存和硬盘空间

   applicationMaster是临时的作业管理者, 相当于项目经理 。applicationMaster根据作业的大    小等情况,可以向ResourceManager主节点申请内存空间、cpu以及硬盘等资源。

3、applicationMaster向ResourceManager请求资源,为了运行MapReduce中所有的任务;

   ResourceManager将分配nodeManager上的资源,并告知applicationMaste申请的资源都在那台机    器上。

4、applicationMaster联系nodeManager,启动对应Container中的相关task(map task,reduce    task)。

5、运行的task会实时的向applicationMaster进行汇报(心跳机制),用于监听整个应用。

6、当所有task(reduceTask)运行完成,applicationMaste告知ResourceManager,销毁applicationMaste。


4.4 yarn实例

/**

         * 提交Job

         *    1) 修改运行模式   yarn

         *    2) 配置resourcemanage的位置

         *    3) 将项目打包

         *    4) HDFS文件系统

         *    5) 用户名

         *    6) 跨平台参数  true

         *

         */

步骤:1、在MR程序中,main方法中修改一些配置-->

  // 1 设置默认的运行模式 yarn---conf.set("mapreduce.framework.name", "yarn");

        // 2 RM机器的位置conf.set("yarn.resourcemanager.hostname", "linux01");

        // 3 执行项目的jar包位置--本类名.class-->job.setJarByClass(Work2_word.class);

        // 4 输入输出路径全改为hdfs集群上的路径-->输入路径改为hdfs原因是读取资源的时候就 在本机器上,更加快捷

              FileInputFormat.setInputPaths(job ,new Path("/wc/"));

              FileOutputFormat.setOutputPath(job,new Path("/wc_res3"));

     2、将程序打包成jar:点maven栏的Lifesycle中的package将项目打包,并且复制到d:\\work

  3、开启hdfs集群-->start-dfs.sh    开启yarn集群-->start-yarn.sh

  4、将jar包上传到hdfs中 /wc.jar

  5、执行运行程序的命令:hadoop jar /wc.jar com.doit05.demo.Work1_word(src.java

   后的类的全路径名)

  6、程序便在yarn集群上运行。


/*

* 在本项目中有个文件word.txt

* 统计这个文件中每个单词的个数

*

* */

public class Work2_word {

    static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{


        @Override

        protected void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException {

            String lineStr = line.toString();

            String[] words = lineStr.split("\\s+");

            for (String word : words) {

                Text kout = new Text(word);

                IntWritable vout = new IntWritable(1);

                context.write(kout,vout);

            }


        }

    }

    /**

     * 继承Reducer类

     */

    static class WordCountReducer extends Reducer<Text, IntWritable ,Text , IntWritable>{

        @Override

        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int cnt = 0 ;

            for (IntWritable value : values) {

                cnt++ ;

            }

            context.write(key , new IntWritable(cnt));

        }

    }


    // 3 运行程序

    public static void main(String[] args) throws Exception {


        /*

        * 设置用户名--默认操作的文件系统--跨平台参数--都可以省去,因为虚拟机上有一些配置文件

        * 当new Configuration()的·时候,虚拟机会寻找这个conf配置文件,所以已经配置的和默认的配置就不用再写了


        * jar包的位置不要写死

        * */

        Configuration conf = new Configuration();

        // 1 设置默认的运行模式  yarn

        conf.set("mapreduce.framework.name", "yarn");

        // 2 RM机器的位置

        conf.set("yarn.resourcemanager.hostname", "linux01");


        

        Job job = Job.getInstance(conf, "wordcount");

        // 执行项目的jar包位置--本类名.class

        job.setJarByClass(Work2_word.class);


        job.setMapperClass(WordCountMapper.class);

        job.setReducerClass(WordCountReducer.class);


        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);


        job.setNumReduceTasks(1);

        //输入输出路径全改为hdfs集群上的路径

        FileInputFormat.setInputPaths(job ,new Path("/wc/"));

        FileOutputFormat.setOutputPath(job,new Path("/wc_res3"));


        boolean b = job.waitForCompletion(true);

        System.out.println(b);

    }

}



五、Zookeeper分布式协调工具

5.1 zk介绍

 ZooKeeper是分布式应用系统协调工具,简而言之是集群的管理者。监视集群中各个节点的状态,提供上下线感知,根据状态反馈执行相应的操作。

 作用有:读数据、写数据、监控通知

 zk的作用很简单,就是提供协调服务:帮使用者存储一些信息(存储各节点的状态信息)

         帮使用者读取一些信息(读取状态信息)

         帮使用者监控节点状态变化,并将状态变化反馈给使用者

 应用有:

 分布式系统中的主节点选举!  比如hbase中的老大的产生 Hmaster(HA)

 分布式系统中的主从节点感知!

 分布式系统中的配置文件同步!

 系统服务器的动态上下线感知!!!

再hadoop集群中,你需要配置,配置namenode datanode,配置主从节点,以及副本个数等一些配置

在yarn中,yarn是进行资源的管理。需要设置ResourceManager和NodeManager,及各个节点的可使用资源情况,还有一些日志的存放位置,以及存留时间等

而zookeeper是集群的管理者,比如说hadoop,主节点宕机后就会无法工作,所以使用zookeeper来进行资源的管理,会启动备用主节点。zookeeper本身也是一个集群,有leader和folwer,leader挂掉之后,会重新选举leader,所以这样一来保证了集群的安全性。

复制

zookeeper:集群的管理者。当集群中的主节点宕机后,系统便无法工作,所以,有了zookeeper后,主节点宕机,zk会感知到,然后启动备用主节点,解决了主节点宕机的问题。而且可以监控集群中各个节点的状态变化,并且zookeeper本身也是一个集群,有一个leader和follwer,能够保证数据的可靠性。

复制

5.2 zookeeper的角色以及原理

zk中有一个leader和其他的follower

leader通过‘选举’产生-->zk有自己的选举机制


原理:

1、所有请求由leader接收(比如创建节点)-->leader将事物请求转换成一个Proposal,并给他生成唯一的事物id-->leader将proposal放入每个follower

2、follower接收到proposal后首先以事物日志的方式写入磁盘,成功后返回leader一个ack响应

3、leader只要收到过半的follower的ack响应,就会进行广播,进行提交proposal,同时leader也会完成proposal的提交。



5.3 zk的安装

上传-->解压-->修改配置文件-->分发-->写入一键启动脚本-->启动


配置了环境变量 可以在任意位置执行启动命令:

zk.sh start

zk.sh status

zk.sh stop

(主节点):leader

(从节点):follower


hdfs的启动和停止命令:

start-dfs.sh

stop-dfs.sh

主节点:nameNode

从节点:dataNode


yarn的启动和停止命令:

start-yarn.sh

stop-yarn.sh

主节点:resourceManager

从节点:nodeManager


HBase的启动和停止命令:

start-hbase.sh

stop-hbase.sh

(主节点):HMaster

(从节点):HRegionserver


5.4 shell客户端

进入客户端:在bin目录下:bin/zkCli.sh

1、创建节点

 创建节点 一定要给值(value)

 create -s -e -p /node1 value

 -s   有序节点

 -e   临时节点

 -p   永久节点

 create -s -e /node1 name1

2、查看节点

 ls / 查看/下所有的字节点

 get /node1  查看node1节点的信息

3、删除和更新节点

 rmr /node1   删除节点

 set /node1 new_value 更新节点


5.5 zokeeper --java端操作

首先在java中添加一些依赖

要先获得zk的对象实例,然后便可以执行一些操作

复制

1、zk基本命令在java中的操作

创建节点-------create

查看节点名字---getChildren

查看节点值-----getData

更新节点值-----set

删除节点-------delete

判断节点存在----exists


public class Demo1 {


    public static void main(String[] args) throws Exception {

        ZooKeeper zk = new ZooKeeper("linux01:2181,linux02:2181,linux03:2181", 2000, null);

       create(zk);

        //get(zk);

       // ls(zk);

        //exists(zk);

        //delete(zk);


        zk.close();

    }


    public static void create(ZooKeeper zk) throws Exception {

        //创建永久无序节点--节点名为dns_1--节点值为new1

        zk.create("/dns_1","new1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);


    }

    public static void get(ZooKeeper zk) throws Exception {

        //获取节点的信息

        byte[] data = zk.getData("/dns_1", null, null);

        System.out.println(new String(data));

    }

    public static void ls(ZooKeeper zk) throws Exception {

        //获取根目录下所有的子节点

        List<String> list = zk.getChildren("/", null);

        System.out.println(list);

    }

    public static void exists(ZooKeeper zk) throws Exception {

        //判断节点是否存在

        Stat stat = zk.exists("/dns_1", true);

        if (stat==null){

            System.out.println("节点不存在");

        }else{

            System.out.println("节点存在");

        }

    }

    public static void delete(ZooKeeper zk) throws Exception {

        //删除节点 -1代表是当前版本 delete是递归删除

        zk.delete("/dns_1",-1);

    }

}


2、java端模拟zk的监控功能

 --监控子节点的上下线感知

 --监控子节点的状态-->上线、下线、更新

 public class Demo2 {

    static ZooKeeper zk=null;

    public static void getConnect() throws Exception{

        zk = new ZooKeeper("linux01:2181,linux02:2181,linux03:2181", 2000, new Watcher() {

            @Override

            public void process(WatchedEvent watchedEvent) {

                System.out.println("发现一个客户.....");

            }

        });

    }


    public static void getServerList() throws Exception{

        //获得根目录下所有的子节点 以及子节点的值---并且做监控,当子节点的个数或者值发生改变时立马做出回应

        List<String> list = zk.getChildren("/", new Watcher() {

            @Override

            public void process(WatchedEvent watchedEvent) {

                List<String> list = null;

                try {

                    list = zk.getChildren("/", this);

                    System.out.println("当前节点状态信息发生了变化...");

                    for (String name : list) {

                        byte[] data = zk.getData("/" + name, null, null);

                        System.out.println("变化后的正在工作的节点为:"+name+"--"+new String(data));

                    }

                } catch (Exception e) {

                    e.printStackTrace();

                }


            }

        });


        for (String name : list) {

            byte[] data = zk.getData("/" + name, null, null);

            System.out.println("当前正在工作的节点为:"+name+"--"+new String(data));

        }


    }


    public static void server() throws InterruptedException {

        System.out.println("当前节点正在运行......");

        Thread.sleep(Integer.MAX_VALUE);

    }


    public static void main(String[] args) throws Exception {

        getConnect();

        getServerList();

        server();


    }

}


六、集群之间的对比

1、页面的访问端口

linux01:16010  hbase的页面访问端口

linux01:9870   hadoop的页面访问端口  hdfs的页面访问

linux01:8088   yarn的页面访问端口

2181    zookeeper的端口

8020          JDBC连接sql的端口

标签: 信息技术

相关文章

OpenStack架构

OpenStack架构

OpenStack架构OpenStack作为开源、可扩展、富有弹性的云操作系统,其设计基本原则①按照不同的功能和通用性划分不同项目,拆分子系统按照不同的功能划分不同服务,并且服务之间相互隔离,只通过A...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。