ClickHouse Tools

2/10/2023 clickhouse

# Connecting to ClickHouse from Python

```python
#!/usr/bin/python
# -*- encoding: utf-8 -*-
import clickhouse_connect
import re


def format_sql(sql):
    # Collapse all whitespace runs so multi-line SQL logs as a single line.
    sql = re.sub(r"\s+", " ", sql)
    print(sql)
    return sql


class ClickHouse:
    def __init__(self, host, database, password):
        self.host = host
        self.database = database
        self.password = password
        self.client = clickhouse_connect.get_client(host=self.host, database=self.database, password=self.password)

    def execute(self, sql):
        """
        Execute a plain SQL statement (insert/update/delete/DDL).
        :param sql:
        :return:
        """
        self.client.command(format_sql(sql))

    def insert_df(self, table, df):
        """
        Insert a DataFrame into a table. The table name must be fully qualified: database + table.
        https://github.com/ClickHouse/clickhouse-connect/blob/2dbd25b05621436ffa9ad867647485fa2625a5bb/examples/pandas_examples.py
        :param table:
        :param df:
        :return:
        """
        self.client.insert_df(table, df)

    def query_df(self, sql):
        """
        Return the result as a DataFrame.
        :param sql:
        :return:
        """
        return self.client.query_df(format_sql(sql))

    def query(self, sql):
        """
        Return the result as a nested list (one tuple per row).
        :param sql:
        :return:
        """
        return self.client.query(format_sql(sql)).result_set

    def query_stream(self, sql):
        """
        clickhouse-connect also provides query_rows_stream, which receives rows as a
        stream so a large result set does not blow up memory all at once.
        :param sql:
        :return:
        """
        with self.client.query_rows_stream(format_sql(sql)) as stream:
            for row in stream:
                yield row

    def close(self):
        """
        Close the connection.
        :return:
        """
        self.client.close()


if __name__ == '__main__':
    ck = ClickHouse("", "", "")
    test_sql = "select address from test_table where date='2022-11-30' limit 10"
    for row in ck.query(test_sql):
        print(row)
    print("-" * 50)
    print(ck.query_df(test_sql))
```

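`insert_df` and the streaming helper are not exercised in the `__main__` block above. A minimal usage sketch, assuming a hypothetical table `default.test_table` whose columns match the DataFrame:

```python
import datetime

import pandas as pd

# Hypothetical connection details and table name; adjust to your environment.
ck = ClickHouse("localhost", "default", "")
df = pd.DataFrame({
    "address": ["0xabc", "0xdef"],
    "date": [datetime.date(2022, 11, 30)] * 2,
    "num": [1, 2],
})
# The table name must be fully qualified: database + table.
ck.insert_df("default.test_table", df)

# Stream a large result row by row instead of materializing it all at once.
for row in ck.query_stream("select address from test_table limit 10"):
    print(row)
ck.close()
```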

# ClickHouse table creation statements

# Creating and dropping distributed tables

```sql
-- Create the tables: a Distributed table routing by sipHash64(col1) on top of a local MergeTree table
CREATE TABLE default.{table_name} on CLUSTER {cluster_name} ( col1 String, date Date, num Int256) ENGINE = Distributed('{cluster_name}', 'default', '{table_name}_local', sipHash64(col1));
CREATE TABLE default.{table_name}_local on CLUSTER {cluster_name}( col1 String, date Date, num Int256) ENGINE = MergeTree PARTITION BY substring(col1, -2) ORDER BY (col1, date) SETTINGS index_granularity = 8192;
-- Drop the tables (drop the Distributed table as well, not only the local one)
DROP TABLE default.{table_name} on CLUSTER {cluster_name};
DROP TABLE default.{table_name}_local on CLUSTER {cluster_name};
```
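For reference, a filled-in instance of the template above, using hypothetical names (`test_table`, `my_cluster`). Reads and writes both go through the Distributed table, which routes each row to a shard by `sipHash64(col1)`:

```sql
-- Hypothetical names; substitute your own table and cluster.
CREATE TABLE default.test_table ON CLUSTER my_cluster (col1 String, date Date, num Int256)
ENGINE = Distributed('my_cluster', 'default', 'test_table_local', sipHash64(col1));
CREATE TABLE default.test_table_local ON CLUSTER my_cluster (col1 String, date Date, num Int256)
ENGINE = MergeTree PARTITION BY substring(col1, -2) ORDER BY (col1, date) SETTINGS index_granularity = 8192;

-- Inserts and queries both target the Distributed table:
INSERT INTO default.test_table VALUES ('0xabc', '2023-02-08', 1);
SELECT count() FROM default.test_table WHERE date = '2023-02-08';
```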

# Table creation for consuming Kafka data (performance not verified)

```sql
CREATE TABLE default.queue_test_logs
(
    `logIndex`         Int128,
    `transactionHash`  String,
    `transactionIndex` Int128,
    `address`          String,
    `data`             String,
    `topics`           Array(String),
    `blockTimestamp`   DateTime,
    `blockNumber`      Int256,
    `blockHash`        String,
    `removed`          Int8
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:9092,kafka2:9092,kafka3:9092', kafka_topic_list = 'test_logs', kafka_group_name = 'test_logs_group', kafka_num_consumers = 1, kafka_skip_broken_messages = 65535, kafka_format = 'JSONEachRow';

CREATE TABLE default.queue_test_token_transfers_eee1111
(
    `tokenAddress`    String,
    `operatorAddress` String,
    `fromAddress`     String,
    `toAddress`       String,
    `transferType`    String,
    `name`            String,
    `decimals`        String,
    `symbol`          String,
    `id`              String,
    `value`           String,
    `ids`             Array(String),
    `values`          Array(String),
    `tokenType`       String,
    `transactionHash` String,
    `logIndex`        Int64,
    `blockTimestamp`  DateTime,
    `blockNumber`     Int128,
    `blockHash`       String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:9092,kafka2:9092,kafka3:9092', kafka_topic_list = 'test_token_transfers_eee1111', kafka_group_name = 'test_token_transfers_eee1111_local_group', kafka_num_consumers = 1, kafka_skip_broken_messages = 65535, kafka_format = 'JSONEachRow';

CREATE MATERIALIZED VIEW default.mv_kafka_test_token_transfers_eee1111 TO default.test_token_transfers_eee1111
(
    `tokenAddress` String,
    `operatorAddress` String,
    `fromAddress` String,
    `toAddress` String,
    `transferType` String,
    `name` String,
    `decimals` String,
    `symbol` String,
    `id` String,
    `value` String,
    `ids` Array(String),
    `values` Array(String),
    `tokenType` String,
    `transactionHash` String,
    `logIndex` Int64,
    `blockTimestamp` DateTime,
    `blockNumber` Int128,
    `blockHash` String
) AS
SELECT tokenAddress,
       operatorAddress,
       fromAddress,
       toAddress,
       transferType,
       name,
       decimals,
       symbol,
       id,
       value,
       ids,
       `values`,
       tokenType,
       transactionHash,
       logIndex,
       blockTimestamp,
       blockNumber,
       blockHash
FROM default.queue_test_token_transfers_eee1111;

CREATE TABLE default.queue_test_token_transfers_eee11
(
    `tokenAddress`    String,
    `fromAddress`     String,
    `toAddress`       String,
    `name`            String,
    `decimals`        String,
    `symbol`          String,
    `value`           String,
    `tokenType`       String,
    `transactionHash` String,
    `logIndex`        Int64,
    `blockTimestamp`  DateTime,
    `blockNumber`     Int128,
    `blockHash`       String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:9092,kafka2:9092,kafka3:9092', kafka_topic_list = 'test_token_transfers_eee11', kafka_group_name = 'test_token_transfers_eee11_group', kafka_num_consumers = 1, kafka_skip_broken_messages = 65535, kafka_format = 'JSONEachRow';

CREATE TABLE default.queue_test_token_transfers_eee111
(
    `tokenAddress`    String,
    `fromAddress`     String,
    `toAddress`       String,
    `name`            String,
    `decimals`        String,
    `symbol`          String,
    `value`           String,
    `tokenType`       String,
    `tokenId`         String,
    `transactionHash` String,
    `logIndex`        Int64,
    `blockTimestamp`  DateTime,
    `blockNumber`     Int128,
    `blockHash`       String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:9092,kafka2:9092,kafka3:9092', kafka_topic_list = 'test_token_transfers_eee111', kafka_group_name = 'test_token_transfers_eee111_group', kafka_num_consumers = 1, kafka_skip_broken_messages = 65535, kafka_format = 'JSONEachRow';

CREATE TABLE default.queue_test_tokens
(
    `address`        String,
    `symbol`         String,
    `name`           String,
    `decimals`       String,
    `totalSupply`    String,
    `blockTimestamp` DateTime,
    `blockNumber`    Int128,
    `blockHash`      String,
    `tokenType`      String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:9092,kafka2:9092,kafka3:9092', kafka_topic_list = 'test_tokens', kafka_group_name = 'test_tokens_group', kafka_num_consumers = 1, kafka_skip_broken_messages = 65535, kafka_format = 'JSONEachRow';

CREATE TABLE default.queue_test_traces
(
    `transactionHash`  String,
    `transactionIndex` Int256,
    `fromAddress`      String,
    `toAddress`        String,
    `value`            String,
    `input`            String,
    `output`           String,
    `traceType`        String,
    `callType`         String,
    `rewardType`       String,
    `gas`              Int256,
    `gasUsed`          Int256,
    `subtraces`        Int256,
    `traceAddress`     Array(String),
    `error`            String,
    `status`           Int8,
    `traceId`          String,
    `traceIndex`       Int256,
    `blockTimestamp`   DateTime,
    `blockNumber`      Int256,
    `blockHash`        String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:9092,kafka2:9092,kafka3:9092', kafka_topic_list = 'test_traces', kafka_group_name = 'test_traces_group', kafka_num_consumers = 1, kafka_skip_broken_messages = 65535, kafka_format = 'JSONEachRow';

CREATE TABLE default.queue_test_transactions
(
    `hash`                     String,
    `nonce`                    Int256,
    `transactionIndex`         Int256,
    `fromAddress`              String,
    `toAddress`                String,
    `value`                    String,
    `gas`                      Int256,
    `gasPrice`                 String,
    `input`                    String,
    `blockTimestamp`           DateTime,
    `blockNumber`              Int128,
    `blockHash`                String,
    `receiptCumulativeGasUsed` Int256,
    `receiptGasUsed`           Int256,
    `receiptContractAddress`   String,
    `receiptRoot`              String,
    `receiptStatus`            Int256,
    `receiptEffectiveGasPrice` Int256,
    `transactionType`          Int256
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:9092,kafka2:9092,kafka3:9092', kafka_topic_list = 'test_transactions', kafka_group_name = 'test_transactions_group', kafka_num_consumers = 1, kafka_skip_broken_messages = 65535, kafka_format = 'JSONEachRow';
```
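In the usual pattern, the Kafka engine table only consumes messages, and the materialized view moves each consumed block into a regular MergeTree table for storage and querying. The materialized view above writes into `default.test_token_transfers_eee1111`, which these notes never define; a minimal sketch of what that target table could look like, where the engine, partitioning, and ordering key are assumptions rather than a production definition:

```sql
-- Hypothetical target table for the materialized view above;
-- adjust ENGINE, PARTITION BY, and ORDER BY to your workload.
CREATE TABLE default.test_token_transfers_eee1111
(
    `tokenAddress`    String,
    `operatorAddress` String,
    `fromAddress`     String,
    `toAddress`       String,
    `transferType`    String,
    `name`            String,
    `decimals`        String,
    `symbol`          String,
    `id`              String,
    `value`           String,
    `ids`             Array(String),
    `values`          Array(String),
    `tokenType`       String,
    `transactionHash` String,
    `logIndex`        Int64,
    `blockTimestamp`  DateTime,
    `blockNumber`     Int128,
    `blockHash`       String
) ENGINE = MergeTree
PARTITION BY toYYYYMM(blockTimestamp)
ORDER BY (transactionHash, logIndex);
```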


# Merging partitions

```sql
SET max_partitions_per_insert_block = 10000000;
OPTIMIZE TABLE {local_table} PARTITION ({dt});

-- Example
OPTIMIZE TABLE ck_table_local PARTITION (20230208);
```
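To check whether the merge took effect, the active part count per partition can be read from the standard `system.parts` table (the table name below is the example's):

```sql
-- A freshly merged partition should collapse to a small number of active parts.
SELECT partition, count() AS active_parts
FROM system.parts
WHERE table = 'ck_table_local' AND active
GROUP BY partition
ORDER BY partition;
```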


# Deleting data

```sql
ALTER TABLE {local_table} ON CLUSTER {cluster_name} DELETE WHERE date >= '{st}' AND date < '{et}';

-- Example
ALTER TABLE ck_table_local ON CLUSTER {cluster_name} DELETE WHERE date >= '20230208' AND date < '20230209';
```
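`ALTER TABLE ... DELETE` runs as an asynchronous mutation rather than an immediate delete; its progress can be followed in the standard `system.mutations` table:

```sql
-- is_done = 1 once the mutation has been fully applied on this replica.
SELECT mutation_id, command, is_done
FROM system.mutations
WHERE table = 'ck_table_local'
ORDER BY create_time DESC;
```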