位置: IT常识 - 正文

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

编辑:rootadmin
大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出 3.6.1OutputFormat接口实现类

推荐整理分享大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:大数据框架结构,大数据技术框架有哪些,大数据处理框架,大数据整合的框架,大数据 框架,大数据 框架,大数据框架基础教程,大数据框架基础教程,内容如对您有帮助,希望把文章链接给更多的朋友!

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1、文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,疑问TextOutputFormat调用toString()方法把他们转换为字符串。

2、SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

3、自定义OutputFormat

根据用户需求,自定义实现输出。

3.6.2自定义OutputFormat

1、使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

2、自定义OUtputFormat步骤

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter,具体改写输出数据的方法write()。

3.6.3自定义OutputFormat案例实操

1、需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

http://www.baidu.comhttp://www.google.comhttp://cn.bing.comhttp://www.atguigu.comhttp://www.sohu.comhttp://www.sina.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sindsafa.com

(2)期望输出数据

http://www.atguigu.comhttp://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.com

2、需求分析

3、案例实操

(1)编写FilterMapper类

package com.cuiyf41.output;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 写出 context.write(value, NullWritable.get()); }}

(2)编写FilterReducer类

package com.cuiyf41.output;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text k = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key k.set(line); // 4 输出 context.write(k, NullWritable.get()); }}

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat;import java.io.IOException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);}}

(4)编写RecordWriter类

package com.cuiyf41.output;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); }}

(5)编写FilterDriver类

package com.cuiyf41.output;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/log.txt", "e:/output2" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); Path input = new Path(args[0]); Path output = new Path(args[1]); // 如果输出路径存在,则进行删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output,true); } FileInputFormat.setInputPaths(job, input); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, output); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
本文链接地址:https://www.jiuchutong.com/zhishi/299790.html 转载请保留说明!

上一篇:谷歌研究员走火入魔事件曝光:认为AI已具备人格,被罚带薪休假,聊天记录让网友San值狂掉...(谷歌研究院)

下一篇:猿创征文|【React 三】组件实例的三大属性(state、props、refs)(关于猿猴的作文)

  • U大侠u盘启动盘制作常见问题(u盘启动大师教程)

    U大侠u盘启动盘制作常见问题(u盘启动大师教程)

  • 小米10支持的快充功率是多少(小米10支持快充瓦数)

    小米10支持的快充功率是多少(小米10支持快充瓦数)

  • p30摄像头玻璃碎了(p30摄像头玻璃花了)

    p30摄像头玻璃碎了(p30摄像头玻璃花了)

  • 明明拦截短信却能收到(拦截短信看不到了)

    明明拦截短信却能收到(拦截短信看不到了)

  • ipadair3机身弯曲(ipadair4机身弯曲)

    ipadair3机身弯曲(ipadair4机身弯曲)

  • 苹果手机充一夜电对电池有影响吗(苹果手机充一夜电突然黑屏了)

    苹果手机充一夜电对电池有影响吗(苹果手机充一夜电突然黑屏了)

  • internet是什么结构的网络(internet结构)

    internet是什么结构的网络(internet结构)

  • 抖音忘记手机号,有抖音号,还能登录吗(抖音忘记手机号了怎么解绑)

    抖音忘记手机号,有抖音号,还能登录吗(抖音忘记手机号了怎么解绑)

  • oppo曲面屏手机有几款(oppo曲面屏手机贴什么膜)

    oppo曲面屏手机有几款(oppo曲面屏手机贴什么膜)

  • 怎样给手机降温并且不伤害手机(怎样给手机降温华为)

    怎样给手机降温并且不伤害手机(怎样给手机降温华为)

  • 网络硬件故障包括(网络硬件故障包括哪些)

    网络硬件故障包括(网络硬件故障包括哪些)

  • 华为手机无限重启震动(华为手机无限重启开不了机怎么办)

    华为手机无限重启震动(华为手机无限重启开不了机怎么办)

  • 快手黄钻兑换比例(快手黄钻兑换人民币怎么算)

    快手黄钻兑换比例(快手黄钻兑换人民币怎么算)

  • win10gta4无法识别独显(win10无法运行gta4)

    win10gta4无法识别独显(win10无法运行gta4)

  • 原相机曝光怎么开(原相机曝光怎么开华为)

    原相机曝光怎么开(原相机曝光怎么开华为)

  • 华为添加桌面插件在哪(华为如何添加桌面插件)

    华为添加桌面插件在哪(华为如何添加桌面插件)

  • 华为短信验证码自动填充怎么设置(华为短信验证码不能弹出来怎么办)

    华为短信验证码自动填充怎么设置(华为短信验证码不能弹出来怎么办)

  • 手机出现耳机模式怎么取消(手机出现耳机模式没有声音怎么办安卓)

    手机出现耳机模式怎么取消(手机出现耳机模式没有声音怎么办安卓)

  • 微信收款给封了30天怎么办(微信收款封了多久解除)

    微信收款给封了30天怎么办(微信收款封了多久解除)

  • 如何设置下划线(论文封面如何设置下划线)

    如何设置下划线(论文封面如何设置下划线)

  • 报纸怎么做(制作一张带电的报纸怎么做)

    报纸怎么做(制作一张带电的报纸怎么做)

  • 3800x用什么散热器(3800x用什么风冷)

    3800x用什么散热器(3800x用什么风冷)

  • 话费开通绿钻怎么退订(绿钻话费支付方式)

    话费开通绿钻怎么退订(绿钻话费支付方式)

  • 苹果iPhone6S清除浏览器缓存方法(如何清除苹果手机6s垃圾)

    苹果iPhone6S清除浏览器缓存方法(如何清除苹果手机6s垃圾)

  • Vue中的Pinia状态管理工具 | 一篇文章教会你全部使用细节(vue pending)

    Vue中的Pinia状态管理工具 | 一篇文章教会你全部使用细节(vue pending)

  • phpcms图形验证码不显示不出来怎么办(图形验证码api)

    phpcms图形验证码不显示不出来怎么办(图形验证码api)

  • PHPCMS如何禁止IP访问网站(phpcms v9网页禁止复制)

    PHPCMS如何禁止IP访问网站(phpcms v9网页禁止复制)

  • 上下班出了事故算不算工伤
  • 转租仓库交增值税吗
  • 城市生活垃圾处理收费管理办法
  • 查验发票会显示名字吗
  • 劳务报酬能不能按照计件计算
  • 无发票情况说明怎么写学生
  • 长期股权投资审计说明
  • 小规模纳税人开普票要交税吗
  • 研发费用加计扣除条件
  • 一次性取得的租金收入
  • 个人独资交什么税?
  • 商品房买卖合同没有约定逾期交房违约金
  • 盈余公积可用于集体福利吗
  • 企业接受基金投资的规定
  • 外贸 内销
  • 广告服务印花税怎么交
  • 增资有哪些途径
  • 公对公转账备注信息填错了怎么办
  • 退关税怎么入账
  • 备抵法计提坏账准备的公式
  • windows7怎么说
  • windows11永久解决蓝屏
  • ghost后分区没有了
  • 收益性支出与资本性支出的主要特点
  • win10通讯
  • 一般纳税人购进税控收款机抵扣
  • 免抵退税如何进账
  • 原始凭证的内容有哪些
  • 还在用夸克?这3款能安装插件的手机浏览器不香吗_Via_
  • 再就业优惠怎么办理
  • php单例模式连接数据库
  • 前端跨域解决方案设计
  • 2023年我要实现的目标是
  • ps使用背景橡皮擦的时候需要按住什么键
  • java一天速成
  • 企业员工年终奖仲裁
  • 附加税申报核心内容
  • 分公司财务负责人要求
  • 财务报表审计的定义
  • MySQL5.6 Replication主从复制(读写分离) 配置完整版
  • 资产负债表与利润表的关联性可在存货与销售成本
  • 公司给非本单位的个人转账有风险怎么平账
  • 房屋出租简易计税进项税额需要转出么
  • 国有独资企业董事会成员组成规定
  • 企业营改增税率是多少
  • 收到工会经费怎么指定现金流入
  • 短期借款的相关法规
  • 劳务支出如何做账
  • 应付款多付了怎么入账
  • 本单位职工可以在本单位兼职吗
  • sql效率调优
  • ubuntu系统中怎么安装mathematica13.1.0
  • win7怎么设置禁止安装软件
  • centos 搜索
  • win8.1界面如何改为win7
  • windows xp能装微信吗
  • linux crontab用法
  • cocos2dx 3.17
  • cocos2dx开发的游戏
  • unity的粒子系统在哪
  • 汤姆猫arcode
  • Android屏幕外侧滑条
  • sequelize join
  • shell脚本基础教学
  • async/await与promise(nodejs中的异步操作问题)
  • jQuery实现textarea自动增长宽高的方法
  • angular ngshow
  • nodejs bff
  • Python heapq使用详解及实例代码
  • 世界坐标转换成屏幕坐标
  • 如何用node搭建服务器
  • os模块 python
  • JavaScript 事件对象介绍
  • js 回调函数写法
  • 税务登记没去登记会怎么处罚
  • 山东省国家税务局威海培训中心
  • PLC交通信号灯控制程序设计
  • 新开公司交印花税
  • 专票代开流程?
  • 新疆税务总局网站官网
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设