How to import a .dat log file into HBase with Java

As described in the title.

Development environment
Hardware: three CentOS 6.5 servers (one Master node, two Slave nodes)
Software: Java 1.7.0_71, IDEA, Hadoop-2.6.2, HBase-1.1.4
I. Generate the log file
Assume each line of the log file has six columns, separated by spaces.
For example:
aaa 20.3.111.3 bbb user nothing 2016-05-01
www 22.3.201.7 ggg user nothing 2016-05-02
...

The log file is stored on HDFS under the /in/ directory.
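If the log file still lives on local disk, one way to upload it to /in/ is the HDFS Java API. The following is only a minimal sketch and not part of the original post: the NameNode address matches the mapreduce.inputPath shown in MRDriver.properties below, and the local file name access.log is illustrative.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadLog {
    public static void main(String[] args) throws Exception {
        // connect to the HDFS NameNode of the cluster described above
        FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop101:9000"), new Configuration());
        // copy the local log file into the MapReduce input directory
        fs.copyFromLocalFile(new Path("access.log"), new Path("/in/access.log"));
        fs.close();
    }
}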
II. Create a Java project and add the HBase jars to it
The import is implemented as a MapReduce job so that large volumes of data can be handled. It only inserts rows into the table and performs no aggregation, so only the Map function needs to be implemented.
1. First create a configuration file containing the user-defined parameters
MRDriver.properties
#mapreduce
hbase.zookeeper.quorum=hadoop101
mapreduce.job.tracker=hadoop101:9001

#HTable
HTable.tableName=hbases4
HTable.tableName.colFamily=logs

#HDFS File
mapreduce.inputPath=hdfs://hadoop101:9000/in/*

2. Read the configuration file
HConfiguration.java
import java.io.InputStream;
import java.util.Properties;

public class HConfiguration {
    public static String hbase_zookeeper_quorum;
    public static String mapreduce_job_tracker;

    // name of the table and column family to create
    public static String tableName;
    public static String colFamily;

    // positions of the rowkey fields within a log line
    public static int htable_rowkey_first;
    public static int htable_rowkey_second;

    public static String mapreduce_inputPath;

    static {
        try {
            InputStream in = HConfiguration.class.getClassLoader().getResourceAsStream("MRDriver.properties");
            Properties props = new Properties();
            props.load(in);

            hbase_zookeeper_quorum = props.getProperty("hbase.zookeeper.quorum");
            mapreduce_job_tracker = props.getProperty("mapreduce.job.tracker");

            tableName = props.getProperty("HTable.tableName");
            colFamily = props.getProperty("HTable.tableName.colFamily");

            // these two keys are not present in the MRDriver.properties shown above,
            // so fall back to defaults instead of failing on a missing property
            htable_rowkey_first = Integer.parseInt(props.getProperty("HTable.rowkey.first", "0"));
            htable_rowkey_second = Integer.parseInt(props.getProperty("HTable.rowkey.second", "1"));

            mapreduce_inputPath = props.getProperty("mapreduce.inputPath");

        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}

3. Create a singleton class that operates on HBase
MRDriver.java
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MRDriver {

    private static MRDriver single = null;
    private static HTable table = null;

    public MRDriver() {
    }

    // static factory method
    public static MRDriver getInstance() {
        if (single == null) {
            single = new MRDriver();
        }
        return single;
    }

    // static HBase configuration shared by all callers
    static Configuration conf = null;

    static {
        conf = HBaseConfiguration.create();
        // hbase.zookeeper.quorum: the list of machines in the ZooKeeper quorum
        conf.set("hbase.zookeeper.quorum", HConfiguration.hbase_zookeeper_quorum);
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    // reuse a single HTable instance to save system resources
    public static HTable getHTable(String tableName) {
        try {
            if (table == null) {
                table = new HTable(conf, Bytes.toBytes(tableName));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /*
     * Create the table.
     *
     * @tableName table name
     *
     * @family column family name
     */
    public void createTable(String tableName, String family) throws Exception {

        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor(tableName);
        desc.addFamily(new HColumnDescriptor(family));
        if (admin.tableExists(tableName)) {
            MyMRDriver.logger.info("table Exists!");
        } else {
            admin.createTable(desc);
            MyMRDriver.logger.info("create table Success!");
        }
    }

    /*
     * Add a row of data to the table.
     *
     * @rowKey row key
     *
     * @tableName table name
     *
     * @column list of column qualifiers
     *
     * @value list of values, in the same order as the qualifiers
     */
    @SuppressWarnings({"resource", "deprecation"})
    public void addData(String rowKey, String tableName, ArrayList<String> column, String[] value) throws IOException {
        Put put = new Put(Bytes.toBytes(rowKey));              // set the rowkey
        getHTable(tableName);                                   // get the HTable used for read/write operations
        HColumnDescriptor columnFamilies = table.getTableDescriptor() // read the table's column families
                .getColumnFamilies()[0];

        String familyName = columnFamilies.getNameAsString();  // column family name
        if (familyName.equals(HConfiguration.colFamily)) {     // put the data into the configured column family
            for (int j = 0; j < column.size(); j++) {
                put.add(Bytes.toBytes(familyName), Bytes.toBytes(column.get(j)), Bytes.toBytes(value[j]));
            }
        }
        table.put(put);
        MyMRDriver.logger.info("Add data Success!");
    }

}
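As a quick standalone sanity check outside MapReduce, the class can be exercised as below. This is only a sketch, assuming MRDriver.properties is on the classpath and the cluster is reachable; the class name MRDriverTest, the row key and the sample values (taken from the first log line above) are illustrative.

import java.util.ArrayList;
import java.util.Arrays;

public class MRDriverTest {
    public static void main(String[] args) throws Exception {
        MRDriver driver = MRDriver.getInstance();
        // create the table and column family from MRDriver.properties if they do not exist yet
        driver.createTable(HConfiguration.tableName, HConfiguration.colFamily);

        // one sample row: column qualifiers log0..log5 and the six fields of a log line
        ArrayList<String> columns = new ArrayList<String>(
                Arrays.asList("log0", "log1", "log2", "log3", "log4", "log5"));
        String[] values = {"aaa", "20.3.111.3", "bbb", "user", "nothing", "2016-05-01"};
        driver.addData("test-row-1", HConfiguration.tableName, columns, values);
    }
}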

4. Implement the Map function
HMap.java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HMap extends Mapper<LongWritable, Text, LongWritable, Text> {

    MRDriver myDriver = MRDriver.getInstance();

    // column qualifier names, one per field
    ArrayList<String> arrayList = new ArrayList<String>();
    // fields of one split line
    String[] lineValue = new String[]{};

    int lineNum = 0;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split the line on spaces
        lineValue = value.toString().split(" ");

        if (lineNum == 0) { // name the column qualifiers log0..logN on the first line
            for (int i = 0; i < lineValue.length; i++) {
                arrayList.add("log" + i);
            }
            lineNum++;
        }
        // insert the row
        Date date = new Date();
        // getTime() is used as the rowkey here only for convenience; it is a poor choice in real
        // projects, since rows written in the same millisecond overwrite each other
        myDriver.addData(date.getTime() + "", HConfiguration.tableName, arrayList, lineValue);
    }
}
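As noted in the comment above, a millisecond timestamp is a weak rowkey because records written in the same millisecond collide. One possible alternative, sketched below and not part of the original post (the class name HMapOffsetKey is illustrative), derives the rowkey from the input file name and the byte offset of the line, which is unique per input record when TextInputFormat is used.

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class HMapOffsetKey extends Mapper<LongWritable, Text, LongWritable, Text> {

    MRDriver myDriver = MRDriver.getInstance();
    ArrayList<String> arrayList = new ArrayList<String>();
    int lineNum = 0;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] lineValue = value.toString().split(" ");
        if (lineNum == 0) {
            for (int i = 0; i < lineValue.length; i++) {
                arrayList.add("log" + i);
            }
            lineNum++;
        }
        // rowkey = input file name + byte offset of the line, e.g. "access.log-128";
        // unique per record, so no two records overwrite each other
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        myDriver.addData(fileName + "-" + key.get(), HConfiguration.tableName, arrayList, lineValue);
    }
}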

5. Implement the main method that submits the job
MyMRDriver.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.log4j.Logger;

public class MyMRDriver {
    public static Logger logger = Logger.getLogger(MRDriver.class);

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // before running, adjust the settings in MRDriver.properties
        MRDriver myDriver = MRDriver.getInstance();

        Job job = Job.getInstance(new Configuration(), "HDFS2HBase2");
        job.setJarByClass(MyMRDriver.class);

        try {
            myDriver.createTable(HConfiguration.tableName, HConfiguration.colFamily);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // set the Map class; this is a map-only job, so no Reducer is needed
        job.setMapperClass(HMap.class);
        job.setNumReduceTasks(0);

        // set the input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(NullOutputFormat.class);

        // set the input directory
        FileInputFormat.addInputPath(job, new Path(HConfiguration.mapreduce_inputPath));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}

6. Run the main method to test

You can also use count in the hbase shell (e.g. count 'hbases4') to check whether the rows were inserted successfully.
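The inserted rows can also be counted from Java. The following is a minimal sketch using the HBase 1.x client API, assuming the same ZooKeeper quorum and the table name from MRDriver.properties; the class name CountRows is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class CountRows {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop101");

        // scan the whole table and count the rows
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("hbases4"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            long rows = 0;
            for (Result r : scanner) {
                rows++;
            }
            System.out.println("rows in hbases4: " + rows);
        }
    }
}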