大数据 hbase KNOWU 2024-11-15 2024-11-15 hbase介绍 HBase产生背景介绍:
由于 HAOOP 不支持随机读写的操作, 仅支持顺序性读写操作, 适合于进行批量化处理操作
HBase是采用 java 语言开发, HBase基于HDFS , 是一个 支持高效的随机读写能力的noSQL型 数据库
HBase支持三种方式进行查询数据:
1) 支持主键查询
2) 支持主键的范围查询
3) 支持全表查询
HBase本质上就是一个存储容器, 不支持多行事务, 仅支持单行事务, 不支持SQL语句, 数据存储格式都是一切皆字节
HBase集群, 可以向hadoop一样, 通过横向扩展方式, 来提升HBase处理和存储的能力
HBase的表具有以下特征:
大: 表支持存储上十亿行数据, 上百万列
面向列: 数据是面向于列(列族 )式的存储方案
稀疏性: 在HBase的表存储数据, 如果某个字段为null, 并不会占用磁盘任何空间, 所以可以将表构建非常稀疏
应用场景:
数据需要进行随机读写操作
数据量比较的大
数据比较的稀疏
hbase和其他软件的区别
hbase和RDBMS的区别
HBase: 以表形式存储数据, 不支持SQL 不支持事务, 仅支持单行事务操作, 数据存储是分布式存储 ,存储是结构化和半结构数据 ,不支持join
RDBMS:以表形式存储数据, 支持SQL 支持多行事务操作, 数据存储中心化存储, 存储主要是以结构化数据为主,支持join
hbase 和 HDFS的区别
HBASE: 强依赖于HDFS , 数据最终 存储在HDFS之上的, 支持高效的随机读写 操作
HDFS: 分布式存储容器, 适合于批量化数据存储, 不支持随机读写能力
说明:
注意到 HBASE 和HDFS 既有联系 又有矛盾, HBASE基于HDFS , 而HDFS不支持随机读写, 但是HBASE支持随机读写
hbase和hive的区别
HBASE: 基于hadoop的软件 , hbase是nosql存储容器, HBASE延迟型较低, 接入在线业务
HIVE: 基于hadoop的软件, 数仓分析工具 , hive延迟较高, 接入离线业务 , 用于 OLAP操作
hbase部署 hbase的安装操作
安装HBase的易错点:
修改hbase-site.xml的时候, 没有检查 zookeeper的目录位置
没有将 htrace-core-3.1.0-incubating.jar 放置到hbase的lib目录下
没有讲conf/regionserves中的localhost信息删除, 以及此文件存在空行
如何启动HBase:
1 2 3 4 5 6 7 # 第一步: 三台节点都要执行: cd /export/server/zookeeper-3.4.6/bin ./zkServer.sh start # 第二步: 通过 jps 查询 三个节点是否都出现了以下这个进程 QuorumPeerMain # 第三步: 三台节点, 依次检查启动状态, 必须看到 两个 follower 一个 leader
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 1) 在node1的任意位置下执行: start-all.sh 2) 检查 三个节点是否都启动 node1 : namenode datanode resourceManager nodemanager node2 : seconderyNamenode datanode nodemanager node3 : datanode nodemanager 3) 分别打开 50070 和 8088 的端口号 检查 在50070 检查是否退出安全模式 三个datanode是否存在 在8088端口号, 检查 active node 是否为 3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 1) 在node1节点上, 任意目录下执行: start-hbase.sh 2) 检查: 在三个节点依次 通过 jps查询 node1 : HMaster HRegionServer node2 : HRegionServer node3 : HRegionServer 此检查, 如果第三步正常, 不需要在检查, 如果第三步一直无法显示, 请在2分钟后, 在此查询, 是否有减少进程 如果那个节点没有启动, 请查询其日志文件: 日志存储的目录 : /export/server/hbase-2.1.0/logs 查看两个文件 : hbase-root-regionserver-node[N].log hbase-root-regionserver-node[N].out 查看日志命令 : tail -200f xxx.文件 3) 登录 hbase的管理界面: 端口号 16010 访问 : http://node1:16010
hbase的表模型
Table: hbase的表, 在hbase中, 数据也是通过表形式组织在一起, hbase可以构建多张表
rowkey: 行键(主键) 类似于RDBMS中的PK , 保证唯一且非空 , rowkey仅会安装 字典序 方案进行排序
列: 列族 + 列限定符(列名)组成的
列族: 在一个表可以构建多个列族的, 每个列族下可以有多个列名, 支持有上百万个列
注意:
在建表的时候, 必须要指定列族
在建表的时候, 建议列族越少越好, 能用一个解决的, 坚决不使用两个的
列名: 一个列名必然属于某一个列族的, 列名不需要在建表的时候指定, 在后期添加数据的时候动态指定
时间戳:
每一个数据都是由时间戳的概念, 默认为添加数据的时间, 当然也可以手动指定时间
版本号: 是否需要保留每个数据的历史变更信息, 以及保留多少个
默认值: 1 表示只保留最新的版本
版本号是基于时间戳来记录各个历史版本的
单元格: 如何确定一个单元格
说明: 在建表的时候,必须指定二个参数 一个 表名 一个是列族
hbase常用操作 hbase的相关操作_shell命令 hbase的基本shell操作
1 [root@node2 ~]# hbase shell
1 2 3 4 # 查看某一个命令如何使用: # 格式: help '命令名称' hbase(main):012:0> help 'create'
查询集群的状态: status
查询Hbase有那些表: list
如何创建一张表
1 2 # 格式: create '表名' , '列族名称1','列族名称2' ....
1 2 # 格式: put '表名','rowkey值','列族:列名','值'
1 2 格式: get '表名','rowkey',['列族1','列族2' ...],['列族1:列名'],['列族1','列族2:列名' ...]
1 2 3 4 5 6 7 8 9 10 格式: +,[{COLUMNS =>['列族1','列族2']} | {COLUMNS =>['列族1:列名','列族2'],VERSIONS=>N} ] , [{FORMATTER =>'toString'}] , [{LIMIT =>N}] 范围查询的格式: scan '表名', {COLUNMS=>['列族1','列族2']} | {COLUMNS =>['列族1:列名','列族2'], STARTROW =>'起始rowkey的值', ENDROW=>'结束rowkey'} 注意: 包头不包尾 说明: {FORMATTER =>'toString'} : 用于显示中文 {LIMIT =>N} : 显示前 N条数据
1 修改与添加数据的操作 是一致的, 只需要保证 rowkey相同 就是 修改操作
1 2 格式: delete '表名','rowkey','列族:列名'
1 2 格式: deleteall '表名','rowkey',['列族:列名']
1 2 3 4 5 delete 和 deleteall区别: 两个操作都是用来执行删除数据操作 区别点 :1) delete操作 只能删除表中某个列的数据 deleteall支持删除某行数据 2) 通过delete删除某个列的时候, 默认只是删除其最新的版本, 而deleteall直接将其所有的版本数据全部都删除
说明:
在删除表的时候, 必须先禁用表, 然后才能删除表
hbase的高级shell命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 格式 : scan '表名',{FILTER=>"过滤器名称(比较运算符,比较器表达式)" } 常见的过滤器 : rowkey过滤器 : RowFilter : 实现行键字符串的比较和过滤操作 PrefixFilter : rowkey 前缀过滤器 列族过滤器 : FamilyFilter : 列族过滤器 列名过滤器 : QualifierFilter : 列名过滤器, 中显示对应列名的数据 列值过滤器 : ValueFilter : 列值过滤器, 找到符合值的数据 SingleColumnValueFilter : 在执行的列族和列名中进行比较具体的值, 将符合条数据整行数据全部都返回(包含条件的内容字段) SingleColumnVlaueExcludeFilter : 在执行的列族和列名中进行比较具体的值, 将符合条数据整行数据全部都返回(去除条件内容的字段) 其他过滤器 : PageFilter : 用来执行分页操作 比较运算符 : = > < >= <= != 比较器 : BinaryComparator : 完整匹配字节数组 BinaryPrefixComparator : 匹配字节数组前缀 NullComparator : 匹配Null SubstringComparator : 模糊匹配字符串 比较器表达式 : BinaryComparator : binary:值 BinaryPrefixComparator : binaryprefix:值 NullComparator : null SubstringComparator : substring:值 如果不知道过滤器的构造参数, 可以查看此地址: http ://hbase.apache.org/2.2/devapidocs/index.html
高级的shell管理命令:
2.1: whoami : 显示Hbase当前使用用户
2.2: describe: 展示表的结构信息
2.3: exists 判断表是否存在
2.4: is_enabled 和 is_disabled 判断表是否启用和是否禁用
2.5 : alter: 该命令可以改变表和列族信息
如何增加列族:
alter ‘表名’ ,NAME=>’列族名’,[VERSIONS=>N]
如何删除列族:
alter ‘表名’,’delete’=>’列族名’
hbase的javaAPI的操作
项目的准备工作
创建maven项目:
1.1) 先构建一个父工程: bigdata_parent_01
1.2) 接着将父工程中 src 删除
1.3) 创建一个子工程: day01_hbase
导入相关的依赖:pom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 <repositories > <repository > <id > aliyun</id > <url > http://maven.aliyun.com/nexus/content/groups/public/</url > <releases > <enabled > true</enabled > </releases > <snapshots > <enabled > false</enabled > <updatePolicy > never</updatePolicy > </snapshots > </repository > </repositories > <dependencies > <dependency > <groupId > org.apache.hbase</groupId > <artifactId > hbase-client</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > commons-io</groupId > <artifactId > commons-io</artifactId > <version > 2.6</version > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > 4.12</version > <scope > test</scope > </dependency > <dependency > <groupId > org.testng</groupId > <artifactId > testng</artifactId > <version > 6.14.3</version > <scope > test</scope > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.1</version > <configuration > <target > 1.8</target > <source > 1.8</source > </configuration > </plugin > </plugins > </build >
创建表
实现步骤:
1 2 3 4 5 6 7 8 9 1) 创建HBase连接对象: 2) 从连接对象中获取相关的管理对象: Admin(对表的操作) 和 Table(对表数据操作) 3) 执行相关的操作 4) 处理结果集 -- 此步骤只有查询操作 5) 释放资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Test public void test01 () throws Exception{ Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum" ,"node1:2181,node2:2181,node3:2181" ); Connection hbConn = ConnectionFactory.createConnection(conf); Admin admin = hbConn.getAdmin(); boolean flag = admin.tableExists(TableName.valueOf("WATER_BILL" )); if (!flag){ TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("WATER_BILL" )); ColumnFamilyDescriptorBuilder familyDesc = ColumnFamilyDescriptorBuilder.newBuilder("C1" .getBytes()); descBuilder.setColumnFamily(familyDesc.build()); TableDescriptor desc = descBuilder.build(); admin.createTable(desc); } admin.close(); hbConn.close(); }
添加数据
1 2 3 4 5 6 7 8 9 1) 创建Hbase的连接对象 2) 通过连接对象 获取相关的管理对象: admin 和 table 3) 执行相关的操作: 添加数据 4) 处理结果集: -- 此步骤不需要 5) 释放资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Test public void test02 () throws Exception{ Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum" ,"node1:2181,node2:2181,node3:2181" ); Connection hbConn = ConnectionFactory.createConnection(conf); Table table = hbConn.getTable(TableName.valueOf("WATER_BILL" )); Put put = new Put (Bytes.toBytes(4944191 )); put.addColumn("C1" .getBytes(),"NAME" .getBytes(),"登卫红" .getBytes()); put.addColumn("C1" .getBytes(),"ADDRESS" .getBytes(),"贵州省铜仁市德江县7单元267室" .getBytes()); put.addColumn("C1" .getBytes(),"SEX" .getBytes(),"男" .getBytes()); put.addColumn("C1" .getBytes(),"PAY_TIME" .getBytes(),"2020-05-10" .getBytes()); table.put(put); table.close(); hbConn.close(); }
抽取一些公共的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private Connection hbConn;private Table table;private Admin admin;private String tableName = "WATER_BILL" ;@Before public void before () throws Exception{ Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum" ,"node1:2181,node2:2181,node3:2181" ); hbConn = ConnectionFactory.createConnection(conf); admin = hbConn.getAdmin(); table = hbConn.getTable(TableName.valueOf(tableName)); } @Test public void test03 () { } @After public void after () throws Exception{ admin.close(); table.close(); hbConn.close(); }
查询某一条数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Test public void test03 () throws Exception{ Get get = new Get (Bytes.toBytes(4944191 )); Result result = table.get(get); List<Cell> listCells = result.listCells(); for (Cell cell : listCells) { byte [] rowBytes = CellUtil.cloneRow(cell); int rowkey = Bytes.toInt(rowBytes); byte [] familyBytes = CellUtil.cloneFamily(cell); String family = Bytes.toString(familyBytes); byte [] qualifierBytes = CellUtil.cloneQualifier(cell); String qualifier = Bytes.toString(qualifierBytes); byte [] valueBytes = CellUtil.cloneValue(cell); String value = Bytes.toString(valueBytes); System.out.println("rowkey:" +rowkey +"; 列族为:" +family+"; 列名为:" +qualifier+"; 列值为:" +value); } }
删除数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Test public void test04 () throws Exception{ Delete delete = new Delete (Bytes.toBytes(4944191 )); delete.addFamily("C1" .getBytes()); table.delete(delete); }
删除表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test public void test05 () throws Exception{ boolean flag = admin.isTableEnabled(TableName.valueOf(tableName)); if (flag){ admin.disableTable(TableName.valueOf(tableName)); } admin.deleteTable(TableName.valueOf(tableName)); }
导入数据的操作
1 hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径
1 hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/input
基于scan的扫描查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Test public void scan () throws Exception{ Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum" , "node1:2181,node2:2181,node3:2181" ); Connection hbConn = ConnectionFactory.createConnection(conf); Table table = hbConn.getTable(TableName.valueOf("WATER_BILL" )); Scan scan = new Scan (); FilterList filter = new FilterList (); SingleColumnValueFilter startValueFilter = new SingleColumnValueFilter ("C1" .getBytes(),"RECORD_DATE" .getBytes(), CompareOperator.GREATER_OR_EQUAL,"2020-06-01" .getBytes()); SingleColumnValueFilter endValueFilter = new SingleColumnValueFilter ("C1" .getBytes(),"RECORD_DATE" .getBytes(), CompareOperator.LESS_OR_EQUAL,"2020-07-01" .getBytes()) ; filter.addFilter(startValueFilter); filter.addFilter(endValueFilter); scan.setFilter(filter); ResultScanner results = table.getScanner(scan); for (Result result : results) { List<Cell> cells = result.listCells(); for (Cell cell : cells) { String qulifier = Bytes.toString(CellUtil.cloneQualifier(cell)); if ("NAME" .equals(qulifier)){ String name = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println(name); } } } table.close(); hbConn.close(); }
hbase的架构及原理 hbase高可用架构部署
hbase的高可用, 主要指的是让HBase的主节点, 有多台, 当其中一台出现故障后, 可以让其他的节点顶上来
如何配置呢?
在node1 进入 hbase的conf目录下, 创建一个 backup-masters
1 2 3 4 5 6 cd /export/server/hbase-2.1.0/conf/ vim backup-masters 内容如下: node2.itcast.cn node3.itcast.cn
1 2 3 cd /export/server/hbase-2.1.0/conf/ scp backup-masters node2:$PWD scp backup-masters node3:$PWD
1 2 3 在 node1执行: stop-hbase.sh start-hbase.sh
hbase的集群架构图
region server的上线和下线 region server的上线流程:
说明:
当regionServer上线后, master会立即感知到, 此时regionServer需要上master汇报自己当下管理那些region, 然后master会根据各个regionServer汇报的region的管理情况, 然后在读取meta表获取到所有的region, 与之比较, 查看是否还有未分配的region, 如果有, 将这个未分配的region, 均匀的分配给各个regionServer上, 保证负载均衡
regionServer的下线流程
说明: 当HregionServer下线后, 对应这个regionServer所管理的region就处于无人管理的状态(无分配状态),此时master就需要将这些没有分配的region, 重新分配给其他的regionServer上 即可, 当regionServer有重新上线之后, 从之前regionServer上, 解除一些region, 将region分配给当前这个新启动regionServer上 (存在时间间隔, 不是立即执行)
master的上线和下线
说明:
master短暂的下线, 并不会太大的影响HBase的集群, 因为hbase的读写操作是不经过HMaster, 而大多数的请求都是读写的请求,
master下线主要是会影响到对元数据操作的请求, 比如说 创建表 删除表 修改表
HBase的读写原理 读取数据的流程 1 2 3 4 5 6 7 8 9 10 11 12 Meat表内具体存放哪些信息: rowkey:由四个字段拼接起来,分别是 表名-StratRow-TimeStamp-EncodedName。 数据分为4列: info:regioninfo:EncodedName、RegionName、Region的StartRow、Region的StopRow; info:seqnumDuringOpen:存储Region打开时的sequenceId; info:server:存储Region落在哪个RegionServer上; info:serverstartcode:存储所在的RegionServer启动时间戳;
1 2 3 4 5 6 7 8 9 10 11 由客户端发起读取数据的请求 : scan '表名' 1) 首先第一步连接zookeeper, 从zookeeper中获取 HBase:Meta 表的位置信息(meta被那个regionServer所管理) HBase :Meta 表 是 hbase的管理表, 有且只有一个region 主要是用于存储 hbase的元数据信息, 比如 有那些表, 每个表有那些region, 每个region被那些regionServer管理 ...... 2) 连接对应(hbase meta表的)regionServer, 获取meta表中数据, 从meta表获取对应要查询的表有几个region, 以及这些region被那些regionserver所管理, 与当下要读取表的元数据信息全部都获取到 3) 并发的连接(查询表的)regionServer, 从各个regionServer的region中读取表的数据即可, 如果是基于scan扫描 此时会将所有的数据全部扫描的到客户端(边读边处理)), 如果是get操作, 此时从region中, 找到具体的rowkey 从region读取数据顺序 : memStore --> blockCache --->StoreFile(小File) --> 大HFile
数据的写入流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 客户端发起写入数据请求 : put 'user' ,'rk001','C1:name','张三' 1) 首先连接zookeeper, 获取HBase:Meta表所在的regionServer的地址 2) 连接regionServer, 从meta表获取要写入数据的表,根据rowkey找到对应的region被那个regionServer所管理 3) 连接对应regionServer, 开始进行数据写入操作 4) 首先先将数据写入到这个regionServer的HLog(WAL)日志中, 然后再将数据写入到memStore(可能会写入到多个memStore)中 5) 当HLog 和 memStore都成功将数据写入完成, 此时 客户端的写入流程就结束了, 客户端返回写入成功.... -------------------------以上为客户端流程------------------------------------ 服务端流程 :6) 随着客户端不断的写入, memStore中数据越来越多, 当memStore数据达到一定的阈值(128M/1小时)后, 就会执行flush刷新机制, 将memStore数据 "最终" 刷新到HDFS上, 形成一个storeFile(小File)文件 7) 随着不断的flush的刷新操作, 在HDFS上, 会存储越来越多的小File文件, 当这些小的Hfile文件达到一定的阈值(3个及以上)后, 就会启动compact(合并压缩)机制, 将多个小Hfile "最终" 合并为一个大的HFile文件 8) 随着不断的compact的合并压缩, 这个大的Hfile 也会越来越大, 当这个大的Hfile达到一定的阈值("最终"10GB)后, 就会触发 split机制, 将大的Hfile 进行一分为二, 形成两个新的Hfile, 此时对应region 也会进行一份为二, 形成两个新的region, 每个region管理其中一个新的大Hfile即可, 一旦split分裂完毕, 此时旧的region就会下线(注意: 在执行split分裂过程中, 当下分裂的表是不接受读写请求)
Hbase三大机制 HBase的flush刷新机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 flush机制 : 刷新机制 目的 : 将memStore中最终写入到HDFS上, 形成一个storeFile文件 阈值 : 大小阈值 : 128M 时间阈值 : 1小时 注意 : 满足了那个阈值, 都会触发flush机制 flush流程 : hbase 2.0 flush 1) 关闭掉当前这个达到阈值的memStore空间, 然后开启一个新的memStore, 以便于正常写入 2) 将这个关闭的memStore 首先放入一个 队列容器 (内存)中, 在HBase的2.0版本后, 会让这个队列容器,尽可能晚的刷新到磁盘上, 此时在队列的memStore变更为只读状态, 在读取数据时候, 依然可以从队列中进行读取操作, 以保证读取的效率 3) 随着memStore不断的达到阈值 , 在队列容器中存储的memStore的数量也会越来越多, 当内存达到一定的阈值(?)后, 触发flush操作, 将队列中所有的数据 进行合并操作,然后 一次性刷新到磁盘上, 形成一个storeFile文件; 注意 : 此操作, 仅在hbase2.0版本后才支持, 在2.0版本下, 写入到队列之后, 直接将数据刷新HDFS上, 形成一个个storeFile, 即使刷新慢了, 导致队列中有了多个memStore, 依然一个memStore就是一个storeFile
说明:
内存合并操作, 在hbase2.0后就开始支持了, 但是hbase2.x版本, 默认是不开启内存合并的, 如果开启, 需要手动设置
如何开启内存合并操作:
方式1: 全局配置
配置在: hbase-site.xml ,建议配置 adaptive
方式2: 针对某一个表来设置
合并的方案: 共计有三种
basic(基础型):
作用: 在合并的过程中, 不关心是否有重复数据, 或者过期的版本数据, 直接进行合并即可, 效率最高的
eager(饥渴型):
作用: 在合并的过程中, 会积极的判断数据是否有重复, 以及是否有过期, 会将重复的 过期的版本, 全部清洗掉, 然后合并在一起
adaptive(适应型):
作用: 检查数据, 如果发现数据重复和过期版本 的比例以及达到 eager方案, 就采用饥渴型, 否则就采用基础型
HBase的storeFile的合并机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 compact 合并压缩机制: 目的 : 将多个storeFile(小Hfile) 合并为一个更大的HFile文件 阈值 : 达到3个及以上 minor : 目的 : 将多个小的storeFile 合并为一个较大的Hfile文件过程 流程 : 1) 当storeFile文件数量达到阈值(3个及以上)后, 首先先将这几个小storeFile进行合并操作, 形成一个较大的storeFile文件 2) 在合并过程中, minor 操作不会对数据进行任何的删除, 去掉重复操作, 仅仅做一个基本排序合并工作即可, 整体执行效率是非常快的 major : 目的 : 将这个较大的Hfile 和之前大的Hfile 进行合并形成一个最终的大Hfile操作 流程 : 1) 默认情况下, 当达到一定的阈值(7天|手动)后触发major操作, 将磁盘中较大的HFile和之前大的Hfile进行合并, 形成一个最终的大Hfile文件即可 2) 在合并的过程中, 打上重复标记的数据, 打上过期版本标记的数据, 在major执行过程中, 就会进行全部处理掉 注意 : 由于major在合并的过程中, 需要对所有数据进行合并操作, 此时无法对数据进行相关的操作, 而且此操作由于数据量较大, 执行时间较大, 此操作会Hbase性能影响较大 所以在实际生产中, 一般是关闭major, 改为手动执行 合并做法 : 将HDFS数据读取出来, 边读边进行处理, 边将数据通过追加的形式添加到HDFS上
Hbase的split机制(region分裂)
此公式主要是用于计算 表对应region, 何时执行分裂的操作
1 2 3 4 5 6 比如说 : 当最初始的时候, 表只有一个Region , 此时 1^2 * 128 与 10GB 做比较, 那个小, 我们就会在那个值上执行分裂, 此时第一次应该在 region的Hfile数据量达到 128M 的时候执行分裂 当第二次分裂, R=2, 经过计算后, 当region的Hfile数据量达到 512M的时候, 就会执行分裂 以此类推 直到表的region数量达到 9个及以上的时候, 此时region分裂按照 10GB 分裂一次
HBase的Bulk Load 批量加载操作 假设: 目前有一批数据, 此数据量比较大, 需要将这些数据写入到HBase中, 如果采用hbase的 普通的javaAPI操作, 会将数据正常的写入流程 进入到HBase中, 而正常数据流程: 先将数据写入到HLog 然后将数据写入memStore, 然后从memStore到storeFile, 再从storeFile到Hfile, 而且在整个写入流程中, 需要大量的占用服务器的资源
如果这个时候, 还有大量的请求, 从Hbase的这个表中读取数据, 由于服务器的资源都被写入的请求占用了, 此时读取的请求可能无法实施, 或者返回结果会很慢, 此时对网络的带宽造成较大的影响
思考 如何解决呢?
1 2 3 4 1) 将这一批数据 先转换为 HFile的文件 2) 将HFile文件直接导入HBase中, 让Hbase直接加载即可 此操作不需要先写入HLog 然后到内存, 然后HDFS过程, 直接将数据到达HDFS操作
解决的应用场景
5.1 需求说明
目前在HDFS上有一份 CSV文件, 此文件中记录大量的转账数据, 要求将这些转换数据 存储到Hbase中, 由于初始数据量过于庞大, 可以采用 bulk_load 将数据批量加载HBase中
准备工作
在Hbase中创建目标表:
1 create 'TRANSFER_RECORD','C1'
将数据 上传到 HDFS中
1 2 3 4 5 hdfs dfs -mkdir -p /hbase/bulkload/input rz将数据上传到 Linux中 hdfs dfs -put bank_record.csv /hbase/bulkload/input
在IDEA中构建项目: day02_hbase_bulk_load
导入相关的pom依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 <repositories > <repository > <id > aliyun</id > <url > http://maven.aliyun.com/nexus/content/groups/public/</url > <releases > <enabled > true</enabled > </releases > <snapshots > <enabled > false</enabled > <updatePolicy > never</updatePolicy > </snapshots > </repository > </repositories > <dependencies > <dependency > <groupId > org.apache.hbase</groupId > <artifactId > hbase-client</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > commons-io</groupId > <artifactId > commons-io</artifactId > <version > 2.6</version > </dependency > <dependency > <groupId > org.apache.hbase</groupId > <artifactId > hbase-mapreduce</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-mapreduce-client-jobclient</artifactId > <version > 2.7.5</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-common</artifactId > <version > 2.7.5</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-mapreduce-client-core</artifactId > <version > 2.7.5</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-auth</artifactId > <version > 2.7.5</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-hdfs</artifactId > <version > 2.7.5</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.1</version > <configuration > <target > 1.8</target > <source > 1.8</source > </configuration > </plugin > </plugins > </build >
MR-bulkload 将CSV数据转换为HFile文件格式数据(天龙八部)==>内存多的话,需要改成spark来完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 package com.hbase.bulkLoad;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class BulkLoadMapper extends Mapper <LongWritable,Text,ImmutableBytesWritable, Put> { private ImmutableBytesWritable k2 = new ImmutableBytesWritable (); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); if (line != null && !"" .equals(line.trim())){ String[] fields = line.split("," ); byte [] rowkey = fields[0 ].getBytes(); k2.set(rowkey); Put v2 = new Put (rowkey); v2.addColumn("C1" .getBytes(),"code" .getBytes(),fields[1 ].getBytes()); v2.addColumn("C1" .getBytes(),"rec_account" .getBytes(),fields[2 ].getBytes()); v2.addColumn("C1" .getBytes(),"rec_bank_name" .getBytes(),fields[3 ].getBytes()); v2.addColumn("C1" .getBytes(),"rec_name" .getBytes(),fields[4 ].getBytes()); v2.addColumn("C1" .getBytes(),"pay_account" .getBytes(),fields[5 ].getBytes()); v2.addColumn("C1" .getBytes(),"pay_name" .getBytes(),fields[6 ].getBytes()); v2.addColumn("C1" .getBytes(),"pay_comments" .getBytes(),fields[7 ].getBytes()); v2.addColumn("C1" .getBytes(),"pay_channel" .getBytes(),fields[8 ].getBytes()); v2.addColumn("C1" .getBytes(),"pay_way" .getBytes(),fields[9 ].getBytes()); v2.addColumn("C1" .getBytes(),"status" .getBytes(),fields[10 ].getBytes()); v2.addColumn("C1" .getBytes(),"timestamp" .getBytes(),fields[11 ].getBytes()); v2.addColumn("C1" .getBytes(),"money" .getBytes(),fields[12 ].getBytes()); context.write(k2,v2); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 package com.hbase.bulkLoad;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Table;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class BulkLoadDriver extends Configured implements Tool { @Override public int run (String[] args) throws Exception { Job job = Job.getInstance(super .getConf(), "BulkLoadDriver" ); job.setJarByClass(BulkLoadDriver.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path ("hdfs://node1:8020/hbase/bulkload/input" )); job.setMapperClass(BulkLoadMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setNumReduceTasks(0 ); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); job.setOutputFormatClass(HFileOutputFormat2.class); Connection hbConn = ConnectionFactory.createConnection(super .getConf()); Table table = hbConn.getTable(TableName.valueOf("TRANSFER_RECORD" )); HFileOutputFormat2.configureIncrementalLoad(job,table,hbConn.getRegionLocator(TableName.valueOf("TRANSFER_RECORD" ))); HFileOutputFormat2.setOutputPath(job, new Path ("hdfs://node1:8020/hbase/bulkload/output" )); boolean flag = job.waitForCompletion(true ); return flag ? 0 :1 ; } public static void main (String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum" ,"node1:2181,node2:2181,node3:2181" ); int i = ToolRunner.run(conf, new BulkLoadDriver (), args); System.exit(i); } }
将Hfile文件格式数据加载HBase中
语法格式要求
1 hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles MR输出路径 HBase表名
执行导入操作:
1 hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /hbase/bulkload/output TRANSFER_RECORD
但是, 各位 你们今天执行可能会报错: 尝试了10次 依然无法导入错误
1 2 3 查看 Hbase的日志: regionServer的日志 优先查看 当下执行导入操作的这个regionServer的日志, 如果没有错误 在查询导入的表, 对应region属于哪个regionServer, 查询这个regionServer的日志
1 hbase的采用 域名 和 hdfs的采用的域名不一致导致的, hbase到导入数据的时候, 发现这个域名不一致, 以为不是同一个集群 导致失败
解决方案:
查询 hbase-site.xml中 hbase.root.dir配置的hdfs的域名是什么?
查询 hdfs的 core-site.xml中 fs.defaultFS的配置的hdfs的域名是什么?
如果两个不一致, 建议大家修改 core-site.xml
修改后, 将这这个配置发送给 node2 和 node3
重启 hadoop 和 hbase 即可,然后重新尝试导入操作
Spark bulkLoad 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 object App { def main (args: Array[String]) : Unit = { System.setProperty("HADOOP_USER_NAME" , "root" ) val sparkSession = SparkSession .builder() .config("spark.serializer" , "org.apache.spark.serializer.KryoSerializer" ) .master("local[*]" ) .getOrCreate() val rowKeyField = "id" val df = sparkSession.read.format("json" ).load("/people.json" ) val fields = df.columns.filterNot(_ == "id" ).sorted val data = df.rdd.map { row => val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString) val kvs = fields.map { field => new KeyValue (rowKey, Bytes.toBytes("hfile-fy" ), Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString)) } (new ImmutableBytesWritable (rowKey), kvs) }.flatMapValues(x => x).sortByKey() val hbaseConf = HBaseConfiguration.create(sparkSession.sessionState.newHadoopConf()) hbaseConf.set("hbase.zookeeper.quorum" , "linux-1:2181,linux-2:2181,linux-3:2181" ) hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "hfile" ) val connection = ConnectionFactory.createConnection(hbaseConf) val tableName = TableName.valueOf("hfile" ) creteHTable(tableName, connection) val table = connection.getTable(tableName) try { val regionLocator = connection.getRegionLocator(tableName) val job = Job.getInstance(hbaseConf) job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) job.setMapOutputValueClass(classOf[KeyValue]) HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator) val savePath = "hdfs://linux-1:9000/hfile_save" delHdfsPath(savePath, sparkSession) job.getConfiguration.set("mapred.output.dir" , savePath) data.saveAsNewAPIHadoopDataset(job.getConfiguration) val bulkLoader = new LoadIncrementalHFiles (hbaseConf) bulkLoader.doBulkLoad(new Path (savePath), connection.getAdmin, table, regionLocator) } finally { table.close() connection.close() } sparkSession.stop() } def creteHTable (tableName: TableName, connection: Connection) : Unit = { val admin = connection.getAdmin if (!admin.tableExists(tableName) ) { val tableDescriptor = new HTableDescriptor (tableName) tableDescriptor.addFamily(new HColumnDescriptor (Bytes.toBytes("hfile-fy" ))) admin.createTable(tableDescriptor) } } def delHdfsPath (path: String, sparkSession: SparkSession) { val hdfs = FileSystem.get(sparkSession.sessionState.newHadoopConf()) val hdfsPath = new Path (path) if (hdfs.exists(hdfsPath)) { hdfs.delete(hdfsPath, true ) } } }
HBase和Hive的集成操作 hbase和hive的对比说明
hive: 就是一个数据仓库的工具 , 基于HADOOP, 数据存储在Datanode, 执行翻译为MR, 支持SQL, 支持join 主要是用于离线分析操作 与清洗操作, 延迟较高
hbase: 是一个nosql型数据库, 用于存储数据, 基于hadoop, 数据最终存储在datanode, 不支持SQL, 不支持join 主要是用于接入在线业务, 延迟较低 , 具有高效的随机读写能力
说明:
hive 和 hbase都是基于hadoop的不同的工具, hive和hbase可以集成在一起,hive on hbase ,使用hql进行批量分析查询,若是要求随机读写需集成phoenix。
1.2 hbase如何hive进行集成操作
集成步骤:
将hive提供的一个和hbase整合的通信包, 导入到Hbase的lib目录下
1 2 cd /export/server/hive-2.1.0/lib/ cp hive-hbase-handler-2.1.0.jar /export/server/hbase-2.1.0/lib/
将 这个通信包 发送给 node1 和 node2的hbase的lib目录下
1 2 3 cd /export/server/hbase-2.1.0/lib/ scp hive-hbase-handler-2.1.0.jar node1:/export/server/hbase-2.1.0/lib/ scp hive-hbase-handler-2.1.0.jar node2:/export/server/hbase-2.1.0/lib/
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 cd /export/server/hive-2.1.0/conf/ vim hive-site.xml 添加以下内容: <property > <name > hive.zookeeper.quorum</name > <value > node1,node2,node3</value > </property > <property > <name > hbase.zookeeper.quorum</name > <value > node1,node2,node3</value > </property > <property > <name > hive.server2.enable.doAs</name > <value > false</value > </property >
1 2 3 4 5 cd /export/server/hive-2.1.0/conf/ vim hive-env.sh 添加以下内容: export HBASE_HOME=/export/server/hbase-2.1.0
1 2 3 4 5 6 1) 先启动 zookeeper , 保证zookeeper启动良好 2) 接着启动 hadoop集群, 保证hadoop是启动良好 3) 然后启动 hbase集群, 保证hbase集群是启动良好的 4) 最后启动 hive , 保证hive是启动良好的 说明 : 3 和 4 可以调换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 第一步: 在hbase的shell客户端下, 创建一个表 并添加相关的数据 hbase(main):007:0> create 'hbase_hive_score','cf' hbase(main):007:0> put 'hbase_hive_score' ,'1','cf:name','zhangsan' hbase(main):007:0> put 'hbase_hive_score' ,'1','cf:age','25' hbase(main):008:0> put 'hbase_hive_score' ,'2','cf:name','lisi' hbase(main):009:0> put 'hbase_hive_score' ,'2','cf:age','30' hbase(main):010:0> put 'hbase_hive_score' ,'3','cf:name','wangwu' hbase(main):011:0> put 'hbase_hive_score' ,'3','cf:age','18' hbase(main):012:0> scan 'hbase_hive_score' ROW COLUMN+CELL 1 column=cf:age, timestamp=1615427034130, value=25 1 column=cf:name, timestamp=1615427024464, value=zhangsan 2 column=cf:age, timestamp=1615427052348, value=30 2 column=cf:name, timestamp=1615427045923, value=lisi 3 column=cf:age, timestamp=1615427082291, value=18 3 column=cf:name, timestamp=1615427073970, value=wangwu 第二步: 在hive中对hbase的这个表进行映射匹配操作, 由于数据是被hbase所管理, 在hive中建表选择外部表 语法格式: create external table 表名 ( 字段1 类型, 字段2 类型, 字段3 类型 .... ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties('hbase.columns.mapping'=':key,列族1:列名1...') tblproperties('hbase.table.name'='hbase的表名'); 注意: 表名 : 建议和hbase表名保持一致 (不一致也是OK的) 字段 : 建议和hbase列名保持一致 字段的第一个, 建议放置的主键字段, 不需要加primary key create external table day03_hivehbase.hbase_hive_score ( id int, name string, age int ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties('hbase.columns.mapping'=':key,cf:name,cf:age') tblproperties('hbase.table.name'='hbase_hive_score');
hive集成存在的问题:
Apache Phoenix Apache Phoenix 仅仅是一款可以通过SQL的方式来操作(CRUD)hbase的工具 ,底层大量的利用hbase的协处理器。
phoenix集成hbase与hive、impala集成的对比
HBase的协处理器 3.1 协处理器的基本介绍
hbase提供的协处理器主要有二大类
1 2 3 4 5 可以将observer看做是 数据库的触发器 或者可以理解为监听器, 可以通过observer提供一些钩子(事件), 对某些事件进行监听操作, 一旦触发了这个事件, 理解通知绑定监听事件的人即可 这类协处理器还可以做什么事情呢? 1) 操作日志记录 2) 权限的管理
<img src="https://onedrive.ipfscan.us.kg/_layouts/52/download.aspx?share=EYN0_O-4wL5EqYbr6x4K3AYBdXJsQuSPqzXRcJuolzlAug" alt="hbase.assets/image-20210311153033576.png" title="image-20210311153033576.png">
1 2 3 4 这类协处理器 可以将其看做是 数据库的中存储过程, 也可以类似于在java中定义一个方法, 将相关的功能放置在这个方法中 即可 一旦定义这样协处理器, 可以将这个协处理器提交到server(服务)端, 有各个服务端来执行这段操作, 将执行的结果返回给客户端, 客户端在根据返回结果做相应的处理即可 作用 : 做 聚合操作 sum count max ...
3.2 如何设置协处理器
设置方式一: 静态设置 全局设置
1 此配置需要配置到hbase-site.xml中 全局有效, 每个hbase表 都会有这个协处理器
设置方式二: 动态设置, 只针对某个表有效
第一步: 禁用表
第二步: 添加协处理器
第三步: 启用表
如何卸载动态设置协处理器:
第一步: 禁用表
第二步: 删除协处理器
第三步: 启用表
apache Phoenix的安装
注意:
1 2 3 1) 在安装完成后, 如果hbase无法启动, 请检查 hbase的配置文件 以及lib目录,3个节点都需要检查 是否OK 2) 在安装完成后, 如果Phoenix无法启动, 一启动就报错, 检查 Phoenix的bin目录下的hbase-site.xml 其内容是否是hbase的conf目录下的那个hbase-site.xml的内容
Apache Phoenix的基本入门操作 Grammar | Apache Phoenix
1 2 3 4 5 6 7 8 9 create table [if not exists ] 表名 ( rowkey名称 数据类型 primary key , 列族名.列名1 数据类型 , 列族名.列名2 数据类型 , 列族名.列名3 数据类型 , 列族名.列名4 数据类型 ..... );
1 2 3 4 5 6 7 8 9 create table order_dtl ( id varchar primary key , c1.status varchar , c1.money integer , c1.pay_way integer , c1.user_id varchar , c1.operation varchar , c1.category varchar );
执行完成后, 可以在hbase的webui中查看到, 多出来一张表, 在此表中默认的region数量为1, 同时给表加入很多协处理器
注意: Phoenix会自动将小写变更为大写: 表名 列族 列名
需求: 字段必须为小写, 不使用大写 , 如何做
create table "order_dtl_01" (
"id" varchar primary key,
"c1"."status" varchar ,
"c1".money integer ,
c1."pay_way" integer ,
c1.user_id varchar,
c1.operation varchar,
c1.category varchar
);
注意: 如果想要使用小写, 只需要在需要小写的内容两端加上双引号(必须为双引号)
单引号 表示是普通字符串
推荐: 建议使用大写, 如果为小写, 后续所有的操作, 只要用到这个小写的内容, 必须加双引号
注意: Phoenix会自动将小写变更为大写: 表名 列族 列名
需求: 字段必须为小写, 不使用大写 , 如何做
1 2 3 4 5 格式: upsert into 表名(列族.列名1,列族.列名2,... ) values(值1,值2 ....) 案例: upsert into ORDER_DTL values('000002','未提交',4070,1,'4944191','2020/04/25 12:09:16','手机');
查询操作: 与标准的SQL是一致的
只不过 不支持 join 不支持多表关联 仅支持单表查询
删除数据: 与标准SQL是一致的
分页查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 语法: select * from 表 limit 每页显示n条 offset(m-1) (从第m条开始显示n条) 案例: 1) 首先先添加一坨数据 UPSERT INTO "ORDER_DTL" VALUES('000002','已提交',4070,1,'4944191','2020-04-25 12:09:16','手机;'); UPSERT INTO "ORDER_DTL" VALUES('000003','已完成',4350,1,'1625615','2020-04-25 12:09:37','家用电器;;电脑;'); UPSERT INTO "ORDER_DTL" VALUES('000004','已提交',6370,3,'3919700','2020-04-25 12:09:39','男装;男鞋;'); UPSERT INTO "ORDER_DTL" VALUES('000005','已付款',6370,3,'3919700','2020-04-25 12:09:44','男装;男鞋;'); UPSERT INTO "ORDER_DTL" VALUES('000006','已提交',9380,1,'2993700','2020-04-25 12:09:41','维修;手机;'); UPSERT INTO "ORDER_DTL" VALUES('000007','已付款',9380,1,'2993700','2020-04-25 12:09:46','维修;手机;'); UPSERT INTO "ORDER_DTL" VALUES('000008','已完成',6400,2,'5037058','2020-04-25 12:10:13','数码;女装;'); UPSERT INTO "ORDER_DTL" VALUES('000009','已付款',280,1,'3018827','2020-04-25 12:09:53','男鞋;汽车;'); UPSERT INTO "ORDER_DTL" VALUES('000010','已完成',5600,1,'6489579','2020-04-25 12:08:55','食品;家用电器;'); UPSERT INTO "ORDER_DTL" VALUES('000011','已付款',5600,1,'6489579','2020-04-25 12:09:00','食品;家用电器;'); UPSERT INTO "ORDER_DTL" VALUES('000012','已提交',8340,2,'2948003','2020-04-25 12:09:26','男装;男鞋;'); UPSERT INTO "ORDER_DTL" VALUES('000013','已付款',8340,2,'2948003','2020-04-25 12:09:30','男装;男鞋;'); UPSERT INTO "ORDER_DTL" VALUES('000014','已提交',7060,2,'2092774','2020-04-25 12:09:38','酒店;旅游;'); UPSERT INTO "ORDER_DTL" VALUES('000015','已提交',640,3,'7152356','2020-04-25 12:09:49','维修;手机;'); UPSERT INTO "ORDER_DTL" VALUES('000016','已付款',9410,3,'7152356','2020-04-25 12:10:01','维修;手机;'); UPSERT INTO "ORDER_DTL" VALUES('000017','已提交',9390,3,'8237476','2020-04-25 12:10:08','男鞋;汽车;'); UPSERT INTO "ORDER_DTL" VALUES('000018','已提交',7490,2,'7813118','2020-04-25 12:09:05','机票;文娱;'); UPSERT INTO "ORDER_DTL" VALUES('000019','已付款',7490,2,'7813118','2020-04-25 12:09:06','机票;文娱;'); UPSERT INTO "ORDER_DTL" VALUES('000020','已付款',5360,2,'5301038','2020-04-25 12:08:50','维修;手机;'); UPSERT INTO "ORDER_DTL" VALUES('000021','已提交',5360,2,'5301038','2020-04-25 12:08:53','维修;手机;'); UPSERT INTO "ORDER_DTL" VALUES('000022','已取消',5360,2,'5301038','2020-04-25 12:08:58','维修;手机;'); UPSERT INTO "ORDER_DTL" VALUES('000023','已付款',6490,0,'3141181','2020-04-25 12:09:22','食品;家用电器;'); UPSERT INTO "ORDER_DTL" VALUES('000024','已付款',3820,1,'9054826','2020-04-25 12:10:04','家用电器;;电脑;'); UPSERT INTO "ORDER_DTL" VALUES('000025','已提交',4650,2,'5837271','2020-04-25 12:08:52','机票;文娱;'); UPSERT INTO "ORDER_DTL" VALUES('000026','已付款',4650,2,'5837271','2020-04-25 12:08:57','机票;文娱;'); 2) 采用分页查询: 每页显示 5 条 , 显示第一页
Apache Phoenix的预分区操作 通过Phoenix来构建表, 默认情况下, 只有一个region
Phoenix预分区的方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 语法格式: create table [if not exists ] 表名 ( rowkey名称 数据类型 primary key , 列族名.列名1 数据类型 , 列族名.列名2 数据类型 , 列族名.列名3 数据类型 , 列族名.列名4 数据类型 ..... ) compression= 'GZ' split on (region分区方案) ; 案例: drop table order_dtl;create table order_dtl ( id varchar primary key , c1.status varchar , c1.money integer , c1.pay_way integer , c1.user_id varchar , c1.operation varchar , c1.category varchar ) compression= 'GZ' split on ('10' ,'20' ,'30' );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 格式: create table [if not exists ] 表名 ( rowkey名称 数据类型 primary key , 列族名.列名1 数据类型 , 列族名.列名2 数据类型 , 列族名.列名3 数据类型 , 列族名.列名4 数据类型 ..... ) compression= 'GZ' , salt_buckets= N ; 案例 drop table order_dtl;create table order_dtl ( id varchar primary key , c1.status varchar , c1.money integer , c1.pay_way integer , c1.user_id varchar , c1.operation varchar , c1.category varchar ) compression= 'GZ' , salt_buckets= 10 ;
总结: 如果使用Phoenix的加盐预分区方案, Phoenix在添加数据的时候, 会自动在rowkey的前面进行加盐处理, 但是对用户从操作Phoenix角度来说是无感操作,除非我们去hbase查看原始内容
apache Phoenix的视图 默认情况下, Phoenix中只展示由Phoenix自己创建表, 如果说hbase的表是通过hbase自己来构建的, 在Phoenix中无法查看到, 那么也就意味着, 无法通过Phoenix来操作hbase原有表
如果想通过Phoenix对hbase原有表进行SQL的操作, 此时可以利用Phoenix提供的视图来解决
如何实现视图呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 格式: create view "名称空间"."hbase对应的表的表名" ( key varchar primary key , "列族"."列名" 类型, ..... ) [default_colunm_family= '列族名' ]; 注意事项: 视图的名称 一定要与 需要建立视图的hbase的表名是一致的 key的名称是可以任意的,但是必须添加primary key 普通的列, 需要和 hbase的对应表保持一致
案例: 针对 WATER_BILL 表 构建视图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 create view "WATER_BILL" ( ID varchar primary key , C1.NAME varchar , C1.ADDRESS varchar , C1.LATEST_DATE varchar , C1.NUM_CURRENT UNSIGNED_DOUBLE , C1.NUM_PREVIOUS UNSIGNED_DOUBLE , C1.NUM_USAGE UNSIGNED_DOUBLE , C1.PAY_DATE varchar , C1.RECORD_DATE varchar , C1.SEX varchar , C1.TOTAL_MONEY UNSIGNED_DOUBLE ); UNSIGNED_DOUBLE: 无符号的double 类型
查询视图, 观察是否有数据:
查询 六月份的用户的用水量
1 select name,num_usage, record_date from water_bill where record_date between '2020-06-01' and '2020-06-30' ;
Apache Phoenix的二级索引 索引目的: 提高查询的效率
2.1 Phoenix索引的分类
在Phoenix中共计提供了四种索引:
1 2 3 4 5 6 7 在构建了全局索引之后, 会单独的形成一张索引表,单独的索引表与目标表拥有相同的region数量, 当查询数据时候, 可以先到索引表查询, 查询到之后, 再到目标中进行查询即可, 但是如果对数据进行修改, 此时索引表的数据也会随之进行修改操作 注意 : 在修改索引的表, 对于全局索引而言, 需要做全局更新操作, 代价较大, 效率较低 在查询的过程中, 如果SQL语句中出现了非索引的字段, 此时全局索引无法生效 全局索引一般和覆盖索引搭配使用,读的效率很高,但对写入的效率影响较大, 所以说 全局索引 比较适用于 读多 写少的场景
1 2 格式: create index 索引名称 on 表名(列名1 ,列名2 ....)
1 2 格式: drop index 索引名称 on 表名;
*
1 2 3 4 5 在构建了本地索引后, 不会单独创建一张索引表, 索引数据直接附带在目标表对应字段的后面, 这样在进行修改(增 删 改)数据操作的时候 , 直接在目标对索引数据一次性就处理掉了, 适用于写的多场景 在执行查询的时候, Phoenix会自动选择是否使用本地索引, 即使有非索引的字段, 依然可用 注意 : 如果表在构建的时候, 采用的加盐预分区的方案, 建议大家不要使用本地索引, 因为有部分操作是不支持的
1 2 格式: create local index 索引名称 on 表名(列名1 ,列名2 ....)
1 2 格式: drop index 索引名称 on 表名;
1 2 3 覆盖索引无法单独使用, 必须和全局索引或者本地索引配合使用, 在构建覆盖索引后, 将对应索引字段数据, 直接放置在本地索引或者全局索引字段的后面即可, 这样在查询的时候, 可以不去查询目标表, 直接在索引表中可以所有需要的字段数据, 这样可以提升读取数据时间 一般会和全局索引组合使用
适用于: 将一些不参与条件的字段 但是会参与展示的字段 放置在覆盖索引上
如何构建覆盖索引呢?
1 2 格式: create [local ] index 索引名称 on 表名(列名1 , 列名2. ...) include(列1 ,列2. ..)
1 无法单独使用, 在本地索引或者全局索引中使用, 主要用于针对某个函数的结果来构建索引, 这样以后用到这个对应结果数据的函数, 那么也可以执行索引优化
适用于: 多条SQL语句中, 需要频繁的使用某个函数的结果
如何构建呢?
1 2 格式: create [local ] index 索引名称 on 表名(列1 ,列2 ,函数名称(字段...)...)
2.2 案例一: 创建全局索引+覆盖索引
1 2 3 4 5 6 select USER_ID,ID,MONEY from order_dtl where USER_ID= '8237476' ;使用 explain 检查其执行的效率: explain select USER_ID,ID,MONEY from order_dtl where USER_ID= '8237476' ; 结论: 通过扫描全表 获取的结果数据
1 2 1 ) 创建索引: create index idx_index_order_dtl on order_dtl(user_id) include(ID,MONEY);
1 2 2 ) 重新检查刚刚SQL , 是否会执行操作 explain select USER_ID,ID,MONEY from order_dtl where USER_ID= '8237476' ;
如果查询的中, 出现了非索引的字段, 会出现什么问题呢?
总结: 全局索引无法生效
要求: 就想使用全局索引, 而且不想对这个非索引的字段构建索引
解决方案: 通过强制使用全局索引方案 来生效
1 explain select * from order_dtl where USER_ID= '8237476' ;
删除索引
1 drop index idx_index_order_dtl on order_dtl;
2.3 案例二: 本地索引
需求: 根据 订单ID 订单状态 支付金额 支付方式 用户ID 查询数据
1 2 1) 构建索引: create local index IDX_LOCAL_order_dtl on order_dtl(ID,STATUS,MONEY,PAY_WAY,USER_ID);
说明: 虽然可以通过!table看到索引表, 本质上在hbase上没有构建索引表
1 2 2 ) 测试1 : 所有字段都是本地索引 explain select USER_ID,ID,MONEY from order_dtl where USER_ID= '8237476' ;
1 2 3 ) 测试2 : 有部分字段没有索引 explain select USER_ID,ID,MONEY,CATEGORY from order_dtl where USER_ID= '8237476' ;
1 2 3 4 5 3 ) 测试3 : 查询所有的字段 explain select * from order_dtl where USER_ID= '8237476' ; 原因: 表是加盐预分区的操作
注意: 本地索引 会直接对目标表数据产生影响, 所以一旦构建本地索引, 将不再支持通过原生API 查询数据
2.4 案例三: 实现WATER_BILL查询操作
1 2 3 select name,num_usage, record_date from water_bill where record_date between '2020-06-01' and '2020-06-30'; 花费时间为: 8s多
1 2 3 4 5 6 7 8 1 ) 创建索引: 全局| 本地 + 覆盖 create index IDX_INDEX_WATER_BILL ON WATER_BILL(record_date) include (name,num_usage); 2 ) 执行查询操作: select name,num_usage, record_date from water_bill where record_date between '2020-06-01' and '2020-06-30' ; 花费时间: 7 s左右
总结索引的使用
1 2 3 4 5 6 7 8 9 1) 给那些经常查询使用字段添加索引 2) 如果前期经常使用, 但是后期不使用, 请将索引信息删除 3) 如果前期不经常使用,但是后期经常使用, 后期还需要添加索引 索引好处 : 提升查询的效率 弊端 : 数据会冗余存储 占用磁盘空间 索引 : 以空间换时间操作
hbase的表结构设计 hbase的名称空间(命名空间) hbase的名称空间 类似于 在mysql 或者 hive中的数据库, 只不过在hbase将其称为叫做名称空间(命名空间)
思考: mysql或者hive 为什么会有数据库的概念呢?
利于管理,利于维护
业务划分更加明确
可以针对性设置权限
同样的, hbase的名称空间也具有相似作用
在hbase中, 默认情况下, 是有两个名称空间的, 一个 hbase 和 default
名称为 hbase的名称空间: 专门用于放置hbase内部的管理表的, 此名称空间, 一般不操作, hbase自己维护即可
内部有一张核心的元数据表: meta表
此表存储了hbase中所有自定义表的相关的元数据信息
名称为 default 的名称空间: 默认的名称空间, 在创建hbase表的时候, 如果没有指定名称空间, 默认就创建到此空间下
类似于 hive中也有一个default数据库
在测试 学习的时候可以使用
如何操作HBase的名称空间呢?
1 2 3 4 5 6 7 8 9 10 11 12 # 如何创建名称空间: create_namespace '名称空间' # 查看当下有那些名称空间 list_namespace # 查看某一个名称空间 describe_namespace '名称空间' # 删除名称空间 drop_namespace '名称空间' 注意: 在删除这个空间的时候, 需要保证此空间下没有任何表 # 如何在某个名称空间下创建表 create '名称空间名称:表名','列族' ... 注意: 一旦建立在default以外的名称空间下, 在后续操作这个表, 必须携带名称空间,否则hbase会到default空间下
hbase表的列族的设计 结论: 在建表的时候, 如果能用一个列族来解决的, 坚决使用一个即可, 能少则少
原因:
过多的列族会导致在region中出现过多的store模块, 而每个store模块中由一个memStore 和 多个storeFile构成的, 这样会导致数据在存储或者读取的时候, 需要跨域多个内存和多个文件才能将整行的数据都获取到, 此时增大了IO操作, 从而导致效率比较低
而且这个列族多了之后, 会导致region可能会频繁的要进行刷新 和 compact合并操作
多个列族在文件层次上存储在不同的HFile文件中。
思考: 什么场景下 可能会构建多个列族呢? 一般为 2~5
假设有一个表, 表中字段比较多, 但是某些字段不常使用, 此时可以将经常使用的字段放入某一个列族中, 另一个放置不常使用字段即可
假设一个表要对接多个不同业务, 而多个不同业务可能会用到不同的字段, 可以根据业务 划分不同的列族中
hbase表的预分区 在hbase中 表默认情况下 只有一个region, 而一个region只能被一个regionServer所管理
思考:
假设通过默认的方式, 创建了一张hbase的表, 接着需要向这个表写入大量的数据, 同时也有大量的读请求在查询这个表中数据, 此时会发生什么问题呢?
出现问题: 对应管理region的regionServer的服务器可能出现延迟较大, 甚至出现宕机的问题
原因: 只有一个region, 对应只有一个regionServer, 一个regionServer需要承载这个表所有并发读写请求
如何解决呢?
1 如果可以在建表的时候, 一次性构建出多个region, 然后多个region能够均匀的分布在不同的regionServer上, 这样问题及迎刃而解了
实现方式: 通过HBase的预分区
hbase的版本确界和TTL 什么是数据版本确界
数据版本的确界 所描述的就是在hbase中 数据的历史版本的数量
下界: hbase中对应单元格 至少需要保留多少版本, 即使数据已经过期了 ,
上界: hbase中对应单元格 最多可以保留多少个版本, 如果比设置多了, 最早期本部会被覆盖掉
2.6.2 什么是数据的TTL
在hbase中, 可以针对数据设置过期时间, 当时间过期后, hbase会自动将过期数据给清理掉
2.6.3 代码演示数据版本确界和TTL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public class HBaseTTLTest { public static void main (String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum" ,"node1:2181,node2:2181,node3:2181" ); Connection hbConn = ConnectionFactory.createConnection(conf); Admin admin = hbConn.getAdmin(); if ( !admin.tableExists(TableName.valueOf("day03_hbaseTTL" ))){ TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("day03_hbaseTTL" )); ColumnFamilyDescriptorBuilder familyDesc = ColumnFamilyDescriptorBuilder.newBuilder("C1" .getBytes()); familyDesc.setMinVersions(3 ); familyDesc.setMaxVersions(5 ); familyDesc.setTimeToLive(30 ); ColumnFamilyDescriptor family = familyDesc.build(); descBuilder.setColumnFamily(family); TableDescriptor desc = descBuilder.build(); admin.createTable(desc); } Table table = hbConn.getTable(TableName.valueOf("day03_hbaseTTL" )); for (int i = 1 ; i<= 2 ; i++){ Put put = new Put ("rk001" .getBytes()); put.addColumn("C1" .getBytes(),"NAME" .getBytes(),("zhangsan" +i).getBytes()); table.put(put); } Get get = new Get ("rk001" .getBytes()); get.readAllVersions(); Result result = table.get(get); List<Cell> cells = result.listCells(); for (Cell cell : rawCells) { System.out.println(Bytes.toString(CellUtil.cloneValue(cell))); } table.close(); admin.close(); hbConn.close(); } }
总结:
即使所有的历史版本的数据都过期了, hbase也会至少保留 min_version个最新版本数据, 以保证我们在查询数据的时候. 可以有数据返回
hbase的中rowkey的设计原则 说明:
单纯的通过预分区 是无法解决 向hbase存储数据的高并发的读写问题, 因为如果所有的数据都是某一个region内的数据, 此时依然相当于表只有一个region
解决方案:
需要在预分区的基础, 让rowkey的值 能够均匀的落在不同的region上, 才可以解决
所以: rowkey设计良好, 直接关系到, 数据应该存储到那个region中, 如果设置不良好, 可能会导致数据倾斜问题
设置rowkey有什么样原则呢?
1 2 3 4 5 6 7 8 1) 避免将 递增行键/时序数据 放置在rowkey最前面 2) 避免rowkey和列的长度过大 说明, 太长rowkey 和列 会导致 占用空间更大, 导致在内存中存储数据行数会更少, 从而最终导致提前进行flush操作, 影响效率 rowkey支持大小 : 64kb 一般情况下 : 建议在 100字节以下 , 大部分长度为 10~20左右 3) 使用Long等类型 比String类型更省空间 4) 保证rowkey的唯一性
如何避免热点的问题呢?
反转操作
弊端 : 如果是相关性比较强的数据, 此种打乱会导致读写效率降低
加盐策略(随机在rowkey前面加一个随机字符串或者随机数)
弊端: 如果是相关性比较强的数据, 此种打乱会导致读写效率降低
哈希策略 (MurmurHash3)
hbase的表的压缩方案的选择
在生产中如何选择压缩方式呢?
1 2 1) 必须要压缩, 如果表的数据, 需要进行频繁的数据读写操作, 而且数据量比较大, 建议采用 snappy压缩方案 2) 必须要压缩, 如果表的数据, 需要进行大量写入操作, 但是读取操作不是特别频繁, 建议采用 GZIP|GZ
如何在hbase设置表的压缩方案? 默认情况下HBASE是不压缩的, 压缩方案是针对列族来说的
配置操作:
1 2 3 格式: 建表时: create '表名' , {NAME=>'列族',COMPRESSION=>'GZ|SNAPPY'} 对已经存在表, 通过修改表结构: alter '表名', {NAME=>'列族',COMPRESSION=>'GZ|SNAPPY'}
注意:
如果需要采用SNAPPY 压缩方案, 可能需要对HBase进行重新编译 , 在编译时类似于 Hadoop, 将支持压缩的C接口放入hbase编译包中 才可以
如果要采用LZO的压缩方案, 需要放置LZO的压缩的jar包
HBase数据结构 简介
传统关系型数据库,一般都选择使用B+树作为索引结构,而在大数据场景下,HBase、Kudu这些存储引擎选择的是LSM树。LSM树,即日志结构合并树(Log-Structured Merge-Tree)。
LSM树设计思想
LSM 的主要思想是划分不同等级的结构,换句话来理解,就是LSM中不止一个数据结构,而是存在多种结构
一个结构在内存、其他结构在磁盘(HBase存储结构中,有内存——MemStore、也有磁盘——StoreFile)
内存的结构可以是B树、红黑树、跳表等结构(HBase中是跳表),磁盘中的树就是一颗B+树
C0层保存了最近写入的数据,数据都是有序的,而且可以随机更新、随机查询
C1到CK层的数据都是存在磁盘中,每一层中key都是有序存储的