位置: IT常识 - 正文

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

编辑:rootadmin
大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出 3.6.1OutputFormat接口实现类

推荐整理分享大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:大数据框架结构,大数据技术框架有哪些,大数据处理框架,大数据整合的框架,大数据 框架,大数据 框架,大数据框架基础教程,大数据框架基础教程,内容如对您有帮助,希望把文章链接给更多的朋友!

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1、文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,疑问TextOutputFormat调用toString()方法把他们转换为字符串。

2、SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

3、自定义OutputFormat

根据用户需求,自定义实现输出。

3.6.2自定义OutputFormat

1、使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出(大数据相关框架)

2、自定义OUtputFormat步骤

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter,具体改写输出数据的方法write()。

3.6.3自定义OutputFormat案例实操

1、需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

http://www.baidu.comhttp://www.google.comhttp://cn.bing.comhttp://www.atguigu.comhttp://www.sohu.comhttp://www.sina.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sindsafa.com

(2)期望输出数据

http://www.atguigu.comhttp://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.com

2、需求分析

3、案例实操

(1)编写FilterMapper类

package com.cuiyf41.output;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 写出 context.write(value, NullWritable.get()); }}

(2)编写FilterReducer类

package com.cuiyf41.output;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { Text k = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key k.set(line); // 4 输出 context.write(k, NullWritable.get()); }}

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat;import java.io.IOException;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)throws IOException, InterruptedException {// 创建一个RecordWriterreturn new FilterRecordWriter(job);}}

(4)编写RecordWriter类

package com.cuiyf41.output;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); }}

(5)编写FilterDriver类

package com.cuiyf41.output;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/log.txt", "e:/output2" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); Path input = new Path(args[0]); Path output = new Path(args[1]); // 如果输出路径存在,则进行删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output,true); } FileInputFormat.setInputPaths(job, input); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, output); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
本文链接地址:https://www.jiuchutong.com/zhishi/299790.html 转载请保留说明!

上一篇:谷歌研究员走火入魔事件曝光:认为AI已具备人格,被罚带薪休假,聊天记录让网友San值狂掉...(谷歌研究院)

下一篇:猿创征文|【React 三】组件实例的三大属性(state、props、refs)(关于猿猴的作文)

  • 小黄车怎么关闭车锁(小黄车怎么关闭自动续费)

    小黄车怎么关闭车锁(小黄车怎么关闭自动续费)

  • 山姆会员副卡怎么转给别人(山姆会员副卡怎么给别人)

    山姆会员副卡怎么转给别人(山姆会员副卡怎么给别人)

  • 滴答音乐下载失败怎么回事(滴答音乐下载不了歌曲)

    滴答音乐下载失败怎么回事(滴答音乐下载不了歌曲)

  • 免费转pdf软件有哪些(免费转pdf为word的软件)

    免费转pdf软件有哪些(免费转pdf为word的软件)

  • 小米10有光学防抖吗(小米10光学防抖怎么关闭)

    小米10有光学防抖吗(小米10光学防抖怎么关闭)

  • QQ空间为什么出现陌生访客(qq空间为什么出不来了)

    QQ空间为什么出现陌生访客(qq空间为什么出不来了)

  • 华为mate30pro锁屏时间位置在哪(华为mate30pro锁屏步数怎么关闭)

    华为mate30pro锁屏时间位置在哪(华为mate30pro锁屏步数怎么关闭)

  • ipadmini4什么时候上市的		(ipadmini4什么时候上市的多少钱)

    ipadmini4什么时候上市的 (ipadmini4什么时候上市的多少钱)

  • 淘宝满减后退货怎么办(淘宝满减后退货还会满减吗)

    淘宝满减后退货怎么办(淘宝满减后退货还会满减吗)

  • 淘宝货物拒收后怎么申请退款呢(淘宝货物拒收后怎么申请退款吗)

    淘宝货物拒收后怎么申请退款呢(淘宝货物拒收后怎么申请退款吗)

  • 苹果手机中的wapi是什么东西(苹果手机中的watch可以删除吗)

    苹果手机中的wapi是什么东西(苹果手机中的watch可以删除吗)

  • 苹果电池保修期内多少容量才可以换新(苹果电池保修期内低于多少免费换)

    苹果电池保修期内多少容量才可以换新(苹果电池保修期内低于多少免费换)

  • 苹果xr有分屏功能吗(苹果xr能分屏)

    苹果xr有分屏功能吗(苹果xr能分屏)

  • 快手可以艾特自己吗(快手艾特自己话题)

    快手可以艾特自己吗(快手艾特自己话题)

  • 手机wifi密码忘了怎么办(手机Wifi密码忘了怎么办)

    手机wifi密码忘了怎么办(手机Wifi密码忘了怎么办)

  • pr导出mp4视频变成绿色(pr导出mp4变模糊)

    pr导出mp4视频变成绿色(pr导出mp4变模糊)

  • 座机暂停服务是啥意思(座机的暂停是什么意思)

    座机暂停服务是啥意思(座机的暂停是什么意思)

  • 进程的三种状态分别是(进程的三种状态详解)

    进程的三种状态分别是(进程的三种状态详解)

  • vivo的黑名单在哪找(vivo黑名单在手机哪里找)

    vivo的黑名单在哪找(vivo黑名单在手机哪里找)

  • volte手机中文叫什么(volte的中文)

    volte手机中文叫什么(volte的中文)

  • 屏幕漏液可以坚持多久(屏幕漏液可以坚持一个月吗)

    屏幕漏液可以坚持多久(屏幕漏液可以坚持一个月吗)

  • 苹果x怎么清灰(苹果x怎么清灰喇叭麦克风)

    苹果x怎么清灰(苹果x怎么清灰喇叭麦克风)

  • 三星a60对比realmex(三星a60和a60s)

    三星a60对比realmex(三星a60和a60s)

  • 复制的文字有底色怎么去掉(复制的文字有底色怎么办)

    复制的文字有底色怎么去掉(复制的文字有底色怎么办)

  • 克拉莫克湖上的倒影,英国坎布里亚湖区 (© Damian Harrison/Alamy)(克拉莫16首)

    克拉莫克湖上的倒影,英国坎布里亚湖区 (© Damian Harrison/Alamy)(克拉莫16首)

  • 直连路由、静态路由、动态路由(直连路由,静态路由)

    直连路由、静态路由、动态路由(直连路由,静态路由)

  • vue面试题整理(2022-持续更新中...)(vue面试题2020)

    vue面试题整理(2022-持续更新中...)(vue面试题2020)

  • PointNet解读(point network)

    PointNet解读(point network)

  • 税务局退回个税手续费会计分录
  • 一般纳税人注销公司麻烦吗
  • 开具增值税发票哪些情形不用交税?
  • 计提企业所得税会计科目
  • 零星采购不要发票可以吗
  • 合并业务的好处
  • 装修期内免租金可以办理营业执照吗
  • 购买税控设备怎么抵扣
  • 注册资本的变更
  • 红冲增值税专用发票需要收回原发票吗
  • 企业所得税季报营业收入,营业成本怎么填
  • 公务用车用油
  • 外账成本要注意什么
  • 福利费专票进项转出怎么做账
  • 暂估入库企业所得税税率
  • 绩效奖金是否属于工资判决书
  • 企业购买汽车需要缴纳什么税
  • 税后利润是净利润还是利润总额
  • 全资的子公司
  • macbook设置壁纸后开机变回原样
  • 健康检查查询系统
  • 工程服务的采购合同范本
  • 上个月退货会计分录
  • 如何防止win10自动重启
  • 税收分类编码怎么添加
  • 事业单位财产清查怎么进行
  • win7系统配置服务哪些启用
  • 埃托沙国家公园发展观兽旅游的优势条件
  • 增值税专用发票的税率是多少啊
  • HTML怎么设置文本框
  • php批量上传
  • php使用自定义函数编程求半径r的圆的周长和面积
  • 小规模纳税人的个人所得税怎么算
  • 覆盖的盖
  • php redis数据类型
  • vue element ui
  • 打造出ChatGPT的,是怎样一群人?
  • 城市道路占用费的收费标准
  • 经营范围技术服务技术开发技术咨询技术交流
  • 总分类账户余额表怎么做账
  • MySQL数据库远程登录
  • SQL Server UPDATE语句的用法详解
  • 公司贷款 利息
  • 金税四期的主要功能
  • 收员工伙食费会计分录
  • 机票电子行程单查询
  • 现金发放工资有什么风险
  • 工会经费是不是税费
  • 房地产预售款预交税金计算
  • 资产负债表货币资金怎么填
  • 财务费用和应付利息都在借方
  • 法人在公司账户取钱要纳税么
  • 加计抵减四项服务是什么?
  • 关于发票丢失的处罚
  • 矿山运输设备的点检绩效考核
  • 商业企业购入商品
  • sql Set IDENTITY_INSERT的用法
  • 64位 win10系统安装绿色版mysql-5.7.16-winx64的教程
  • 笔记本电脑的触摸板怎么开启和关闭
  • centos ssh permission denied
  • mac 设置
  • jinjia.exe进程
  • linux中安装telnet
  • scards32.exe - scards32是什么进程 有什么用
  • window预览
  • win8桌面不显示
  • 安卓注入工具
  • activity lunchmode详解
  • cocos2dx-js
  • android去掉状态栏第三方图标
  • jquery删除当前元素
  • unicode编码实现方案
  • js应用实例
  • jquery示例
  • 从安卓设备导入
  • python中闭包的作用
  • unity2d摇杆
  • 销售黄金饰品
  • 个人扣缴客户端怎样迁移信息
  • 税款复核需要多少天
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设