HBase and MR Interaction
A beginner's HBase study notes.
Contents

HBase and MR Interaction
1. Read the student table from HBase, count the students in each class, and write the result to HDFS or the local file system
2. Count the number of students per age
1) Writing one column
2) Writing two columns
1. Read the student table from HBase, count the students in each class, and write the result to HDFS or the local file system
package com.shujia.hbaseMROP;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Requirement:
// Read the student table from HBase, count the students in each class,
// and write the result to HDFS or the local file system.
public class Hbase2HDFS {
    // Dependencies: hbase-client and hbase-mapreduce (hbase-server on HBase 1.x) must be on the classpath.

    // Map side
    // The map output is: class name -> 1
    public static class myHbaseMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // key.get() and value.getRow() both return the row key
            String row = Bytes.toString(key.get());
            String rowKey = Bytes.toString(value.getRow());
            System.out.println("row:" + row + " rowKey:" + rowKey);
            byte[] bytes = value.getValue(
                    Bytes.toBytes("info")
                    , Bytes.toBytes("clazz")
            );
            if (bytes != null) {
                String clazz = Bytes.toString(bytes);
                context.write(new Text(clazz), new IntWritable(1));
            }
        }
    }

    // Reduce side
    public static class myReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int num = 0;
            for (IntWritable value : values) {
                num += value.get();
            }
            context.write(key, new IntWritable(num));
        }
    }

    // Driver
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Connect the job to HBase through ZooKeeper
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,master:2181"); // the :port suffix is only needed when ZooKeeper is not on the default port 2181
        Job job = Job.getInstance(conf);
        job.setJobName("Hbase2HDFS");
        job.setJarByClass(Hbase2HDFS.class);
        // Configure the Mapper
        /**
         * initTableMapperJob(TableName table,
         *                    Scan scan,
         *                    Class<? extends TableMapper> mapper,
         *                    Class<?> outputKeyClass,
         *                    Class<?> outputValueClass,
         *                    Job job) throws IOException
         */
        TableMapReduceUtil.initTableMapperJob(
                TableName.valueOf("jan:tbl1")
                , new Scan()
                , myHbaseMapper.class
                , Text.class
                , IntWritable.class
                , job
        );
        // Configure the Reducer
        job.setReducerClass(myReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input was configured above (the HBase table via ZooKeeper); here we configure the output
        FileOutputFormat.setOutputPath(job, new Path("D:\\IDEA_workplace\\hbasedemo15\\hbaseOut\\out"));
        // Submit the job
        job.waitForCompletion(true);
    }
}
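Two Scan settings are commonly recommended when a MapReduce job reads a whole table. Below is a minimal sketch of a tuned Scan that could be passed to initTableMapperJob in place of new Scan(); the class name ScanTuning, the method mrScan, and the caching value 500 are illustrative assumptions, not part of the notes above.

import org.apache.hadoop.hbase.client.Scan;

public class ScanTuning {
    // Build a Scan tuned for a full-table MapReduce read; pass mrScan()
    // to TableMapReduceUtil.initTableMapperJob instead of new Scan().
    public static Scan mrScan() {
        Scan scan = new Scan();
        scan.setCaching(500);       // rows fetched per RPC (assumed value; tune for your cluster)
        scan.setCacheBlocks(false); // a one-off full scan shouldn't pollute the region server block cache
        return scan;
    }
}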
2. Count the number of students per age

1) Writing one column
package com.shujia.hbaseMROP;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

// Requirement:
// Count the number of students per age.
public class Hbase2Hbase {
    // An MR program has three parts: Mapper, Reducer, Driver.

    // Mapper
    public static class ReadHbaseMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            byte[] age_value = value.getValue(
                    Bytes.toBytes("info")
                    , Bytes.toBytes("age")
            );
            if (age_value != null) {
                context.write(new Text(Bytes.toString(age_value)), new IntWritable(1));
            }
        }
    }

    // Reducer
    public static class WriteHbaseReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int num = 0;
            for (IntWritable value : values) {
                num += value.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            // First create the target table in HBase:
            // create 'jan:age_cnt',{NAME => 'info1',VERSIONS => 1}
            // Write one column; note Bytes.toBytes(int) stores the count as 4 raw bytes
            put.addColumn(
                    Bytes.toBytes("info1")
                    , Bytes.toBytes("cnt")
                    , Bytes.toBytes(num)
            );
            // Writing a second column (the age itself) is shown in example 2):
            /**
             * put.addColumn(
             *         Bytes.toBytes("info1")
             *         , Bytes.toBytes("age")
             *         , Bytes.toBytes(key.toString())
             * );
             */
            context.write(NullWritable.get(), put);
        }
    }

    // Driver
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "node1,node2,master");
        Job job = Job.getInstance(conf);
        job.setJobName("Hbase2Hbase");
        job.setJarByClass(Hbase2Hbase.class);
        // Configure the Mapper
        /**
         * initTableMapperJob(TableName table,
         *                    Scan scan,
         *                    Class<? extends TableMapper> mapper,
         *                    Class<?> outputKeyClass,
         *                    Class<?> outputValueClass,
         *                    Job job)
         */
        TableMapReduceUtil.initTableMapperJob(
                TableName.valueOf("jan:tbl1")
                , new Scan()
                , ReadHbaseMapper.class
                , Text.class
                , IntWritable.class
                , job
        );
        // Configure the Reducer; initTableReducerJob wires the output to the target table
        TableMapReduceUtil.initTableReducerJob(
                "jan:age_cnt"
                , WriteHbaseReducer.class
                , job
        );
        // Submit the job
        job.waitForCompletion(true);
        /**
         * Verify in the HBase shell:
         * scan 'jan:age_cnt'
         * and to rerun from scratch:
         * truncate 'jan:age_cnt'
         */
    }
}
Verify the result in the HBase shell (truncate the table before rerunning the job):
scan 'jan:age_cnt'
truncate 'jan:age_cnt'
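Because the reducer writes the count with Bytes.toBytes(num), scan displays the cnt column as four raw bytes (e.g. \x00\x00\x00\x10) rather than digits. Below is a minimal client-side sketch that reads one count back and decodes it with Bytes.toInt; the class name ReadAgeCnt and the sample row key "22" are assumptions for illustration.

package com.shujia.hbaseMROP;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

// Read one count back from jan:age_cnt and decode the raw int bytes
public class ReadAgeCnt {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1,node2,master");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("jan:age_cnt"))) {
            Result result = table.get(new Get(Bytes.toBytes("22"))); // "22" is an example age row key
            byte[] cnt = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("cnt"));
            if (cnt != null) {
                System.out.println("age 22 -> " + Bytes.toInt(cnt)); // Bytes.toInt reverses Bytes.toBytes(int)
            }
        }
    }
}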
2) Writing two columns
package com.shujia.hbaseMROP;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

// Requirement:
// Count the number of students per age.
public class Hbase2Hbase {
    // An MR program has three parts: Mapper, Reducer, Driver.

    // Mapper
    public static class ReadHbaseMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            byte[] age_value = value.getValue(
                    Bytes.toBytes("info")
                    , Bytes.toBytes("age")
            );
            if (age_value != null) {
                context.write(new Text(Bytes.toString(age_value)), new IntWritable(1));
            }
        }
    }

    // Reducer
    public static class WriteHbaseReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int num = 0;
            for (IntWritable value : values) {
                num += value.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            // First create the target table in HBase:
            // create 'jan:age_cnt',{NAME => 'info1',VERSIONS => 1}
            // First column: the count
            put.addColumn(
                    Bytes.toBytes("info1")
                    , Bytes.toBytes("cnt")
                    , Bytes.toBytes(num)
            );
            // Second column: the age itself, stored as a readable string
            put.addColumn(
                    Bytes.toBytes("info1")
                    , Bytes.toBytes("age")
                    , Bytes.toBytes(key.toString())
            );
            context.write(NullWritable.get(), put);
        }
    }

    // Driver
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "node1,node2,master");
        Job job = Job.getInstance(conf);
        job.setJobName("Hbase2Hbase");
        job.setJarByClass(Hbase2Hbase.class);
        // Configure the Mapper
        /**
         * initTableMapperJob(TableName table,
         *                    Scan scan,
         *                    Class<? extends TableMapper> mapper,
         *                    Class<?> outputKeyClass,
         *                    Class<?> outputValueClass,
         *                    Job job)
         */
        TableMapReduceUtil.initTableMapperJob(
                TableName.valueOf("jan:tbl1")
                , new Scan()
                , ReadHbaseMapper.class
                , Text.class
                , IntWritable.class
                , job
        );
        // Configure the Reducer; initTableReducerJob wires the output to the target table
        TableMapReduceUtil.initTableReducerJob(
                "jan:age_cnt"
                , WriteHbaseReducer.class
                , job
        );
        // Submit the job
        job.waitForCompletion(true);
        /**
         * Verify in the HBase shell:
         * scan 'jan:age_cnt'
         * and to rerun from scratch:
         * truncate 'jan:age_cnt'
         */
    }
}
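If the counts should read as plain digits in the shell, one option (a sketch, not what the notes above do) is to store the number as a string. The snippet below is a drop-in replacement for the first put.addColumn call in WriteHbaseReducer; the value would then be decoded with Integer.parseInt(Bytes.toString(...)) instead of Bytes.toInt.

            // Alternative: store the count as a readable string instead of 4 raw int bytes
            put.addColumn(
                    Bytes.toBytes("info1")
                    , Bytes.toBytes("cnt")
                    , Bytes.toBytes(String.valueOf(num)) // shell then shows e.g. "16" instead of \x00\x00\x00\x10
            );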