HBase


# Create a table

Syntax 1: create '<table>','<column_family_1>','<column_family_2>'...
Syntax 2: create '<table>',{NAME=>'<column_family_1>'},{NAME=>'<column_family_2>'}...

# Create a table named student with two column families: basic info (baseinfo) and school info (schoolinfo)
create 'student','baseinfo','schoolinfo'
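
Syntax 2 is handy when a column family needs per-family options. A small sketch (the VERSIONS setting here is what the multi-version scan example further below relies on):

# Create the same table with syntax 2, keeping up to 3 versions in baseinfo
create 'student', {NAME => 'baseinfo', VERSIONS => 3}, {NAME => 'schoolinfo'}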

# Drop a table

# A table must be disabled before it can be dropped
disable 'student'
# Drop the table
drop 'student'

# Rename a table

Take a snapshot of the table, clone a new table from the snapshot, then delete the snapshot.

snapshot 'student','temp'
clone_snapshot 'temp','stu'
delete_snapshot 'temp'

# List all tables

list

# Describe a table

desc 'student'

# Check whether a table exists

exists 'student'

# Enable and disable a table

# Disable the table
disable 'student'
# Check whether the table is disabled
is_disabled 'student'
# Enable the table
enable 'student'
# Check whether the table is enabled
is_enabled 'student'

# Add a column family

Syntax: alter '<table>','<column_family>'

alter 'student','teacherinfo'

# Delete a column family

Syntax: alter '<table>',{NAME=>'<column_family>',METHOD=>'delete'}

alter 'student', {NAME => 'teacherinfo', METHOD => 'delete'}

# Insert data

Syntax: put '<table>','<row_key>','<column_family>:<column>','<value>'[,<timestamp>]

put 'student', '1','baseinfo:name','tom'
put 'student', '1','baseinfo:birthday','1990-01-09'
put 'student', '1','baseinfo:age','29'
put 'student', '1','schoolinfo:name','Harvard'
put 'student', '1','schoolinfo:location','Boston'

put 'student', '2','baseinfo:name','jack'
put 'student', '2','baseinfo:birthday','1998-08-22'
put 'student', '2','baseinfo:age','21'
put 'student', '2','schoolinfo:name','Yale'
put 'student', '2','schoolinfo:location','New Haven'

put 'student', '3','baseinfo:name','maike'
put 'student', '3','baseinfo:birthday','1995-01-22'
put 'student', '3','baseinfo:age','24'
put 'student', '3','schoolinfo:name','Yale'
put 'student', '3','schoolinfo:location','New Haven'

put 'student', '4','baseinfo:name','maike-jack'

# Delete a row or a specific column in a row

# Delete an entire row (a whole row is removed with deleteall, not delete)
deleteall 'student','3'
# Delete a specific column within a row
delete 'student','3','baseinfo:name'

# get queries

# Get all columns of a row
get 'student','3'
# Get all columns of a row within a given column family
get 'student','3','baseinfo'
# Get a specific column of a row
get 'student','3','baseinfo:name'

# scan queries

# Scan the entire table
scan 'student'
# Scan a specific column family
scan 'student', {COLUMNS => 'baseinfo'}
# Scan a specific column
scan 'student', {COLUMNS => 'baseinfo:birthday'}
# View two versions of a column (requires the column family to keep more than one version, e.g. created with VERSIONS => 3)
scan 'student', {COLUMNS => 'baseinfo:birthday', VERSIONS => 2}
# View the first 3 rows
scan 'student', {LIMIT => 3}
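
Scans can also be restricted to a row-key range instead of the whole table; a small sketch (STARTROW is inclusive, STOPROW is exclusive):

# Scan only the rows with keys from '1' (inclusive) up to '3' (exclusive)
scan 'student', {STARTROW => '1', STOPROW => '3'}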

# Operating HBase from Java (verified)

HBase example 1
The APIs in this post are adapted from HBase example 2

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HBaseAPI {
    // Shared static configuration
    private static Configuration conf = null;

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.216.100");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    /**
     * Check whether the given table exists in HBase.
     *
     * @param tableName name of the table to check
     * @return true if the table exists, false otherwise
     * @throws IOException if an I/O error occurs during the check
     */
    private static boolean isExist(String tableName) throws IOException {
        // try-with-resources ensures the connection and admin are closed after use
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin hAdmin = connection.getAdmin()) {
            // Check existence by table name
            return hAdmin.tableExists(TableName.valueOf(tableName));
        }
    }


    /**
     * Create an HBase table with the given name and column families.
     * If the table already exists, print a message; otherwise create it.
     *
     * @param tableName      table name
     * @param columnFamilies column family names
     * @throws Exception if table creation fails
     */
    public static void createTable(String tableName, String[] columnFamilies)
            throws Exception {
        // try-with-resources ensures the connection and admin are closed after use
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tName = TableName.valueOf(tableName);
            // Check whether the table already exists
            if (admin.tableExists(tName)) {
                System.out.println("Table " + tableName + " already exists!");
            } else {
                // Build the table descriptor and add a descriptor for each column family
                HTableDescriptor desc = new HTableDescriptor(tName);
                for (String columnFamily : columnFamilies) {
                    desc.addFamily(new HColumnDescriptor(columnFamily));
                }
                // Create the table
                admin.createTable(desc);
                System.out.println("Table " + tableName + " created successfully!");
            }
        }
    }


    /**
     * Delete an HBase table by name.
     * <p>
     * The table is disabled first and then deleted; if it does not exist, a message is printed instead.
     * try-with-resources ensures the connection and admin are closed after use.
     *
     * @param tableName name of the table to delete
     * @throws Exception if table deletion fails
     */
    public static void deleteTable(String tableName) throws Exception {
        // Open a connection to HBase and obtain an Admin instance
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Convert the name to an HBase TableName
            TableName tName = TableName.valueOf(tableName);
            // Check whether the table exists
            if (admin.tableExists(tName)) {
                // Disable the table so it can be deleted safely
                admin.disableTable(tName);
                // Delete the table
                admin.deleteTable(tName);
                // Report success
                System.out.println("Table " + tableName + " deleted successfully!");
            } else {
                // Report that the table does not exist
                System.out.println("Table " + tableName + " does not exist!");
            }
        }
    }


    /**
     * Add a single cell to the given HBase table.
     *
     * @param tableName    table name
     * @param rowKey       row key of the new row
     * @param columnFamily column family the data is stored in
     * @param column       column qualifier within the family
     * @param value        value to store
     * @throws Exception if the put fails
     */
    public static void addRow(String tableName, String rowKey,
                              String columnFamily, String column, String value) throws Exception {
        // Create the HBase connection and table object
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Build the Put for the target row
            Put put = new Put(Bytes.toBytes(rowKey));
            // Add the cell to the row
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
            // Write the row to the table
            table.put(put);
            System.out.println("Row inserted successfully");
        }
        // try-with-resources closes the connection and table automatically
    }
    /**
     * Batch-insert rows into an HBase table.
     *
     * @param tableName    table name
     * @param columnFamily column family to write into
     * @param dataList     list of records to insert
     * @throws IOException if an I/O error occurs
     */
    public static void executeInsertBatch(String tableName, String columnFamily,
                                          List<DataModel> dataList) throws IOException  {
        // try-with-resources ensures the connection and table are closed after use
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Collect one Put per record for the batch insert
            List<Put> puts = new ArrayList<>();
            // Build a Put for each record and add it to the list
            for (DataModel dataModel : dataList) {
                String rowKey = dataModel.getRowKey();
                // Create a Put for this row key
                Put put = new Put(Bytes.toBytes(rowKey));
                // Add the individual columns
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("rank"), Bytes.toBytes(dataModel.getRank()));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("title"), Bytes.toBytes(dataModel.getTitle()));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("rating"), Bytes.toBytes(dataModel.getRating()));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("year"), Bytes.toBytes(dataModel.getYear()));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("genre"), Bytes.toBytes(dataModel.getGenre()));
                puts.add(put);
            }
            // Execute the batch insert
            table.put(puts);
            // Report success
            System.out.println("Batch insert succeeded");
        }
    }


    /**
     * Batch-insert a single column into an HBase table.
     *
     * @param tableName    table name
     * @param columnFamily column family to write into
     * @param column       column qualifier to write
     * @param dataList     list of records to insert
     * @throws IOException if an I/O error occurs
     */
    public static void executeInsertBatch(String tableName, String columnFamily, String column, List<DataModel> dataList) throws IOException {
        // try-with-resources ensures the connection and table are closed after use
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Collect one Put per record for the batch insert
            List<Put> puts = new ArrayList<>();
            // Build a Put for each record and add it to the list
            for (DataModel dataModel : dataList) {
                String rowKey = dataModel.getRowKey();
                // Create a Put for this row key
                Put put = new Put(Bytes.toBytes(rowKey));
                // Add the requested column
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(dataModel.getCountry()));
                puts.add(put);
            }
            // Execute the batch insert
            table.put(puts);
            // Report success
            System.out.println("Batch insert succeeded");
        }
    }



    /**
     * Delete a single row from the given table.
     *
     * @param tableName table name
     * @param rowKey    key of the row to delete
     * @throws Exception if the delete fails
     */
    public static void delRow(String tableName, String rowKey) throws Exception {
        // Create the HBase connection and table object
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Build the Delete for the target row key
            Delete del = new Delete(Bytes.toBytes(rowKey));
            // Execute the delete
            table.delete(del);
            System.out.println("Row deleted successfully");
        }
        // try-with-resources closes the connection and table automatically
    }


    /**
     * Batch-delete rows from an HBase table.
     *
     * @param tableName table name
     * @param rowKeys   keys of the rows to delete
     * @throws Exception if the delete fails
     *
     * The method opens a connection to HBase, builds one Delete per row key and
     * removes them in a single call; try-with-resources ensures the Connection
     * and Table are closed after use.
     */
    public static void delMultiRows(String tableName, List<String> rowKeys)
            throws Exception {
        // Create the HBase connection and table object
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Collect the Delete operations
            List<Delete> delList = new ArrayList<>();
            // Build one Delete per row key
            for (String rowKey : rowKeys) {
                // Create the Delete for this row key
                Delete del = new Delete(Bytes.toBytes(rowKey));
                // Add it to the batch
                delList.add(del);
            }
            // Execute the batch delete
            table.delete(delList);
            // Report success
            System.out.println("Batch delete succeeded");
        }
        // Resources are closed automatically
    }


    /**
     * Fetch a single row from the given HBase table and print its cells.
     *
     * @param tableName table name
     * @param rowKey    key of the row to fetch
     * @throws Exception if the get fails
     */
    public static void getRow(String tableName, String rowKey) throws Exception {
        // Create the HBase connection and table object
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Build the Get for the target row key
            Get get = new Get(Bytes.toBytes(rowKey));
            // Execute the Get
            Result result = table.get(get);
            // If the row exists, print every cell
            if (!result.isEmpty()) {
                for (Cell cell : result.rawCells()) {
                    // Extract the family, qualifier, value and timestamp of each cell
                    byte[] family = CellUtil.cloneFamily(cell);
                    byte[] qualifier = CellUtil.cloneQualifier(cell);
                    byte[] value = CellUtil.cloneValue(cell);
                    long timestamp = cell.getTimestamp();
                    System.out.print("row=" + Bytes.toString(result.getRow()));
                    System.out.print(" timestamp=" + timestamp);
                    System.out.print(" family=" + Bytes.toString(family));
                    System.out.print(" qualifier=" + Bytes.toString(qualifier));
                    System.out.println(" value=" + Bytes.toString(value));
                }
            } else {
                // No data found for the given row key
                System.out.println("No record found for row " + rowKey);
            }
        }
        // try-with-resources closes the connection and table automatically
    }


    /**
     * Scan the whole table and print every cell: row key, family, qualifier, value and timestamp.
     *
     * @param tableName table to scan
     * @throws Exception if connecting or reading fails
     */
    public static void getAllRows(String tableName) throws Exception {
        // Create the connection, table and scanner used to read the table
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(new Scan())) {
            // Iterate over every row returned by the scanner
            for (Result result = scanner.next(); result != null; result = scanner.next()) {
                // Iterate over every cell in the row
                for (Cell cell : result.rawCells()) {
                    // Extract the family, qualifier, value and timestamp of each cell
                    byte[] family = CellUtil.cloneFamily(cell);
                    byte[] qualifier = CellUtil.cloneQualifier(cell);
                    byte[] value = CellUtil.cloneValue(cell);
                    long timestamp = cell.getTimestamp();

                    // Print the row key, timestamp, family, qualifier and value
                    System.out.print("row=" + Bytes.toString(result.getRow()));
                    System.out.print(" timestamp=" + timestamp);
                    System.out.print(" family=" + Bytes.toString(family));
                    System.out.print(" qualifier=" + Bytes.toString(qualifier));
                    System.out.println(" value=" + Bytes.toString(value));
                }
            }
        }
        // try-with-resources closes the connection, table and scanner automatically
    }

}
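
The two executeInsertBatch methods reference a DataModel class that is not included in the original post. A minimal sketch of what such a POJO might look like, with the fields inferred from the getters used above (all assumed to be Strings so that Bytes.toBytes(String) applies):

package org.example;

/** Hypothetical POJO backing the batch-insert examples; fields are inferred from the getters used above. */
public class DataModel {
    private String rowKey;
    private String rank;
    private String title;
    private String rating;
    private String year;
    private String genre;
    private String country;

    public String getRowKey() { return rowKey; }
    public void setRowKey(String rowKey) { this.rowKey = rowKey; }
    public String getRank() { return rank; }
    public void setRank(String rank) { this.rank = rank; }
    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    public String getRating() { return rating; }
    public void setRating(String rating) { this.rating = rating; }
    public String getYear() { return year; }
    public void setYear(String year) { this.year = year; }
    public String getGenre() { return genre; }
    public void setGenre(String genre) { this.genre = genre; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
}

A typical call sequence against the student table from the shell section would be createTable("student", new String[]{"baseinfo", "schoolinfo"}), followed by addRow("student", "1", "baseinfo", "name", "tom") and getRow("student", "1").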


Dependencies
Because the program depends on log4j:log4j, the exclusion of that artifact in the shade plugin's artifactSet is commented out below.


<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    <flink.version>1.14.6</flink.version>
    <spark.version>3.1.1</spark.version> <!--k8s  3.3.0   local  2.4.0-->
    <hive.version>3.1.0</hive.version>
    <kafka.version>2.5.0</kafka.version>
    <scala.version>2.12</scala.version>
    <hadoop.version>3.1.1</hadoop.version>
    <apache-phoenix-version>4.14.2-HBase-1.4</apache-phoenix-version>
    <scope.type.test>test</scope.type.test>
    <scope.type.compile>compile</scope.type.compile>
    <scope.type.provided>provided</scope.type.provided>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <apollo.version>1.5.1</apollo.version>
    <jedis.version>3.3.0</jedis.version>
    <lombok.version>1.18.10</lombok.version>
    <logback.classic.version>1.2.9</logback.classic.version>
    <guava.version>30.1.1-jre</guava.version>
    <elasticsearch.version>6.7.0</elasticsearch.version>
    <dingtalk.version>1.0.1</dingtalk.version>
    <commons.dbutils.version>1.6</commons.dbutils.version>
    <mysql.connector.version>8.0.29</mysql.connector.version>
    <tdengine.connector.version>3.2.2</tdengine.connector.version>
    <hikaricp.version>4.0.3</hikaricp.version>
    <jpmml.version>1.4.13</jpmml.version>
    <scala.logging.version>3.9.2</scala.logging.version>
    <logback.kafka.appender.version>0.2.0-RC2</logback.kafka.appender.version>
    <junit.version>4.13.1</junit.version>
    <flink.jackson.version>2.10.1-12.0</flink.jackson.version>
    <jackson.version>2.14.3</jackson.version>
    <okhttp.version>3.14.9</okhttp.version>
    <parquet.avro.version>1.11.1</parquet.avro.version>
    <avro.version>1.10.0</avro.version>
    <httpclient.version>4.5.13</httpclient.version>
    <httpasyncclient.version>4.1.3</httpasyncclient.version>
    <httpcore.nio.version>4.4.13</httpcore.nio.version>
    <httpcore.version>4.4.13</httpcore.version>
    <findbugs.version>3.0.2</findbugs.version>
    <protobuf.version>3.19.4</protobuf.version>
    <okio.version>1.17.2</okio.version>
    <nacos.version>2.1.2</nacos.version>
    <codec.version>1.15</codec.version>
    <commons.cli.version>1.4</commons.cli.version>
    <aws.version>1.11.788</aws.version>
    <scala.maven.plugin.version>4.0.2</scala.maven.plugin.version>
    <spark.scala.version>2.12</spark.scala.version>
    <scala.lib.version>2.12.18</scala.lib.version>
    <spark.jackson.version>2.10.1</spark.jackson.version>
    <hbase.version>2.2.7</hbase.version>
</properties>
<dependencies>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${logback.classic.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>hadoop-annotations</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
            <source>${maven.compiler.source}</source>
            <target>${maven.compiler.target}</target>
            <encoding>${project.build.sourceEncoding}</encoding>
        </configuration>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.1</version>
        <!-- Do not generate dependency-reduced-pom.xml -->
        <configuration>
            <createDependencyReducedPom>false</createDependencyReducedPom>
        </configuration>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>

                    <!--The project needs the same jar in two different versions-->
                    <!--e.g. Flink CEP and CDC both need Guava, one version 18 and the other version 30-->
                    <!--This produces two jars: the original one and a shaded one-->
                    <shadedArtifactAttached>true</shadedArtifactAttached>
                    <!--Classifier suffix of the shaded jar; referencing it by this suffix avoids accidentally depending on the shaded artifact-->
                    <shadedClassifierName>with-guava-${flink.version}</shadedClassifierName>
                    <relocations>
                        <relocation>
                            <pattern>org.apache.flink.shaded.guava18.com.google</pattern>
                            <shadedPattern>hidden.org.apache.flink.shaded.guava18.com.google</shadedPattern>
                            <includes>
                                <include>org.apache.flink.shaded.guava18.com.google.*</include>
                            </includes>
                        </relocation>
                    </relocations>


                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>module-info.class</exclude>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                                <exclude>core-site.xml</exclude>
                                <exclude>hive-site.xml</exclude>
                                <exclude>hdfs-site.xml</exclude>
                            </excludes>
                        </filter>
                        <filter>
                            <!--Classes from flink-connector-jdbc that are overridden in this project-->
                            <!--are excluded at package time so that the local copies are not overwritten-->
                            <artifact>org.apache.flink:flink-connector-jdbc_2.11</artifact>
                            <excludes>
                                <exclude>org/apache/flink/connector/jdbc/dialect/AbstractDialect.java</exclude>
                                <exclude>org/apache/flink/connector/jdbc/dialect/JdbcDialects.java</exclude>
                                <exclude>
                                    org/apache/flink/connector/jdbc/internal/converter/AbstractJdbcRowConverter.java
                                </exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <finalName>${project.artifactId}-jar-with-dependencies</finalName>
                    <transformers>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>reference.conf</mainClass>
                        </transformer>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>reference.conf</resource>
                        </transformer>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    </transformers>
                    <artifactSet>
                        <excludes>
                            <exclude>org.slf4j:slf4j-simple</exclude>
                            <exclude>org.slf4j:slf4j-log4j12</exclude>
                            <!--<exclude>log4j:log4j</exclude>-->
                            <exclude>org.apache.logging.log4j:log4j-api</exclude>
                            <exclude>org.apache.logging.log4j:log4j-core</exclude>
                            <exclude>org.apache.logging.log4j:log4j-slf4j-impl</exclude>
                        </excludes>
                    </artifactSet>
                </configuration>
            </execution>
        </executions>
    </plugin>
</plugins>
</build>


# Operating HBase from Spark (verified)

package org.example

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author apophis
 * @File SparkWriteHbaseExample
 * @Time 2024/5/13 8:53
 * @Description docker cp D:\IdeaProject\person\example-demo\target\example-demo-jar-with-dependencies.jar master:/data/jar
 *              The data source is clean.csv produced by the project-spark-cosmetics-analysis project.
 *              spark-submit --master local[4] --class org.example.SparkWriteHbaseExample example-demo-jar-with-dependencies.jar
 *              HBase shell commands:
 *              create 'test_table_thr','cf1','cf2'
 *              scan 'test_table_thr'
 */
object SparkWriteHbaseExample {
  def main(args: Array[String]): Unit = {
    // Set up the Spark runtime environment
    val sparkConf: SparkConf = new SparkConf()

    val spark: SparkSession = SparkSession.builder()
      .config(sparkConf)
      .appName("SparkWriteHbaseExample")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("ERROR")
    val df: DataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .load("/output/data_clean/clean.csv")
      .limit(10)
      .select("id", "price", "brand")
      .toDF()

    df.show(20)
    val cf1: Array[Byte] = Bytes.toBytes("cf1")
    val col_price: Array[Byte] = Bytes.toBytes("price")
    val cf2: Array[Byte] = Bytes.toBytes("cf2")
    val col_brand: Array[Byte] = Bytes.toBytes("brand")

    val hbaseTableName = "test_table_thr"
    df.rdd.foreachPartition(list => {
      val hbaseConf: Configuration = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "master");
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181");
      val conn: Connection = ConnectionFactory.createConnection(hbaseConf)
      val table: Table = conn.getTable(TableName.valueOf(hbaseTableName))
      val putList = new java.util.LinkedList[Put]()
      list.foreach(row => {
        val rk: Array[Byte] = Bytes.toBytes(row.getAs[String]("id"))
        val price: Array[Byte] = Bytes.toBytes(row.getAs[Double]("price"))
        val brand: Array[Byte] = Bytes.toBytes(row.getAs[String]("brand"))
        val put = new Put(rk)
        put.addColumn(cf1, col_price, price)
        put.addColumn(cf2, col_brand, brand)
        putList.add(put)
      })
      table.put(putList)
      table.close()
      conn.close()
    })
    sc.stop()


    spark.stop()
  }
}
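
For completeness, a sketch of reading the table back into Spark with TableInputFormat and newAPIHadoopRDD. This is not part of the original (verified) example: it assumes the HBase MapReduce classes are on the classpath (in HBase 2.x they live in the hbase-mapreduce artifact, which hbase-server pulls in) and reuses the table and column names written above.

package org.example

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object SparkReadHbaseExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkReadHbaseExample").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    // Point the input format at the table written by SparkWriteHbaseExample
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "master")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "test_table_thr")

    // Each record is (row key, Result); decode the two columns written above
    val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    rdd.map { case (_, result) =>
      val id = Bytes.toString(result.getRow)
      val price = Bytes.toDouble(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("price")))
      val brand = Bytes.toString(result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("brand")))
      (id, price, brand)
    }.collect().foreach(println)

    spark.stop()
  }
}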


Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>example-demo</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <spark.version>3.1.2</spark.version> <!--k8s  3.3.0   local  2.4.0-->
        <hive.version>3.1.2</hive.version>
        <kafka.version>2.5.0</kafka.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.2.0</hadoop.version>
        <apache-phoenix-version>4.14.2-HBase-1.4</apache-phoenix-version>
        <scope.type.test>test</scope.type.test>
        <scope.type.compile>compile</scope.type.compile>
        <scope.type.provided>compile</scope.type.provided>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <apollo.version>1.5.1</apollo.version>
        <jedis.version>3.3.0</jedis.version>
        <lombok.version>1.18.10</lombok.version>
        <logback.classic.version>1.2.9</logback.classic.version>
        <guava.version>30.1.1-jre</guava.version>
        <elasticsearch.version>6.7.0</elasticsearch.version>
        <dingtalk.version>1.0.1</dingtalk.version>
        <commons.dbutils.version>1.6</commons.dbutils.version>
        <mysql.connector.version>8.0.33</mysql.connector.version>
        <tdengine.connector.version>3.2.2</tdengine.connector.version>
        <hikaricp.version>4.0.3</hikaricp.version>
        <jpmml.version>1.4.13</jpmml.version>
        <scala.logging.version>3.9.2</scala.logging.version>
        <logback.kafka.appender.version>0.2.0-RC2</logback.kafka.appender.version>
        <junit.version>4.13.1</junit.version>
        <flink.jackson.version>2.10.1-12.0</flink.jackson.version>
        <jackson.version>2.14.3</jackson.version>
        <okhttp.version>3.14.9</okhttp.version>
        <parquet.avro.version>1.11.1</parquet.avro.version>
        <avro.version>1.10.0</avro.version>
        <httpclient.version>4.5.13</httpclient.version>
        <httpasyncclient.version>4.1.3</httpasyncclient.version>
        <httpcore.nio.version>4.4.13</httpcore.nio.version>
        <httpcore.version>4.4.13</httpcore.version>
        <findbugs.version>3.0.2</findbugs.version>
        <protobuf.version>3.19.4</protobuf.version>
        <okio.version>1.17.2</okio.version>
        <nacos.version>2.1.2</nacos.version>
        <codec.version>1.15</codec.version>
        <commons.cli.version>1.4</commons.cli.version>
        <aws.version>1.11.788</aws.version>
        <scala.maven.plugin.version>4.0.2</scala.maven.plugin.version>
        <scala.lib.version>2.12.18</scala.lib.version>
        <spark.jackson.version>2.10.1</spark.jackson.version>
        <jblas.version>1.2.3</jblas.version>
        <spark.redis.version>3.1.0</spark.redis.version>
        <hbase.version>2.2.7</hbase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.classic.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!--Essentially the connector/driver package; may not be needed-->
        <!--<dependency>
            <groupId>org.apache.hbase.connectors.spark</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>1.0.1</version>
        </dependency>-->

    </dependencies>
    <build>
        <plugins>
            <!-- Scala compiler plugin for mixed Java/Scala projects, see https://davidb.github.io/scala-maven-plugin/example_java.html -->
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.lib.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <!-- Do not generate dependency-reduced-pom.xml -->
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!--The project needs the same jar in two different versions-->
                            <!--e.g. Flink CEP and CDC both need Guava, one version 18 and the other version 30-->
                            <!--This produces two jars: the original one and a shaded one-->
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <!--Name of the shaded jar produced by the build-->
                            <finalName>${project.artifactId}-jar-with-dependencies</finalName>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>module-info.class</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>core-site.xml</exclude>
                                        <exclude>hive-site.xml</exclude>
                                        <exclude>hdfs-site.xml</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>reference.conf</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.slf4j:slf4j-simple</exclude>
                                    <exclude>org.slf4j:slf4j-log4j12</exclude>
                                    <exclude>log4j:log4j</exclude>
                                    <exclude>org.apache.logging.log4j:log4j-api</exclude>
                                    <exclude>org.apache.logging.log4j:log4j-core</exclude>
                                    <exclude>org.apache.logging.log4j:log4j-slf4j-impl</exclude>
                                </excludes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

# Operating HBase from Python (verified)

#!/usr/bin/python
# -*- encoding: utf-8 -*-
"""
@Author: apophis
@File: 1.py
@Time: 2024/5/30 23:52
@Description: 工程描述
"""
# 使用happybase,需要提前开启hbase的thrift  /export/software/hbase-2.2.7/bin/hbase-daemon.sh start thrift
# pip3 install happybase -i https://mirrors.aliyun.com/pypi/simple/
import happybase

# Connect and list all tables
con = happybase.Connection('172.17.0.2')
con.open()  # open the Thrift transport (TCP connection)
print(con.tables())  # print all table names as a list
con.close()  # close the transport

# Create a table
con = happybase.Connection('172.17.0.2')
con.open()  # open the Thrift transport (TCP connection)
families = {
    'name': dict(max_versions=2),  # keep at most 2 versions
    'sex': dict(max_versions=1, block_cache_enabled=False),
    'local': dict(),  # use the defaults (max_versions defaults to 3)
}
con.create_table('student', families)  # 'student' is the table name; families maps each column family to a dict of its options
print(con.tables())

# Describe the table
game_tbl = con.table('student')
info = game_tbl.families()  # equivalent to desc in the shell
print(info)
# Insert data
game_tbl = con.table('student')
lol_info = {
    'name:login': 'admin6',
    'name:real': 'zhangsan'
}
game_tbl.put('1001', lol_info)

# Query data
game_tbl = con.table('student')
# cell = game_tbl.cells('1001', 'name:login')[0].decode('utf-8')  # latest version of the cell
cell = game_tbl.cells('1001', 'name:login')  # list of all stored versions
print(cell)

# Drop the table
con.delete_table('student', disable=True)
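
A scan with happybase might look like the sketch below (run it before dropping the table, while the connection is still open); row_prefix and columns are standard Table.scan parameters, and keys and values come back as bytes:

# Scan rows whose key starts with '100', restricted to the 'name' family
game_tbl = con.table('student')
for row_key, data in game_tbl.scan(row_prefix=b'100', columns=['name']):
    print(row_key.decode(), {k.decode(): v.decode() for k, v in data.items()})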
