

# Insert and Update Tips

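In TDengine, a sub-table can be created on the fly at insert time with `INSERT INTO db.tb USING db.stb TAGS (...) VALUES (...)`, and inserting a row whose timestamp already exists acts as an update (subject to the database's update setting). The Flink sink below leans on both behaviors: it batches plain inserts first and only issues the `CREATE TABLE ... USING ... TAGS` DDL when an insert fails because the sub-table is missing.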

# Writing to TDengine from Flink 1.12

Modeled on the efficient-writing example in the official documentation. First, a builder class carrying the sink's configuration:

```java
package com.example.person.bigdata.flink.connector.tdengine;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.example.person.bigdata.common.model.pub.Column;
import lombok.Builder;
import lombok.Getter;

import java.io.Serializable;
import java.util.List;

/**
 * @Author person
 * @File TDSinkBuilder
 * @Time 2023/11/13 15:03
 * @Description Configuration carried by the TDengine sink
 */
@Builder
@Getter
public class TDSinkBuilder implements Serializable {
    @JsonPropertyDescription("number of records per batched write")
    private int batchSize;
    @JsonPropertyDescription("interval between batched writes, in milliseconds")
    private long batchIntervalMs;
    @JsonPropertyDescription("target database name")
    private String db;
    @JsonPropertyDescription("name of the record FIELD whose value is the sub-table name (a field, not a literal table name)")
    private String tbName;
    @JsonPropertyDescription("super table (STable) name")
    private String superTb;
    @JsonPropertyDescription("list of tag columns")
    private List<Column> tagNames;
    @JsonPropertyDescription("list of data columns; must match the column order of the table definition")
    private List<Column> colNames;
}
```
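
As a hedged sketch of how this builder might be filled in (a minimal example; `"power"`, `"meters"`, `"device_id"` and the two `Column` lists are illustrative assumptions, not values from the original job):

```java
// Assumed setup: tagColumns / dataColumns are List<Column> built elsewhere
// to match the super table's tag and column definitions.
TDSinkBuilder builder = TDSinkBuilder.builder()
        .batchSize(500)             // flush once 500 records are buffered
        .batchIntervalMs(2_000L)    // ...or at latest 2 s after the first record
        .db("power")                // target database
        .tbName("device_id")        // record FIELD whose value names the sub-table
        .superTb("meters")          // super table the sub-tables are created under
        .tagNames(tagColumns)
        .colNames(dataColumns)
        .build();
```

The sink itself buffers records in keyed state and writes them out in batches:
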
```java
package com.example.person.bigdata.flink.connector.tdengine;

import com.google.common.collect.Iterables;
import com.example.person.bigdata.common.model.EnvConf;
import com.example.person.bigdata.common.model.pub.Column;
import com.example.person.bigdata.common.service.DBService;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * @Author person
 * @File TDSink
 * @Time 2023/11/13 10:03
 * @Description Implemented as a KeyedProcessFunction; the stream is keyed by
 * the hashcode of the tag fields modulo the parallelism, so all rows of one
 * sub-table land in the same parallel instance.
 */
@Slf4j
public class TDSink extends KeyedProcessFunction<Integer, Map<String, String>, String> implements CheckpointedFunction {
    // sink configuration
    private final TDSinkBuilder tdSinkBuilder;
    // records buffered for the next batched write, kept in keyed state
    private transient ListState<Map<String, String>> recordState;
    private transient QueryRunner qr;

    public TDSink(TDSinkBuilder tdSinkBuilder) {
        this.tdSinkBuilder = tdSinkBuilder;
    }


    /**
     * Render a field value as a SQL literal: quote string-typed values,
     * pass everything else through unchanged.
     *
     * @param value  raw string value taken from the record map
     * @param column column metadata carrying the type name
     * @return the SQL literal, or null for null/"null" values
     */
    private static String getColValStr(String value, Column column) {
        // filter out nulls (and the literal string "null")
        if (value == null || EnvConf.NULL.equalsIgnoreCase(value)) return null;
        switch (column.getType()) {
            case "str":
            case "string":
            case "char":
            case "text":
            case "longtext":
            case "varchar":
            case "binary":
                return String.format("'%s'", value);
        }
        return value;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // keyed list state holding the records buffered for the next batch
        this.recordState = this.getRuntimeContext().getListState(new ListStateDescriptor<>("record-state", Types.MAP(Types.STRING, Types.STRING)));
        // build the TDengine connection pool from the global job parameters
        Map<String, String> map = this.getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
        HikariDataSource dataSource = DBService.DBServiceBuilder.builder()
                .jdbcUrl(map.get(EnvConf.Component.DB_URL))
                .driverName(DBService.DRIVER_TDENGINE)
                .username(map.get(EnvConf.Component.DB_USERNAME))
                .password(map.get(EnvConf.Component.DB_PASSWORD))
                .maximumPoolSize(Optional.ofNullable(map.get(EnvConf.Component.DB_MAXIMUM_POOL_SIZE)).map(Integer::parseInt).orElse(null))
                .minimumIdle(Optional.ofNullable(map.get(EnvConf.Component.DB_MINIMUM_IDLE)).map(Integer::parseInt).orElse(null))
                .autoCommit(true)
                .readOnly(false)
                .build()
                .getDataSource();
        this.qr = new QueryRunner(dataSource);
    }

    private void flush() throws Exception {
        String db = this.tdSinkBuilder.getDb();
        String superTb = this.tdSinkBuilder.getSuperTb();
        String tbName = this.tdSinkBuilder.getTbName();
        List<Column> colNames = this.tdSinkBuilder.getColNames();
        List<Column> tagNames = this.tdSinkBuilder.getTagNames();

        // both maps are keyed by the sub-table name
        Map<String, String> tbValues = new HashMap<>();
        Map<String, String> tbTags = new HashMap<>();
        for (Map<String, String> recordMap : this.recordState.get()) {
            String table = recordMap.get(tbName);
            String values = colNames.stream().map(x -> getColValStr(recordMap.get(x.getName()), x)).collect(Collectors.joining(",", "(", ")"));
            String tags = tagNames.stream().map(x -> getColValStr(recordMap.get(x.getName()), x)).collect(Collectors.joining(",", "(", ")"));
            // accumulate VALUES tuples for the later INSERT statement
            tbValues.put(table, tbValues.getOrDefault(table, "") + values);
            // tags used by the CREATE TABLE ... USING ... TAGS DDL
            tbTags.put(table, tags);
        }

        // build and execute the batched SQL
        StringBuilder insertBuilder = new StringBuilder("INSERT INTO ");
        StringBuilder ddlBuilder = new StringBuilder("CREATE TABLE ");
        for (Map.Entry<String, String> entry : tbValues.entrySet()) {
            String table = entry.getKey();
            String values = entry.getValue();
            String q = String.format("%s.%s values %s ", db, table, values);
            // stay safely below TDengine's maxSQLLength limit: flush early and start new statements
            if (insertBuilder.length() + q.length() > 800_000) {
                this.executeSQL(insertBuilder.toString(), ddlBuilder.toString());
                log.warn("increase maxSQLLength or decrease maxBatchSize to gain better performance");
                insertBuilder = new StringBuilder("INSERT INTO ");
                ddlBuilder = new StringBuilder("CREATE TABLE ");
            }
            insertBuilder.append(q);
            ddlBuilder.append("IF NOT EXISTS ")
                    .append(db).append(".").append(table)
                    .append(" USING ")
                    .append(db).append(".").append(superTb)
                    .append(" TAGS ")
                    .append(tbTags.get(table))
                    .append(" ");
        }
        // flush whatever is left
        this.executeSQL(insertBuilder.toString(), ddlBuilder.toString());
        // drop the buffered records now that they are written
        this.recordState.clear();
    }

    /**
     * Try the INSERT first; if it fails because a sub-table does not exist,
     * run the CREATE TABLE DDL and retry the INSERT once.
     *
     * @param insert batched INSERT statement
     * @param ddl    batched CREATE TABLE ... USING ... TAGS statement
     * @throws SQLException if the statement still fails
     */
    private void executeSQL(String insert, String ddl) throws SQLException {
        try {
            this.qr.update(insert);
        } catch (SQLException e) {
            // convert to error code defined in taoserror.h
            int errorCode = e.getErrorCode() & 0xffff;
            if (errorCode == 0x2603) {
                // table does not exist: create it, then retry the insert once;
                // the sentinel "ddl" guarantees the recursion terminates (a second
                // failure would try to execute it as SQL and surface the error)
                this.qr.update(ddl);
                this.executeSQL(insert, "no ddl,here is final");
            } else {
                log.error("Execute SQL: {}", insert);
                throw e;
            }
        } catch (Throwable throwable) {
            log.error("Execute SQL: {}", insert);
            throw throwable;
        }
    }

    @Override
    public void processElement(Map<String, String> value, KeyedProcessFunction<Integer, Map<String, String>, String>.Context ctx, Collector<String> out) throws Exception {
        int bufferedSize = Iterables.size(this.recordState.get());
        // first record of a new batch: arm the interval timer
        if (bufferedSize == 0) {
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + this.tdSinkBuilder.getBatchIntervalMs());
        }
        this.recordState.add(value);
        bufferedSize += 1;
        // below the batch threshold: keep buffering
        if (bufferedSize < this.tdSinkBuilder.getBatchSize()) return;
        // threshold reached: write the batch out
        this.flush();
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Integer, Map<String, String>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        // interval timer fired: flush whatever is buffered
        if (Iterables.size(this.recordState.get()) > 0) this.flush();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // flush on checkpoint/savepoint; note that keyed state is read under the
        // key of the last processed element, so buffers held under other keys on
        // this subtask are snapshotted as state rather than flushed here
        if (Iterables.size(this.recordState.get()) > 0) this.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // nothing to restore here: recordState is keyed state obtained in open()
    }

}
```
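
Finally, a minimal wiring sketch, assuming a `DataStream<Map<String, String>>` of flattened records; `"location"` and `"group_id"` stand in for the real tag fields, and the key is the tag hash modulo the parallelism, mirroring the scheme described in the class javadoc:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import java.util.Map;
import java.util.Objects;

public final class TDSinkWiring {
    // Hypothetical helper; the tag field names are placeholders.
    public static SingleOutputStreamOperator<String> attachTdSink(
            DataStream<Map<String, String>> records, TDSinkBuilder builder, int parallelism) {
        return records
                // stable hash of the tag fields, modulo parallelism, so all rows
                // of one sub-table land in the same parallel instance
                .keyBy(r -> Math.floorMod(Objects.hash(r.get("location"), r.get("group_id")), parallelism))
                .process(new TDSink(builder))
                .setParallelism(parallelism);
    }
}
```

Note that `open()` reads the connection settings from the global job parameters, so the job must call `env.getConfig().setGlobalJobParameters(...)` with the `EnvConf.Component.DB_*` keys set.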